Add temperature_logger/src/temperature_logger/core.py

This commit is contained in:
Mika 2026-05-31 02:06:38 +00:00
commit a7e7b95ea0

View file

@ -0,0 +1,67 @@
import json
import time
import datetime
import os
from pathlib import Path
from dataclasses import dataclass, asdict
import serial
@dataclass
class TemperatureLog:
timestamp: str
temperature: float
location: str
def _validate_sensor(sensor: str) -> None:
if not isinstance(sensor, str) or not sensor:
raise ValueError("Sensor-Port muss ein nicht-leerer String sein.")
def _validate_interval(interval: float) -> None:
if not isinstance(interval, (float, int)) or interval <= 0:
raise ValueError("Das Messintervall muss eine positive Zahl sein.")
def log_temperature_data(sensor: str, interval: float) -> None:
"""Liest periodisch Temperaturdaten vom angegebenen Sensorport und speichert sie als JSON."""
_validate_sensor(sensor)
_validate_interval(interval)
output_path = Path("output/temperature_log.json")
output_path.parent.mkdir(parents=True, exist_ok=True)
try:
ser = serial.Serial(sensor, baudrate=9600, timeout=2)
except serial.SerialException as e:
raise RuntimeError(f"Fehler beim Öffnen des Sensorports '{sensor}': {e}")
try:
while True:
line = ser.readline().decode('utf-8').strip()
if not line:
continue
try:
temperature = float(line)
except ValueError:
continue # Ungültige Daten überspringen
entry = TemperatureLog(
timestamp=datetime.datetime.utcnow().isoformat(),
temperature=temperature,
location=os.uname().nodename if hasattr(os, 'uname') else 'unknown'
)
with output_path.open('a', encoding='utf-8') as f:
json.dump(asdict(entry), f, ensure_ascii=False)
f.write('\n')
time.sleep(interval)
except KeyboardInterrupt:
print("\nTemperatur-Logging beendet.")
finally:
ser.close()
__all__ = ["TemperatureLog", "log_temperature_data"]