Add data_logging/src/data_logging/main.py

This commit is contained in:
Mika 2026-05-10 02:07:43 +00:00
parent 51149591eb
commit 4f7dc84912

View file

@ -0,0 +1,96 @@
import argparse
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict
import os
class LogEntry:
"""Repräsentiert einen einzelnen Telemetrie-Logeintrag."""
def __init__(self, timestamp: datetime, temperature: float, wind_speed: float, error_rate: float) -> None:
if not isinstance(timestamp, datetime):
raise TypeError("timestamp muss datetime sein")
if not all(isinstance(v, (float, int)) for v in [temperature, wind_speed, error_rate]):
raise TypeError("temperature, wind_speed und error_rate müssen numerisch sein")
self.timestamp = timestamp
self.temperature = float(temperature)
self.wind_speed = float(wind_speed)
self.error_rate = float(error_rate)
def to_json(self) -> Dict[str, Any]:
"""Gibt die Struktur als JSON-kompatibles Dict zurück."""
return {
"timestamp": self.timestamp.isoformat(),
"temperature": self.temperature,
"wind_speed": self.wind_speed,
"error_rate": self.error_rate,
}
def log_data(data: Dict[str, Any]) -> bool:
"""Protokolliert übergebene Sensordatenobjekte in eine Logdatei."""
if not isinstance(data, dict):
raise TypeError("data muss ein dict sein")
required = {"timestamp", "temperature", "wind_speed", "error_rate"}
if not required.issubset(data):
raise ValueError(f"Fehlende Felder: {required - set(data.keys())}")
entry = LogEntry(
timestamp=datetime.fromisoformat(data["timestamp"]),
temperature=float(data["temperature"]),
wind_speed=float(data["wind_speed"]),
error_rate=float(data["error_rate"]),
)
output_path = data.get("_output_path")
if not output_path:
raise ValueError("_output_path muss angegeben werden")
output = Path(output_path)
output.parent.mkdir(parents=True, exist_ok=True)
try:
if output.exists():
with output.open("r", encoding="utf-8") as f:
logs = json.load(f)
if not isinstance(logs, list):
logs = []
else:
logs = []
logs.append(entry.to_json())
with output.open("w", encoding="utf-8") as f:
json.dump(logs, f, indent=2, ensure_ascii=False)
return True
except (OSError, json.JSONDecodeError) as e:
print(f"Fehler beim Schreiben der Logdatei: {e}")
return False
def _main() -> None:
parser = argparse.ArgumentParser(description="Robot Night Telemetry Logger")
parser.add_argument("--input", required=True, help="Pfad zur Eingabe-JSON-Datei mit Sensordaten.")
parser.add_argument("--output", required=True, help="Zielpfad für die Protokollausgabe.")
args = parser.parse_args()
input_path = Path(args.input)
if not input_path.exists():
raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_path}")
with input_path.open("r", encoding="utf-8") as f:
try:
telemetry_data = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Ungültige Eingabedatei: {e}")
if isinstance(telemetry_data, dict):
telemetry_data = [telemetry_data]
for record in telemetry_data:
if isinstance(record, dict):
record["_output_path"] = args.output
log_data(record)
if __name__ == "__main__":
_main()