From a273c91c59b535bb203dc345fd0b388a19f03782 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 8 Feb 2026 03:06:29 +0000 Subject: [PATCH] Add data_logger/src/data_logger/core.py --- data_logger/src/data_logger/core.py | 59 +++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 data_logger/src/data_logger/core.py diff --git a/data_logger/src/data_logger/core.py b/data_logger/src/data_logger/core.py new file mode 100644 index 0000000..68f4728 --- /dev/null +++ b/data_logger/src/data_logger/core.py @@ -0,0 +1,59 @@ +from __future__ import annotations +import csv +from dataclasses import dataclass, asdict +from datetime import datetime +from pathlib import Path +from typing import List + + +@dataclass +class SensorData: + """Repräsentiert eine einzelne Sensormessung mit Zeitstempel.""" + + timestamp: datetime + temperature: float + vibration_amplitude: float + humidity: float + + def __post_init__(self) -> None: + # Input-Validierung zur Sicherstellung von Datentypen und sinnvoller Werte. + if not isinstance(self.timestamp, datetime): + raise TypeError("timestamp muss ein datetime-Objekt sein.") + for field_name in ("temperature", "vibration_amplitude", "humidity"): + value = getattr(self, field_name) + if not isinstance(value, (float, int)): + raise TypeError(f"{field_name} muss numerisch sein.") + assert -100.0 <= self.temperature <= 200.0, "Temperatur außerhalb des zulässigen Bereichs." + assert 0.0 <= self.humidity <= 100.0, "Luftfeuchtigkeit muss zwischen 0 und 100 liegen." + + +def log_sensor_data(sensor_values: List[SensorData]) -> str: + """Speichert die übergebenen Sensordaten in einer CSV-Datei. + + Args: + sensor_values: Liste von SensorData-Objekten, die geschrieben werden sollen. + + Returns: + Pfad zur erzeugten CSV-Datei als String. + """ + if not isinstance(sensor_values, list) or not all(isinstance(v, SensorData) for v in sensor_values): + raise TypeError("sensor_values muss eine Liste von SensorData-Objekten sein.") + + if not sensor_values: + raise ValueError("sensor_values darf nicht leer sein.") + + timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S") + output_path = Path(f"output/logged_data_{timestamp_str}.csv") + output_path.parent.mkdir(parents=True, exist_ok=True) + + fieldnames = ["timestamp", "temperature", "vibration_amplitude", "humidity"] + + with output_path.open(mode="w", newline="", encoding="utf-8") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + for entry in sensor_values: + record = asdict(entry) + record["timestamp"] = entry.timestamp.isoformat() + writer.writerow(record) + + return str(output_path) \ No newline at end of file