Add run_summary/src/run_summary/core.py

This commit is contained in:
Mika 2026-01-25 17:42:35 +00:00
commit 69ce58136d

View file

@ -0,0 +1,104 @@
from __future__ import annotations
import json
from pathlib import Path
from statistics import quantiles
from typing import Any, Dict, List
import pandas as pd
class InputValidationError(Exception):
    """Signal that caller-supplied input did not pass a validation check."""
class SummaryData(dict):
    """Dict subclass holding the two run-summary metrics.

    The instance is an ordinary mapping with exactly two keys,
    ``mischfenster_p95`` and ``retry_free_in_window_rate``, in that order.
    """

    def __init__(self, mischfenster_p95: float, retry_free_in_window_rate: float):
        metrics = {
            "mischfenster_p95": mischfenster_p95,
            "retry_free_in_window_rate": retry_free_in_window_rate,
        }
        super().__init__(metrics)

    def __repr__(self) -> str:
        # Both values are rendered with four decimal places.
        p95 = self["mischfenster_p95"]
        rate = self["retry_free_in_window_rate"]
        return f"SummaryData(p95={p95:.4f}, retry_free_rate={rate:.4f})"
def _validate_raw_events(raw_events: List[Dict[str, Any]]) -> None:
if not isinstance(raw_events, list):
raise InputValidationError("raw_events must be a list of dictionaries.")
for i, evt in enumerate(raw_events):
if not isinstance(evt, dict):
raise InputValidationError(f"Event {i} is not a dictionary.")
if 'metric_value' not in evt or 'corr_id' not in evt:
raise InputValidationError(f"Missing required keys in event {i}.")
if not isinstance(evt['metric_value'], (int, float)):
raise InputValidationError(f"metric_value in event {i} must be numeric.")
if not isinstance(evt['corr_id'], str):
raise InputValidationError(f"corr_id in event {i} must be a string.")
def generate_summary(raw_events: List[Dict[str, Any]]) -> Dict[str, float]:
    """Analyse raw event data and produce a structured run summary.

    Args:
        raw_events: List of event dicts; validated with
            ``_validate_raw_events`` before any computation.

    Returns:
        A ``SummaryData`` mapping with two keys:

        * ``mischfenster_p95`` - 95th percentile of the non-NaN
          ``metric_value`` entries (0.0 when there are none).
        * ``retry_free_in_window_rate`` - if any event carries a ``retry``
          key, the share of events whose ``retry`` equals ``False``;
          otherwise the ratio of distinct ``corr_id`` values to events.

        An empty ``raw_events`` list yields ``SummaryData(0.0, 0.0)``.

    Raises:
        InputValidationError: If ``raw_events`` fails validation.
    """
    # Function-scope import: the module header only imports ``quantiles``.
    from statistics import StatisticsError

    _validate_raw_events(raw_events)
    df = pd.DataFrame(raw_events)
    if df.empty:
        return SummaryData(0.0, 0.0)

    metric_values = df['metric_value'].dropna().tolist()
    if not metric_values:
        mischfenster_p95 = 0.0
    else:
        try:
            # With n=100 there are 99 cut points; index 94 is the 95th.
            mischfenster_p95 = float(quantiles(metric_values, n=100)[94])
        except StatisticsError:
            # quantiles() rejects inputs with too few data points; fall
            # back to pandas for that case only, so unrelated errors are
            # no longer masked.
            mischfenster_p95 = float(pd.Series(metric_values).quantile(0.95))

    # The frame is known to be non-empty here, so ``total`` is at least 1.
    total = len(df)
    if 'retry' in df.columns:
        # Rows lacking a 'retry' value hold NaN, which does not equal
        # False, so they are counted as not retry-free.
        retry_free = int(df['retry'].eq(False).sum())
    else:
        # Fallback: treat repeated corr_id values as retries.
        retry_free = int(df['corr_id'].nunique())
    retry_free_rate = retry_free / total

    summary = SummaryData(mischfenster_p95, retry_free_rate)
    # Internal invariant: retry_free can never exceed total.
    assert 0.0 <= summary['retry_free_in_window_rate'] <= 1.0, "Invalid retry_free rate"
    return summary
def make_gate_decision(
    summary: Dict[str, float],
    *,
    max_p95: float = 1.2,
    min_retry_free_rate: float = 0.9,
) -> bool:
    """Make a pass/fail gate decision from a run summary.

    Rule v0: a run passes when its p95 is strictly below ``max_p95`` and
    its retry-free rate is strictly above ``min_retry_free_rate``.

    Args:
        summary: Mapping with the keys ``mischfenster_p95`` and
            ``retry_free_in_window_rate``.
        max_p95: Exclusive upper bound for ``mischfenster_p95``.
        min_retry_free_rate: Exclusive lower bound for
            ``retry_free_in_window_rate``.

    Returns:
        ``True`` if both conditions hold, otherwise ``False``.

    Raises:
        InputValidationError: If ``summary`` is not a dict or lacks one of
            the required keys.
    """
    if not isinstance(summary, dict):
        raise InputValidationError("summary must be a dictionary.")
    if 'mischfenster_p95' not in summary or 'retry_free_in_window_rate' not in summary:
        raise InputValidationError("summary missing required keys.")

    # Both keys are guaranteed present by the check above.
    p95 = summary['mischfenster_p95']
    retry_free_rate = summary['retry_free_in_window_rate']
    return bool(p95 < max_p95 and retry_free_rate > min_retry_free_rate)
def export_debug_artifact(
    raw_events: List[Dict[str, Any]],
    output_path: str,
    top_n: int = 10,
) -> str:
    """Write a JSON debug file with the events having the largest metric values.

    Events are ordered by ``metric_value`` in descending order and the
    first ``top_n`` are written as a JSON array of objects. Parent
    directories of ``output_path`` are created if they do not exist.

    Args:
        raw_events: List of event dicts; validated with
            ``_validate_raw_events`` first.
        output_path: Destination path of the JSON file.
        top_n: Maximum number of events to export. Defaults to 10.

    Returns:
        The output path as a string.

    Raises:
        InputValidationError: If ``raw_events`` fails validation or
            ``top_n`` is negative.
    """
    # Function-scope import: ``math`` is not among the module-level imports.
    import math

    _validate_raw_events(raw_events)
    if top_n < 0:
        raise InputValidationError("top_n must be non-negative.")

    df = pd.DataFrame(raw_events)
    if df.empty:
        top_records = []
    else:
        # A stable sort keeps the order of tied metric values deterministic.
        top_df = df.sort_values(
            'metric_value', ascending=False, kind='mergesort'
        ).head(top_n)
        # Keys missing from some events surface as NaN cells in the frame.
        # json would emit them as the bare token NaN, which is not valid
        # JSON, so write them as null instead.
        top_records = [
            {
                key: None if isinstance(value, float) and math.isnan(value) else value
                for key, value in record.items()
            }
            for record in top_df.to_dict(orient='records')
        ]

    output_file = Path(output_path)
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with output_file.open('w', encoding='utf-8') as f:
        json.dump(top_records, f, indent=2, ensure_ascii=False)
    return str(output_file)