Add data_logging/src/data_logging/cli.py

This commit is contained in:
Mika 2026-05-03 02:07:39 +00:00
parent ab14a72124
commit ae231f852d

View file

@ -0,0 +1,70 @@
import argparse
import json
from pathlib import Path
import sys
from data_logging.core import export_to_csv, LogEntry
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
'CLI-Schnittstelle zum Exportieren von Umweltsensordaten '
'der Donau in CSV-Format.'
)
)
parser.add_argument('--input', required=True, help='Pfad zur Input-JSON-Datei mit Logdaten.')
parser.add_argument('--output', required=False, help='Pfad zum Ziel-CSV-Export.')
return parser.parse_args()
def _load_json_entries(input_path: Path) -> list[LogEntry]:
if not input_path.exists():
raise FileNotFoundError(f'Input-Datei nicht gefunden: {input_path}')
try:
with input_path.open('r', encoding='utf-8') as f:
data = json.load(f)
except json.JSONDecodeError as exc:
raise ValueError(f'Ungültige JSON-Datei: {input_path}') from exc
if not isinstance(data, list):
raise TypeError('JSON-Root muss eine Liste von Objekten sein.')
entries: list[LogEntry] = []
for i, item in enumerate(data, start=1):
if not isinstance(item, dict):
raise TypeError(f'Eintrag {i} ist kein Objekt.')
try:
entry = LogEntry(
timestamp=item['timestamp'],
water_level=float(item['water_level']),
ground_vibration=float(item['ground_vibration']),
ai_label=str(item['ai_label'])
)
entries.append(entry)
except (KeyError, ValueError, TypeError) as exc:
raise ValueError(f'Ungültiger Eintrag an Position {i}: {exc}') from exc
return entries
def main() -> None:
args = _parse_args()
input_path = Path(args.input)
output_path = Path(args.output) if args.output else Path('output/environment_log.csv')
try:
log_entries = _load_json_entries(input_path)
csv_path = export_to_csv(log_entries)
# export_to_csv kann eigenen Pfad zurückgeben, bei Bedarf überschreiben:
if csv_path != str(output_path):
# Wenn export_to_csv seinen eigenen Speicherort festlegt, ist das okay.
pass
print(f'Export erfolgreich: {csv_path}')
except Exception as exc:
print(f'Fehler beim Exportieren: {exc}', file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()