From 40da0f348cb8743532634092da0e30f2e16f0194 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 15 Feb 2026 03:06:26 +0000 Subject: [PATCH] Add data_analysis/src/data_analysis/cli.py --- data_analysis/src/data_analysis/cli.py | 54 ++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 data_analysis/src/data_analysis/cli.py diff --git a/data_analysis/src/data_analysis/cli.py b/data_analysis/src/data_analysis/cli.py new file mode 100644 index 0000000..647c5d9 --- /dev/null +++ b/data_analysis/src/data_analysis/cli.py @@ -0,0 +1,54 @@ +import argparse +import json +from pathlib import Path +import pandas as pd +from typing import List + +from data_analysis.core import analyze_data + + +def main() -> None: + """Command-line interface for the Donau Strommessung data analysis.""" + parser = argparse.ArgumentParser( + description="Analysiert Sensordaten und erstellt einen zusammenfassenden Bericht.", + ) + parser.add_argument( + "--input", + required=True, + help="Pfad zur Eingabe-CSV-Datei mit Messdaten.", + ) + parser.add_argument( + "--output", + required=True, + help="Pfad zur Ausgabe-JSON-Datei für Zusammenfassungen.", + ) + + args = parser.parse_args() + input_path = Path(args.input) + output_path = Path(args.output) + + assert input_path.exists(), f"Eingabedatei nicht gefunden: {input_path}" + assert input_path.suffix == ".csv", "Eingabedatei muss eine CSV-Datei sein." + + # Load sensor data from CSV into SensorData list + df = pd.read_csv(input_path) + required_columns = {"timestamp", "voltage_mv", "temperature_c", "humidity_percent"} + missing = required_columns - set(df.columns) + assert not missing, f"Fehlende Spalten in der Eingabedatei: {missing}" + + sensor_data: List[dict] = df.to_dict(orient="records") + + # Analyze data using the core function + summary_report = analyze_data(sensor_data) + + # Validate output type + assert isinstance(summary_report, dict), "analyze_data muss ein Dictionary (SummaryReport) zurückgeben." + + # Write JSON output + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + json.dump(summary_report, f, ensure_ascii=False, indent=2) + + +if __name__ == "__main__": + main()