Add max_outlier_analysis_script/src/max_outlier_analysis_script/cli.py

This commit is contained in:
Mika 2026-03-11 12:43:10 +00:00
parent bd3d545b1b
commit 8f351f9310

View file

@ -0,0 +1,93 @@
import argparse
import json
import logging
from pathlib import Path
from typing import Any, Dict, List
import pandas as pd
from max_outlier_analysis_script.core import analyze_max_outliers
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s:%(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
def _read_csv_input(path: Path) -> List[Dict[str, Any]]:
"""Liest CSV-Eingabedatei und validiert die Felder."""
required_columns = {
'corr_id', 'stratum', 'job_parallelism',
'expires_at_dist_hours', 'retry_total_overhead_ms', 'latency_max'
}
df = pd.read_csv(path)
missing = required_columns - set(df.columns)
if missing:
raise ValueError(f"CSV fehlt Spalten: {missing}")
data = df.to_dict(orient='records')
# Typvalidierung einfach aber strikt
for i, record in enumerate(data):
assert isinstance(record['corr_id'], str), f"Zeile {i}: corr_id muss str sein"
assert isinstance(record['stratum'], str), f"Zeile {i}: stratum muss str sein"
assert isinstance(record['job_parallelism'], (int, float)), f"Zeile {i}: job_parallelism muss numerisch sein"
assert isinstance(record['expires_at_dist_hours'], (int, float)), f"Zeile {i}: expires_at_dist_hours muss numerisch sein"
assert isinstance(record['retry_total_overhead_ms'], (int, float)), f"Zeile {i}: retry_total_overhead_ms muss numerisch sein"
assert isinstance(record['latency_max'], (int, float)), f"Zeile {i}: latency_max muss numerisch sein"
logger.debug("CSV-Daten erfolgreich eingelesen und validiert (%d Records)", len(data))
return data
def _write_json_output(output_path: Path, results: Dict[str, Any]) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open('w', encoding='utf-8') as f:
json.dump(results, f, indent=2)
logger.info("Analyseergebnisse geschrieben nach %s", output_path)
def main() -> None:
parser = argparse.ArgumentParser(
description="Analyse von Max-Outlier-Daten aus CI-Lasttests"
)
parser.add_argument('--input', required=True, help='Pfad zur CSV-Eingabedatei')
parser.add_argument('--output', required=False, default='output/analysis_summary.json',
help='Pfad zur Ausgabedatei für Analyseergebnisse (JSON)')
args = parser.parse_args()
input_path = Path(args.input)
output_path = Path(args.output)
if not input_path.exists():
raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_path}")
try:
logger.info("Lese CSV-Datei: %s", input_path)
data = _read_csv_input(input_path)
logger.info("Starte Analyse über %d Datensätze", len(data))
results = analyze_max_outliers(data)
if not isinstance(results, dict):
raise TypeError("analyse_max_outliers muss ein dict zurückgeben")
# Minimalvalidierung des Ergebnisobjekts (CI-Ready)
expected_keys = {'max_above_p99_count', 'near_expiry_cluster_percentage', 'retry_overhead_variance'}
assert expected_keys.issubset(results.keys()), (
f"Ergebnis enthält nicht alle erwarteten Felder: {expected_keys - set(results.keys())}"
)
_write_json_output(output_path, results)
logger.info("Analyse abgeschlossen.")
except Exception as e:
logger.exception("Fehler bei der Ausführung: %s", e)
raise
if __name__ == '__main__':
main()