Add outlier_analysis/src/outlier_analysis/core.py

This commit is contained in:
Mika 2026-03-12 11:51:44 +00:00
commit 7c883811f2

View file

@@ -0,0 +1,106 @@
from __future__ import annotations

import logging
import statistics
from dataclasses import asdict, dataclass, fields
from typing import Any, Dict, List

import pandas as pd
logger = logging.getLogger(__name__)
@dataclass
class LogRecord:
    """A single validated log entry used for latency-outlier analysis."""

    run_id: str                      # identifier of the run this entry belongs to
    latency_ms: float                # observed latency in milliseconds
    stratum: str                     # stratum label for this entry
    job_parallelism: int             # parallelism level of the job
    retry_total_overhead_ms: float   # cumulative retry overhead in milliseconds

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'LogRecord':
        """Build a LogRecord from a raw dict, validating presence and types.

        Args:
            data: Mapping that must contain every dataclass field.

        Returns:
            A fully coerced LogRecord instance.

        Raises:
            ValueError: if a required field is missing, or a value cannot be
                coerced to the declared type.
        """
        # Use the public dataclasses.fields() API rather than the
        # semi-private __dataclass_fields__ mapping.
        required_fields = {f.name for f in fields(cls)}
        missing = required_fields - data.keys()
        if missing:
            raise ValueError(f"Missing required fields: {missing}")
        try:
            return cls(
                run_id=str(data['run_id']),
                latency_ms=float(data['latency_ms']),
                stratum=str(data['stratum']),
                job_parallelism=int(data['job_parallelism']),
                retry_total_overhead_ms=float(data['retry_total_overhead_ms'])
            )
        except (ValueError, TypeError) as exc:
            # Re-raise as ValueError with the original cause chained.
            raise ValueError(f"Invalid data in LogRecord: {data}") from exc
@dataclass
class OutlierSummary:
    """Per-run summary of latency outliers and distribution percentiles."""

    run_id: str                             # run this summary describes
    outlier_count: int                      # entries above the outlier threshold
    latency_distribution: Dict[str, float]  # percentile label -> latency value

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the summary into a plain dict (e.g. for JSON output)."""
        return asdict(self)
class OutlierAnalysisError(Exception):
    """Raised when outlier analysis receives malformed or invalid input."""
def analyze_outliers(log_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyze log data for latency outliers and build per-run summary stats.

    Args:
        log_data: List of log entries as dicts; each entry must contain the
            fields required by LogRecord.

    Returns:
        Dict keyed by run_id, with OutlierSummary.to_dict() values. Empty
        dict when log_data is empty.

    Raises:
        OutlierAnalysisError: if log_data is not a list, or an entry fails
            LogRecord validation.
    """
    if not isinstance(log_data, list):
        raise OutlierAnalysisError("Input log_data must be a list of dicts.")
    try:
        records = [LogRecord.from_dict(item) for item in log_data]
    except ValueError as e:
        logger.error("Data validation failed: %s", e)
        # Chain the original error so the root cause stays in the traceback.
        raise OutlierAnalysisError(str(e)) from e
    df = pd.DataFrame([asdict(r) for r in records])
    if df.empty:
        return {}
    summaries: Dict[str, Any] = {}
    for run_id, group in df.groupby('run_id'):
        latencies = group['latency_ms']
        p50 = float(latencies.quantile(0.5))
        p95 = float(latencies.quantile(0.95))
        p99 = float(latencies.quantile(0.99))
        max_latency = float(latencies.max())
        # Outliers are latencies strictly above the interpolated p99 quantile;
        # for very small groups p99 approaches the max, so the count may be 0.
        outlier_mask = latencies > p99
        outlier_count = int(outlier_mask.sum())
        latency_distribution = {
            'p50': round(p50, 3),
            'p95': round(p95, 3),
            'p99': round(p99, 3),
            'max': round(max_latency, 3)
        }
        summary = OutlierSummary(
            run_id=str(run_id),
            outlier_count=outlier_count,
            latency_distribution=latency_distribution
        )
        summaries[str(run_id)] = summary.to_dict()
        logger.debug("Processed run_id=%s: %s", run_id, summary)
    # The previous trailing `assert` was removed: it was tautological
    # (OutlierSummary always carries run_id) and `assert` is stripped
    # under `python -O`, so it was never a real validation.
    return summaries