Add data_logging/src/data_logging/core.py

This commit is contained in:
Mika 2026-04-26 02:07:50 +00:00
parent 1c7c76ed6f
commit 0f973457a4

View file

@ -0,0 +1,81 @@
import json
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any
from datetime import datetime
import logging
__all__ = ["LogEntry", "log_wifi_data"]
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
@dataclass
class LogEntry:
"""Repräsentiert einen WLAN-Messdatensatz für Logging-Zwecke."""
timestamp: str
frequency: float
signal_strength: float
def validate(self) -> None:
"""Validiert die Felder des LogEntries."""
try:
datetime.fromisoformat(self.timestamp)
except ValueError as e:
raise ValueError(f"Ungültiger Zeitstempel: {self.timestamp}") from e
if not isinstance(self.frequency, (float, int)):
raise TypeError("frequency muss float oder int sein")
if not isinstance(self.signal_strength, (float, int)):
raise TypeError("signal_strength muss float oder int sein")
def log_wifi_data(timestamp: str, frequency: float, signal_strength: float, output_path: str = "output/wifi_log.json") -> None:
"""Protokolliert einzelne WLAN-Signalmessdaten in einer JSON-Datei.
Args:
timestamp: Zeitstempel im ISO 8601-Format.
frequency: Frequenz in GHz.
signal_strength: Signalstärke in dBm.
output_path: Pfad zur Logdatei.
"""
logging.debug("Initialisiere Logeintrag für WLAN-Daten.")
entry = LogEntry(timestamp=timestamp, frequency=frequency, signal_strength=signal_strength)
entry.validate()
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
# Bestehende Daten laden, falls vorhanden
if path.exists():
try:
with path.open("r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
logging.warning("Unerwartetes Format in Logdatei, erstelle neue Liste.")
data = []
except (json.JSONDecodeError, OSError) as e:
logging.error(f"Fehler beim Lesen der Logdatei: {e}")
data = []
else:
data = []
data.append(asdict(entry))
# Validierung der Ausgabe
assert all(set(LogEntry.__annotations__.keys()) <= set(d.keys()) for d in data), "Logdatei enthält ungültige Felder"
try:
with path.open("w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
logging.info(f"Eintrag erfolgreich protokolliert: {path}")
except OSError as e:
logging.error(f"Fehler beim Schreiben in die Datei: {e}")
raise