Add data_analysis/src/data_analysis/cli.py
This commit is contained in:
parent
da5c79a470
commit
3348b9050d
1 changed files with 76 additions and 0 deletions
76
data_analysis/src/data_analysis/cli.py
Normal file
76
data_analysis/src/data_analysis/cli.py
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
import argparse
|
||||
import json
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Any, List, Dict
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from data_analysis.core import analyze_runs
|
||||
|
||||
|
||||
def _parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Analyisiert Run-Daten und erkennt Δt<0-Fälle."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--input",
|
||||
required=True,
|
||||
help="Pfad zur Eingabedatei mit Run-Daten im JSON-Format."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
required=False,
|
||||
default="output/analysis_results.json",
|
||||
help="Pfad zur Ausgabedatei der Analyseergebnisse (JSON)."
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def _load_run_data(input_path: Path) -> List[Dict[str, Any]]:
|
||||
if not input_path.exists():
|
||||
raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_path}")
|
||||
with input_path.open("r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
if not isinstance(data, list):
|
||||
raise ValueError("Eingabedatei muss eine Liste von Run-Datensätzen enthalten.")
|
||||
# Grundvalidierung der benötigten Felder
|
||||
required_fields = {"run_id", "timestamp", "delta_t", "expiring_at"}
|
||||
for i, entry in enumerate(data):
|
||||
missing = required_fields - entry.keys()
|
||||
if missing:
|
||||
raise ValueError(f"Fehlende Felder {missing} in Eintrag {i}.")
|
||||
# Konvertierung von Datumsstrings zu datetime
|
||||
for entry in data:
|
||||
try:
|
||||
entry["timestamp"] = pd.to_datetime(entry["timestamp"], errors="raise").to_pydatetime()
|
||||
entry["expiring_at"] = pd.to_datetime(entry["expiring_at"], errors="raise").to_pydatetime()
|
||||
entry["delta_t"] = float(entry["delta_t"])
|
||||
except Exception as e:
|
||||
raise ValueError(f"Ungültige Daten im Eintrag {entry.get('run_id')}: {e}")
|
||||
return data
|
||||
|
||||
|
||||
def _write_results(output_path: Path, results: Dict[str, Any]) -> None:
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with output_path.open("w", encoding="utf-8") as f:
|
||||
json.dump(results, f, ensure_ascii=False, indent=2, default=str)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = _parse_args()
|
||||
input_path = Path(args.input)
|
||||
output_path = Path(args.output)
|
||||
|
||||
run_data = _load_run_data(input_path)
|
||||
results = analyze_runs(run_data)
|
||||
|
||||
if not isinstance(results, dict):
|
||||
raise TypeError("analyze_runs muss ein Dict zurückgeben.")
|
||||
|
||||
_write_results(output_path, results)
|
||||
print(f"Analyse abgeschlossen. Ergebnisse gespeichert unter: {output_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in a new issue