Add data_analysis/src/data_analysis/cli.py

Mika 2026-03-08 03:07:07 +00:00
parent 90e5b03840
commit 5466a06f4e


@@ -0,0 +1,95 @@
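"""Command-line interface for the Donau-Rover sensor data analysis.

Loads rover log entries from a JSON input file, runs the analysis from
data_analysis.core, and writes the detected significant patterns and anomaly
events to a JSON output file.
"""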
import argparse
import json
import logging
import sys
from pathlib import Path
from typing import List

import pandas as pd  # third-party dependency

from data_analysis.core import analyze_data, AnalysisResult
from data_analysis.io_utils import LogEntry

logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Analyze Donau-Rover sensor data for patterns and anomalies."
    )
    parser.add_argument(
        "--input",
        required=True,
        help="Path to the JSON file containing rover log data."
    )
    parser.add_argument(
        "--output",
        required=False,
        help="Path to the output file for the analysis results."
    )
    return parser.parse_args()


def load_log_entries(input_path: Path) -> List[LogEntry]:
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")
    try:
        df = pd.read_json(input_path)
    except ValueError as e:
        raise ValueError(f"Failed to read JSON: {e}") from e
    required_columns = {"t", "Lx", "dB", "Temp", "Inference"}
    if not required_columns.issubset(df.columns):
        missing = required_columns - set(df.columns)
        raise ValueError(f"Missing fields in input file: {missing}")
    entries: List[LogEntry] = []
    for _, row in df.iterrows():
        try:
            entry = LogEntry(
                timestamp=row["t"],
                luminosity=int(row["Lx"]),
                sound_level=float(row["dB"]),
                temperature=float(row["Temp"]),
                inference=float(row["Inference"])
            )
            entries.append(entry)
        except (TypeError, ValueError) as e:
            logger.warning(f"Skipping invalid record: {e}")
    if not entries:
        raise ValueError("No valid log entries loaded.")
    return entries
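
# Note: load_log_entries expects every record to provide the fields
# "t", "Lx", "dB", "Temp" and "Inference". Illustrative (made-up) record:
#   {"t": "2026-03-08T03:07:07Z", "Lx": 412, "dB": 55.2, "Temp": 3.4, "Inference": 0.87}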


def save_analysis_result(result: AnalysisResult, output_path: Path) -> None:
    output_path.parent.mkdir(parents=True, exist_ok=True)
    result_dict = {
        "significant_patterns": result.significant_patterns,
        "anomaly_events": result.anomaly_events,
    }
    with output_path.open("w", encoding="utf-8") as f:
        json.dump(result_dict, f, indent=2, ensure_ascii=False)
    logger.info(f"Analysis results saved to: {output_path}")


def main() -> None:
    args = parse_args()
    input_path = Path(args.input)
    output_path = Path(args.output) if args.output else Path("output/analysis_result.json")
    try:
        log_entries = load_log_entries(input_path)
        result = analyze_data(log_entries)
        assert isinstance(result, AnalysisResult), "Analysis result is invalid."
        save_analysis_result(result, output_path)
    except Exception as e:
        logger.error(f"Error during analysis: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
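
# Example invocation (the input path here is only illustrative; --output is
# optional and defaults to output/analysis_result.json):
#   python -m data_analysis.cli --input logs/rover_log.json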