From 0f973457a414b4541bc8f6a5175b1be399c3e06b Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 26 Apr 2026 02:07:50 +0000 Subject: [PATCH] Add data_logging/src/data_logging/core.py --- data_logging/src/data_logging/core.py | 81 +++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 data_logging/src/data_logging/core.py diff --git a/data_logging/src/data_logging/core.py b/data_logging/src/data_logging/core.py new file mode 100644 index 0000000..fe00440 --- /dev/null +++ b/data_logging/src/data_logging/core.py @@ -0,0 +1,81 @@ +import json +from dataclasses import dataclass, asdict +from pathlib import Path +from typing import Any +from datetime import datetime +import logging + + +__all__ = ["LogEntry", "log_wifi_data"] + + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", +) + + +@dataclass +class LogEntry: + """Repräsentiert einen WLAN-Messdatensatz für Logging-Zwecke.""" + timestamp: str + frequency: float + signal_strength: float + + def validate(self) -> None: + """Validiert die Felder des LogEntries.""" + try: + datetime.fromisoformat(self.timestamp) + except ValueError as e: + raise ValueError(f"Ungültiger Zeitstempel: {self.timestamp}") from e + + if not isinstance(self.frequency, (float, int)): + raise TypeError("frequency muss float oder int sein") + if not isinstance(self.signal_strength, (float, int)): + raise TypeError("signal_strength muss float oder int sein") + + +def log_wifi_data(timestamp: str, frequency: float, signal_strength: float, output_path: str = "output/wifi_log.json") -> None: + """Protokolliert einzelne WLAN-Signalmessdaten in einer JSON-Datei. + + Args: + timestamp: Zeitstempel im ISO 8601-Format. + frequency: Frequenz in GHz. + signal_strength: Signalstärke in dBm. + output_path: Pfad zur Logdatei. + """ + + logging.debug("Initialisiere Logeintrag für WLAN-Daten.") + + entry = LogEntry(timestamp=timestamp, frequency=frequency, signal_strength=signal_strength) + entry.validate() + + path = Path(output_path) + path.parent.mkdir(parents=True, exist_ok=True) + + # Bestehende Daten laden, falls vorhanden + if path.exists(): + try: + with path.open("r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data, list): + logging.warning("Unerwartetes Format in Logdatei, erstelle neue Liste.") + data = [] + except (json.JSONDecodeError, OSError) as e: + logging.error(f"Fehler beim Lesen der Logdatei: {e}") + data = [] + else: + data = [] + + data.append(asdict(entry)) + + # Validierung der Ausgabe + assert all(set(LogEntry.__annotations__.keys()) <= set(d.keys()) for d in data), "Logdatei enthält ungültige Felder" + + try: + with path.open("w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + logging.info(f"Eintrag erfolgreich protokolliert: {path}") + except OSError as e: + logging.error(f"Fehler beim Schreiben in die Datei: {e}") + raise \ No newline at end of file