commit 7c883811f28aa513d957652ef266aa5caaf746cd Author: Mika Date: Thu Mar 12 11:51:44 2026 +0000 Add outlier_analysis/src/outlier_analysis/core.py diff --git a/outlier_analysis/src/outlier_analysis/core.py b/outlier_analysis/src/outlier_analysis/core.py new file mode 100644 index 0000000..de8f8e4 --- /dev/null +++ b/outlier_analysis/src/outlier_analysis/core.py @@ -0,0 +1,106 @@ +from __future__ import annotations +import logging +from typing import List, Dict, Any +import pandas as pd +from dataclasses import dataclass, asdict +import statistics + + +logger = logging.getLogger(__name__) + + +@dataclass +class LogRecord: + run_id: str + latency_ms: float + stratum: str + job_parallelism: int + retry_total_overhead_ms: float + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'LogRecord': + required_fields = {f.name for f in cls.__dataclass_fields__.values()} + missing = required_fields - data.keys() + if missing: + raise ValueError(f"Missing required fields: {missing}") + try: + return cls( + run_id=str(data['run_id']), + latency_ms=float(data['latency_ms']), + stratum=str(data['stratum']), + job_parallelism=int(data['job_parallelism']), + retry_total_overhead_ms=float(data['retry_total_overhead_ms']) + ) + except (ValueError, TypeError) as exc: + raise ValueError(f"Invalid data in LogRecord: {data}") from exc + + +@dataclass +class OutlierSummary: + run_id: str + outlier_count: int + latency_distribution: Dict[str, float] + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +class OutlierAnalysisError(Exception): + """Custom exception for outlier analysis errors.""" + pass + + +def analyze_outliers(log_data: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Analysiert Logdaten auf Latenz-Outlier und erzeugt eine summarische Statistik pro Run. + + Parameter: + log_data: Liste von Logeinträgen als Dicts. + + Rückgabe: + Dict mit Schlüssel = run_id, Wert = OutlierSummary.to_dict() + """ + if not isinstance(log_data, list): + raise OutlierAnalysisError("Input log_data must be a list of dicts.") + + try: + records = [LogRecord.from_dict(item) for item in log_data] + except ValueError as e: + logger.error("Data validation failed: %s", e) + raise OutlierAnalysisError(str(e)) + + df = pd.DataFrame([asdict(r) for r in records]) + if df.empty: + return {} + + summaries: Dict[str, Any] = {} + + for run_id, group in df.groupby('run_id'): + latencies = group['latency_ms'] + p50 = float(latencies.quantile(0.5)) + p95 = float(latencies.quantile(0.95)) + p99 = float(latencies.quantile(0.99)) + max_latency = float(latencies.max()) + + # Define outliers as latency > p99 + outlier_mask = latencies > p99 + outlier_count = int(outlier_mask.sum()) + + latency_distribution = { + 'p50': round(p50, 3), + 'p95': round(p95, 3), + 'p99': round(p99, 3), + 'max': round(max_latency, 3) + } + + summary = OutlierSummary( + run_id=str(run_id), + outlier_count=outlier_count, + latency_distribution=latency_distribution + ) + summaries[str(run_id)] = summary.to_dict() + + logger.debug("Processed run_id=%s: %s", run_id, summary) + + assert all('run_id' in s for s in summaries.values()), 'Validation: Missing run_id in summary.' + return summaries \ No newline at end of file