Add kiesel_temperature_logging/src/kiesel_temperature_logging/core.py

This commit is contained in:
Mika 2026-06-21 02:07:23 +00:00
parent 155cb7ca7e
commit fe129bcc7b

View file

@ -0,0 +1,102 @@
from __future__ import annotations
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime
import numpy as np
from typing import List, Dict, Any
@dataclass
class TemperatureLog:
timestamp: str
temperature: float
def validate(self) -> None:
try:
datetime.fromisoformat(self.timestamp)
except ValueError as e:
raise ValueError(f"Invalid timestamp format: {self.timestamp}") from e
if not isinstance(self.temperature, (float, int)):
raise TypeError(f"Temperature must be float or int, got {type(self.temperature)}")
@dataclass
class TemperatureAnalysisResult:
mean_temperature: float
pulse_interval: float
correlation: float
_LOG_FILE = Path('data/temperature_log.json')
def _ensure_logfile_exists(path: Path) -> None:
if not path.exists():
path.parent.mkdir(parents=True, exist_ok=True)
with path.open('w') as f:
json.dump([], f)
def log_temperature_data(temperature: float, timestamp: str) -> None:
"""Schreibt einen neuen Temperaturmesspunkt in die JSON-Logdatei."""
entry = TemperatureLog(timestamp=timestamp, temperature=float(temperature))
entry.validate()
_ensure_logfile_exists(_LOG_FILE)
with _LOG_FILE.open('r+', encoding='utf-8') as f:
try:
data = json.load(f)
if not isinstance(data, list):
data = []
except json.JSONDecodeError:
data = []
data.append(asdict(entry))
f.seek(0)
json.dump(data, f, indent=2, ensure_ascii=False)
f.truncate()
def analyze_temperature_pulses(data: List[Dict[str, Any]]) -> Dict[str, float]:
"""Analysiert Temperaturdaten zur Erkennung periodischer Pulse im Werteverlauf."""
if not data:
raise ValueError("Data list is empty.")
# Sort by timestamp
try:
data_sorted = sorted(data, key=lambda x: datetime.fromisoformat(x['timestamp']))
except Exception as e:
raise ValueError("Invalid timestamp in data.") from e
temps = np.array([float(item['temperature']) for item in data_sorted])
times = np.array([
datetime.fromisoformat(item['timestamp']).timestamp() for item in data_sorted
])
if len(temps) < 2:
raise ValueError("Not enough data points for analysis.")
mean_temp = float(np.mean(temps))
# Frequency analysis via FFT for pulse detection
detrended = temps - mean_temp
fft_vals = np.fft.rfft(detrended)
fft_freq = np.fft.rfftfreq(len(times), d=np.median(np.diff(times)))
if len(fft_freq) < 2:
pulse_interval = 0.0
corr = 0.0
else:
power_spectrum = np.abs(fft_vals)
dominant_idx = np.argmax(power_spectrum[1:]) + 1
dominant_freq = fft_freq[dominant_idx]
pulse_interval = float(1.0 / dominant_freq) if dominant_freq != 0 else 0.0
corr = float(np.corrcoef(temps[:-1], temps[1:])[0, 1]) if len(temps) > 1 else 0.0
result = TemperatureAnalysisResult(
mean_temperature=mean_temp,
pulse_interval=pulse_interval,
correlation=corr
)
return asdict(result)