"""Command-line entry point for the latency outlier analysis.

Reads a CSV file of log records, hands the rows to
``outlier_analysis.core.analyze_outliers`` and writes the returned summary
to a JSON file.
"""
# Origin: commit c55c7dfa8f1dfb94dd725d9a39a86a833200fde5 (Mika, 2026-03-12),
# "Add outlier_analysis/src/outlier_analysis/cli.py".

import argparse
import json
import logging
from pathlib import Path
from typing import List, Dict, Any

import pandas as pd

from outlier_analysis.core import analyze_outliers


# NOTE: this runs at import time, so importing the module configures the
# root logger as a side effect.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)


def _load_csv(file_path: Path) -> List[Dict[str, Any]]:
    """Read a CSV file into a list of row dicts after validating its columns.

    Args:
        file_path: Location of the CSV file to read.

    Returns:
        One dict per CSV row, keyed by column name.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If one or more required columns are absent.
    """
    if not file_path.exists():
        raise FileNotFoundError(f"Input file not found: {file_path}")

    df = pd.read_csv(file_path)
    required_cols = {"run_id", "latency_ms", "stratum", "job_parallelism", "retry_total_overhead_ms"}

    missing = required_cols - set(df.columns)
    if missing:
        # Sort the names: set iteration order is arbitrary, and a stable
        # message is easier to grep for and to compare between runs.
        raise ValueError(f"Missing required columns in input CSV: {', '.join(sorted(missing))}")

    return df.to_dict(orient="records")


def _save_json(data: Dict[str, Any], file_path: Path) -> None:
    """Write ``data`` to ``file_path`` as indented UTF-8 JSON and log the path.

    Missing parent directories of ``file_path`` are created first.

    Args:
        data: Mapping to serialise.
        file_path: Destination of the JSON file.
    """
    file_path.parent.mkdir(parents=True, exist_ok=True)
    with file_path.open('w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    logging.info("Wrote outlier summary to %s", file_path)


def main() -> None:
    """Parse the CLI arguments, run the outlier analysis and save the result.

    Any exception raised while loading, analysing or saving is logged and
    turned into exit status 1.

    Raises:
        SystemExit: With code 1 when any step of the analysis fails.
    """
    parser = argparse.ArgumentParser(description="Analyse von Latenz-Outliern in Logdaten.")
    parser.add_argument('--input', required=True, help='Pfad zur CSV-Datei mit Logdaten.')
    parser.add_argument('--output', required=True, help='Pfad zur Ausgabe der Outlier-Zusammenfassung im JSON-Format.')
    args = parser.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)

    try:
        logging.info("Loading log data from %s", input_path)
        log_data = _load_csv(input_path)

        logging.info("Running outlier analysis...")
        summary = analyze_outliers(log_data)

        # Explicit check rather than `assert`: assertions are removed when
        # Python runs with -O, which would let a non-dict result through.
        if not isinstance(summary, dict):
            raise TypeError("analyze_outliers() must return a dict")
        _save_json(summary, output_path)

        logging.info("Analysis complete.")
    except Exception as e:
        # Top-level boundary: report the failure and exit non-zero, keeping
        # the original exception attached as the cause.
        logging.error("Error during analysis: %s", e)
        raise SystemExit(1) from e


if __name__ == '__main__':
    main()