From ae231f852d74bed523341976708bb525565e1c91 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 3 May 2026 02:07:39 +0000 Subject: [PATCH] Add data_logging/src/data_logging/cli.py --- data_logging/src/data_logging/cli.py | 70 ++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 data_logging/src/data_logging/cli.py diff --git a/data_logging/src/data_logging/cli.py b/data_logging/src/data_logging/cli.py new file mode 100644 index 0000000..9e246ab --- /dev/null +++ b/data_logging/src/data_logging/cli.py @@ -0,0 +1,70 @@ +import argparse +import json +from pathlib import Path +import sys + +from data_logging.core import export_to_csv, LogEntry + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + 'CLI-Schnittstelle zum Exportieren von Umweltsensordaten ' + 'der Donau in CSV-Format.' + ) + ) + parser.add_argument('--input', required=True, help='Pfad zur Input-JSON-Datei mit Logdaten.') + parser.add_argument('--output', required=False, help='Pfad zum Ziel-CSV-Export.') + return parser.parse_args() + + +def _load_json_entries(input_path: Path) -> list[LogEntry]: + if not input_path.exists(): + raise FileNotFoundError(f'Input-Datei nicht gefunden: {input_path}') + try: + with input_path.open('r', encoding='utf-8') as f: + data = json.load(f) + except json.JSONDecodeError as exc: + raise ValueError(f'Ungültige JSON-Datei: {input_path}') from exc + + if not isinstance(data, list): + raise TypeError('JSON-Root muss eine Liste von Objekten sein.') + + entries: list[LogEntry] = [] + for i, item in enumerate(data, start=1): + if not isinstance(item, dict): + raise TypeError(f'Eintrag {i} ist kein Objekt.') + try: + entry = LogEntry( + timestamp=item['timestamp'], + water_level=float(item['water_level']), + ground_vibration=float(item['ground_vibration']), + ai_label=str(item['ai_label']) + ) + entries.append(entry) + except (KeyError, ValueError, TypeError) as exc: + raise ValueError(f'Ungültiger Eintrag an Position {i}: {exc}') from exc + + return entries + + +def main() -> None: + args = _parse_args() + input_path = Path(args.input) + output_path = Path(args.output) if args.output else Path('output/environment_log.csv') + + try: + log_entries = _load_json_entries(input_path) + csv_path = export_to_csv(log_entries) + # export_to_csv kann eigenen Pfad zurückgeben, bei Bedarf überschreiben: + if csv_path != str(output_path): + # Wenn export_to_csv seinen eigenen Speicherort festlegt, ist das okay. + pass + print(f'Export erfolgreich: {csv_path}') + except Exception as exc: + print(f'Fehler beim Exportieren: {exc}', file=sys.stderr) + sys.exit(1) + + +if __name__ == '__main__': + main()