import argparse
import json
import sys
from pathlib import Path
import logging
from typing import List

import pandas as pd  # third party dependency

from data_analysis.core import analyze_data, AnalysisResult
from data_analysis.io_utils import LogEntry

logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)


def parse_args() -> argparse.Namespace:
    """Parse CLI arguments for the Donau rover sensor-data analysis.

    Returns:
        argparse.Namespace with ``input`` (required path to a JSON log file)
        and ``output`` (optional path for the analysis result file).
    """
    parser = argparse.ArgumentParser(
        description="Analyse der Donau-Rover Sensordaten für Muster und Anomalien."
    )
    parser.add_argument(
        "--input",
        required=True,
        help="Pfad zur JSON-Datei mit Rover-Logdaten."
    )
    parser.add_argument(
        "--output",
        required=False,
        help="Pfad zur Zielausgabedatei für die Analyseergebnisse."
    )
    return parser.parse_args()


def load_log_entries(input_path: Path) -> List[LogEntry]:
    """Load rover log entries from a JSON file into ``LogEntry`` objects.

    Rows with values that cannot be coerced to the expected numeric types
    are skipped with a warning rather than aborting the whole load.

    Args:
        input_path: Path to a JSON file readable by ``pandas.read_json``
            containing the columns ``t``, ``Lx``, ``dB``, ``Temp`` and
            ``Inference``.

    Returns:
        A non-empty list of ``LogEntry`` instances.

    Raises:
        FileNotFoundError: If ``input_path`` does not exist.
        ValueError: If the JSON cannot be parsed, required columns are
            missing, or no row could be converted into a valid entry.
    """
    if not input_path.exists():
        raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_path}")

    try:
        df = pd.read_json(input_path)
    except ValueError as e:
        # Chain the original parser error so the root cause stays visible.
        raise ValueError(f"Fehler beim Einlesen von JSON: {e}") from e

    required_columns = {"t", "Lx", "dB", "Temp", "Inference"}
    if not required_columns.issubset(df.columns):
        missing = required_columns - set(df.columns)
        raise ValueError(f"Fehlende Felder in Eingabedatei: {missing}")

    entries: List[LogEntry] = []
    # to_dict("records") avoids per-row Series construction of iterrows().
    for row in df.to_dict("records"):
        try:
            entries.append(
                LogEntry(
                    timestamp=row["t"],
                    luminosity=int(row["Lx"]),
                    sound_level=float(row["dB"]),
                    temperature=float(row["Temp"]),
                    inference=float(row["Inference"]),
                )
            )
        except (TypeError, ValueError) as e:
            # Lazy %-style args: message is only formatted if the record is emitted.
            logger.warning("Überspringe ungültigen Datensatz: %s", e)
    if not entries:
        raise ValueError("Keine gültigen LogEinträge geladen.")
    return entries
def save_analysis_result(result: AnalysisResult, output_path: Path) -> None:
    """Write the analysis result to ``output_path`` as UTF-8 JSON.

    Creates any missing parent directories. Only the
    ``significant_patterns`` and ``anomaly_events`` fields are serialized.

    Args:
        result: The analysis result to persist.
        output_path: Destination file path for the JSON document.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    result_dict = {
        "significant_patterns": result.significant_patterns,
        "anomaly_events": result.anomaly_events,
    }
    with output_path.open("w", encoding="utf-8") as f:
        # ensure_ascii=False keeps non-ASCII (German) text readable in the file.
        json.dump(result_dict, f, indent=2, ensure_ascii=False)
    logger.info("Analyseergebnisse gespeichert unter: %s", output_path)


def main() -> None:
    """CLI entry point: load log entries, run the analysis, save the result.

    Exits with status 1 on any failure; the default output path is
    ``output/analysis_result.json`` when ``--output`` is not given.
    """
    args = parse_args()
    input_path = Path(args.input)
    output_path = Path(args.output) if args.output else Path("output/analysis_result.json")

    try:
        log_entries = load_log_entries(input_path)
        result = analyze_data(log_entries)

        # Explicit runtime check instead of `assert`, which is stripped
        # when Python runs with -O and would silently skip the validation.
        if not isinstance(result, AnalysisResult):
            raise TypeError("Analyseergebnis ist ungültig.")
        save_analysis_result(result, output_path)

    except Exception as e:
        # logger.exception records the full traceback, not just the message.
        logger.exception("Fehler während der Analyse: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()