Add data_analysis/src/data_analysis/cli.py

This commit is contained in:
Mika 2026-02-15 03:06:26 +00:00
parent eb97474cdb
commit 40da0f348c

View file

@ -0,0 +1,54 @@
import argparse
import json
from pathlib import Path
import pandas as pd
from typing import List
from data_analysis.core import analyze_data
def main() -> None:
"""Command-line interface for the Donau Strommessung data analysis."""
parser = argparse.ArgumentParser(
description="Analysiert Sensordaten und erstellt einen zusammenfassenden Bericht.",
)
parser.add_argument(
"--input",
required=True,
help="Pfad zur Eingabe-CSV-Datei mit Messdaten.",
)
parser.add_argument(
"--output",
required=True,
help="Pfad zur Ausgabe-JSON-Datei für Zusammenfassungen.",
)
args = parser.parse_args()
input_path = Path(args.input)
output_path = Path(args.output)
assert input_path.exists(), f"Eingabedatei nicht gefunden: {input_path}"
assert input_path.suffix == ".csv", "Eingabedatei muss eine CSV-Datei sein."
# Load sensor data from CSV into SensorData list
df = pd.read_csv(input_path)
required_columns = {"timestamp", "voltage_mv", "temperature_c", "humidity_percent"}
missing = required_columns - set(df.columns)
assert not missing, f"Fehlende Spalten in der Eingabedatei: {missing}"
sensor_data: List[dict] = df.to_dict(orient="records")
# Analyze data using the core function
summary_report = analyze_data(sensor_data)
# Validate output type
assert isinstance(summary_report, dict), "analyze_data muss ein Dictionary (SummaryReport) zurückgeben."
# Write JSON output
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(summary_report, f, ensure_ascii=False, indent=2)
if __name__ == "__main__":
main()