Add outlier_analysis/src/outlier_analysis/cli.py

This commit is contained in:
Mika 2026-03-12 11:51:44 +00:00
parent 7c883811f2
commit c55c7dfa8f

View file

@ -0,0 +1,69 @@
import argparse
import json
import logging
from pathlib import Path
from typing import List, Dict, Any
import pandas as pd
from outlier_analysis.core import analyze_outliers
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
def _load_csv(file_path: Path) -> List[Dict[str, Any]]:
"""Liest eine CSV-Datei in eine Liste von Dicts ein und validiert Spalten."""
if not file_path.exists():
raise FileNotFoundError(f"Input file not found: {file_path}")
df = pd.read_csv(file_path)
required_cols = {"run_id", "latency_ms", "stratum", "job_parallelism", "retry_total_overhead_ms"}
missing = required_cols - set(df.columns)
if missing:
raise ValueError(f"Missing required columns in input CSV: {', '.join(missing)}")
records = df.to_dict(orient="records")
return records
def _save_json(data: Dict[str, Any], file_path: Path) -> None:
"""Speichert Dictionary als JSON-Datei mit Validierung und Logging."""
file_path.parent.mkdir(parents=True, exist_ok=True)
with file_path.open('w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
logging.info("Wrote outlier summary to %s", file_path)
def main() -> None:
"""CLI-Einstiegspunkt zum Analysieren von Outliern in Logdaten."""
parser = argparse.ArgumentParser(description="Analyse von Latenz-Outliern in Logdaten.")
parser.add_argument('--input', required=True, help='Pfad zur CSV-Datei mit Logdaten.')
parser.add_argument('--output', required=True, help='Pfad zur Ausgabe der Outlier-Zusammenfassung im JSON-Format.')
args = parser.parse_args()
input_path = Path(args.input)
output_path = Path(args.output)
try:
logging.info("Loading log data from %s", input_path)
log_data = _load_csv(input_path)
logging.info("Running outlier analysis...")
summary = analyze_outliers(log_data)
assert isinstance(summary, dict), "analyze_outliers() must return a dict"
_save_json(summary, output_path)
logging.info("Analysis complete.")
except Exception as e:
logging.error("Error during analysis: %s", e)
raise SystemExit(1)
if __name__ == '__main__':
main()