diff --git a/data_logging/src/data_logging/main.py b/data_logging/src/data_logging/main.py new file mode 100644 index 0000000..56a85e0 --- /dev/null +++ b/data_logging/src/data_logging/main.py @@ -0,0 +1,96 @@ +import argparse +import json +from datetime import datetime +from pathlib import Path +from typing import Any, Dict +import os + + +class LogEntry: + """Repräsentiert einen einzelnen Telemetrie-Logeintrag.""" + + def __init__(self, timestamp: datetime, temperature: float, wind_speed: float, error_rate: float) -> None: + if not isinstance(timestamp, datetime): + raise TypeError("timestamp muss datetime sein") + if not all(isinstance(v, (float, int)) for v in [temperature, wind_speed, error_rate]): + raise TypeError("temperature, wind_speed und error_rate müssen numerisch sein") + self.timestamp = timestamp + self.temperature = float(temperature) + self.wind_speed = float(wind_speed) + self.error_rate = float(error_rate) + + def to_json(self) -> Dict[str, Any]: + """Gibt die Struktur als JSON-kompatibles Dict zurück.""" + return { + "timestamp": self.timestamp.isoformat(), + "temperature": self.temperature, + "wind_speed": self.wind_speed, + "error_rate": self.error_rate, + } + + +def log_data(data: Dict[str, Any]) -> bool: + """Protokolliert übergebene Sensordatenobjekte in eine Logdatei.""" + if not isinstance(data, dict): + raise TypeError("data muss ein dict sein") + required = {"timestamp", "temperature", "wind_speed", "error_rate"} + if not required.issubset(data): + raise ValueError(f"Fehlende Felder: {required - set(data.keys())}") + + entry = LogEntry( + timestamp=datetime.fromisoformat(data["timestamp"]), + temperature=float(data["temperature"]), + wind_speed=float(data["wind_speed"]), + error_rate=float(data["error_rate"]), + ) + + output_path = data.get("_output_path") + if not output_path: + raise ValueError("_output_path muss angegeben werden") + output = Path(output_path) + output.parent.mkdir(parents=True, exist_ok=True) + + try: + if output.exists(): + with output.open("r", encoding="utf-8") as f: + logs = json.load(f) + if not isinstance(logs, list): + logs = [] + else: + logs = [] + logs.append(entry.to_json()) + with output.open("w", encoding="utf-8") as f: + json.dump(logs, f, indent=2, ensure_ascii=False) + return True + except (OSError, json.JSONDecodeError) as e: + print(f"Fehler beim Schreiben der Logdatei: {e}") + return False + + +def _main() -> None: + parser = argparse.ArgumentParser(description="Robot Night Telemetry Logger") + parser.add_argument("--input", required=True, help="Pfad zur Eingabe-JSON-Datei mit Sensordaten.") + parser.add_argument("--output", required=True, help="Zielpfad für die Protokollausgabe.") + args = parser.parse_args() + + input_path = Path(args.input) + if not input_path.exists(): + raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_path}") + + with input_path.open("r", encoding="utf-8") as f: + try: + telemetry_data = json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Ungültige Eingabedatei: {e}") + + if isinstance(telemetry_data, dict): + telemetry_data = [telemetry_data] + + for record in telemetry_data: + if isinstance(record, dict): + record["_output_path"] = args.output + log_data(record) + + +if __name__ == "__main__": + _main() \ No newline at end of file