Add measurement_logging/src/measurement_logging/core.py
This commit is contained in:
parent
39069fbd95
commit
002c4a80a7
1 changed files with 81 additions and 0 deletions
81
measurement_logging/src/measurement_logging/core.py
Normal file
81
measurement_logging/src/measurement_logging/core.py
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
import logging
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Measurement:
|
||||
"""Dataclass zur strukturierten Repräsentation einer Messung."""
|
||||
time: str
|
||||
temperature: float
|
||||
humidity: float
|
||||
iso: int
|
||||
exposure: float
|
||||
rms_noise: float
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict[str, Any]) -> 'Measurement':
|
||||
required_fields = {"time", "temperature", "humidity", "iso", "exposure", "rms_noise"}
|
||||
missing = required_fields - set(data.keys())
|
||||
if missing:
|
||||
raise ValueError(f"Fehlende Felder in Messdaten: {', '.join(sorted(missing))}")
|
||||
|
||||
try:
|
||||
return cls(
|
||||
time=str(data["time"]),
|
||||
temperature=float(data["temperature"]),
|
||||
humidity=float(data["humidity"]),
|
||||
iso=int(data["iso"]),
|
||||
exposure=float(data["exposure"]),
|
||||
rms_noise=float(data["rms_noise"])
|
||||
)
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError(f"Ungültige Datentypen in Messdaten: {e}") from e
|
||||
|
||||
|
||||
def log_measurement(time: str, temperature: float, humidity: float, iso: int, exposure: float, rms_noise: float, output_path: str | Path = "output/measurements.csv") -> None:
|
||||
"""Protokolliert eine Messung in einer CSV-Datei.
|
||||
|
||||
Schreibt bei Bedarf den Header, validiert Eingabewerte und fügt eine Datenzeile an.
|
||||
"""
|
||||
measurement = Measurement.from_dict({
|
||||
"time": time,
|
||||
"temperature": temperature,
|
||||
"humidity": humidity,
|
||||
"iso": iso,
|
||||
"exposure": exposure,
|
||||
"rms_noise": rms_noise
|
||||
})
|
||||
|
||||
output_file = Path(output_path)
|
||||
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
header = ["time", "temperature", "humidity", "iso", "exposure", "rms_noise"]
|
||||
write_header = not output_file.exists() or output_file.stat().st_size == 0
|
||||
|
||||
try:
|
||||
with output_file.open(mode='a', newline='') as csvfile:
|
||||
writer = csv.writer(csvfile)
|
||||
if write_header:
|
||||
writer.writerow(header)
|
||||
logger.debug("CSV-Header geschrieben.")
|
||||
writer.writerow([
|
||||
measurement.time,
|
||||
f"{measurement.temperature:.2f}",
|
||||
f"{measurement.humidity:.2f}",
|
||||
measurement.iso,
|
||||
f"{measurement.exposure:.4f}",
|
||||
f"{measurement.rms_noise:.4f}"
|
||||
])
|
||||
logger.info(f"Messung erfolgreich protokolliert: {output_file}")
|
||||
except OSError as e:
|
||||
logger.error(f"Fehler beim Schreiben in CSV: {e}")
|
||||
raise
|
||||
Loading…
Reference in a new issue