"""CLI entry point: runs the max-outlier analysis over CI load-test CSV data.

Reads a CSV of per-job latency records, validates its schema, delegates the
analysis to ``analyze_max_outliers`` and writes the result summary as JSON.
"""

import argparse
import json
import logging
from pathlib import Path
from typing import Any, Dict, List

import pandas as pd

from max_outlier_analysis_script.core import analyze_max_outliers


logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s:%(name)s: %(message)s'
)
logger = logging.getLogger(__name__)


def _read_csv_input(path: Path) -> List[Dict[str, Any]]:
    """Read the CSV input file and validate its fields.

    Args:
        path: Path to the CSV file.

    Returns:
        One dict per CSV row (keys = column names).

    Raises:
        ValueError: if required columns are missing or a column has the
            wrong type.  (The original implementation used bare ``assert``
            statements, which are silently stripped under ``python -O``.)
    """
    required_columns = {
        'corr_id', 'stratum', 'job_parallelism',
        'expires_at_dist_hours', 'retry_total_overhead_ms', 'latency_max'
    }
    string_columns = {'corr_id', 'stratum'}
    df = pd.read_csv(path)

    missing = required_columns - set(df.columns)
    if missing:
        raise ValueError(f"CSV fehlt Spalten: {missing}")

    # Validate numeric columns on the DataFrame itself: pandas parses numeric
    # CSV columns into numpy scalars (np.int64/np.float64), and np.int64 is
    # NOT an instance of the builtin ``int`` — so the previous per-record
    # ``isinstance(value, (int, float))`` checks wrongly rejected valid
    # integer columns.  Dtype-level checks are both correct and O(columns)
    # instead of O(rows).
    for column in sorted(required_columns - string_columns):
        if not pd.api.types.is_numeric_dtype(df[column]):
            raise ValueError(f"Spalte {column} muss numerisch sein")

    data = df.to_dict(orient='records')

    # String fields still need a per-record check (an object column may mix
    # types, e.g. when a cell is empty and parsed as NaN).
    for i, record in enumerate(data):
        if not isinstance(record['corr_id'], str):
            raise ValueError(f"Zeile {i}: corr_id muss str sein")
        if not isinstance(record['stratum'], str):
            raise ValueError(f"Zeile {i}: stratum muss str sein")

    logger.debug("CSV-Daten erfolgreich eingelesen und validiert (%d Records)", len(data))
    return data


def _write_json_output(output_path: Path, results: Dict[str, Any]) -> None:
    """Write the analysis results as pretty-printed JSON, creating parent dirs.

    ``ensure_ascii=False`` keeps non-ASCII text (e.g. German labels) readable
    instead of escaping it to \\uXXXX sequences.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open('w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    logger.info("Analyseergebnisse geschrieben nach %s", output_path)


def main() -> None:
    """Parse CLI arguments, run the analysis and persist the summary.

    Raises:
        FileNotFoundError: if the input CSV does not exist.
        TypeError: if the analysis does not return a dict.
        ValueError: if the result dict lacks expected keys or the CSV is
            malformed.
    """
    parser = argparse.ArgumentParser(
        description="Analyse von Max-Outlier-Daten aus CI-Lasttests"
    )
    parser.add_argument('--input', required=True, help='Pfad zur CSV-Eingabedatei')
    parser.add_argument('--output', required=False, default='output/analysis_summary.json',
                        help='Pfad zur Ausgabedatei für Analyseergebnisse (JSON)')

    args = parser.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)

    if not input_path.exists():
        raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_path}")

    try:
        logger.info("Lese CSV-Datei: %s", input_path)
        data = _read_csv_input(input_path)
        logger.info("Starte Analyse über %d Datensätze", len(data))
        results = analyze_max_outliers(data)

        if not isinstance(results, dict):
            # Message previously named a non-existent "analyse_max_outliers".
            raise TypeError("analyze_max_outliers muss ein dict zurückgeben")

        # Minimal validation of the result object (CI-ready); a hard raise
        # instead of ``assert`` so the check survives ``python -O``.
        expected_keys = {'max_above_p99_count', 'near_expiry_cluster_percentage', 'retry_overhead_variance'}
        if not expected_keys.issubset(results.keys()):
            raise ValueError(
                f"Ergebnis enthält nicht alle erwarteten Felder: {expected_keys - set(results.keys())}"
            )

        _write_json_output(output_path, results)
        logger.info("Analyse abgeschlossen.")

    except Exception as e:
        # Top-level CLI boundary: log with traceback, then re-raise so the
        # process exits non-zero for CI.
        logger.exception("Fehler bei der Ausführung: %s", e)
        raise


if __name__ == '__main__':
    main()