diff --git a/measurement_logging/src/measurement_logging/core.py b/measurement_logging/src/measurement_logging/core.py new file mode 100644 index 0000000..83ee210 --- /dev/null +++ b/measurement_logging/src/measurement_logging/core.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path +from typing import Any +import logging + + +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger(__name__) + + +@dataclass +class Measurement: + """Dataclass zur strukturierten Repräsentation einer Messung.""" + time: str + temperature: float + humidity: float + iso: int + exposure: float + rms_noise: float + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'Measurement': + required_fields = {"time", "temperature", "humidity", "iso", "exposure", "rms_noise"} + missing = required_fields - set(data.keys()) + if missing: + raise ValueError(f"Fehlende Felder in Messdaten: {', '.join(sorted(missing))}") + + try: + return cls( + time=str(data["time"]), + temperature=float(data["temperature"]), + humidity=float(data["humidity"]), + iso=int(data["iso"]), + exposure=float(data["exposure"]), + rms_noise=float(data["rms_noise"]) + ) + except (TypeError, ValueError) as e: + raise ValueError(f"Ungültige Datentypen in Messdaten: {e}") from e + + +def log_measurement(time: str, temperature: float, humidity: float, iso: int, exposure: float, rms_noise: float, output_path: str | Path = "output/measurements.csv") -> None: + """Protokolliert eine Messung in einer CSV-Datei. + + Schreibt bei Bedarf den Header, validiert Eingabewerte und fügt eine Datenzeile an. + """ + measurement = Measurement.from_dict({ + "time": time, + "temperature": temperature, + "humidity": humidity, + "iso": iso, + "exposure": exposure, + "rms_noise": rms_noise + }) + + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + + header = ["time", "temperature", "humidity", "iso", "exposure", "rms_noise"] + write_header = not output_file.exists() or output_file.stat().st_size == 0 + + try: + with output_file.open(mode='a', newline='') as csvfile: + writer = csv.writer(csvfile) + if write_header: + writer.writerow(header) + logger.debug("CSV-Header geschrieben.") + writer.writerow([ + measurement.time, + f"{measurement.temperature:.2f}", + f"{measurement.humidity:.2f}", + measurement.iso, + f"{measurement.exposure:.4f}", + f"{measurement.rms_noise:.4f}" + ]) + logger.info(f"Messung erfolgreich protokolliert: {output_file}") + except OSError as e: + logger.error(f"Fehler beim Schreiben in CSV: {e}") + raise \ No newline at end of file