# Module: run_summary/src/run_summary/core.py
# Recovered from patch 69ce58136d6e905e33489b3ad6d1828c697781ab
# ("Add run_summary/src/run_summary/core.py", Mika, Sun, 25 Jan 2026).
#
# Helpers that validate raw run events, condense them into two summary
# metrics, derive a pass/fail gate decision from those metrics, and export
# the events with the largest metric values as a JSON debug artifact.
from __future__ import annotations

import json
import math
from pathlib import Path
from statistics import quantiles
from typing import Any, Dict, List

import pandas as pd

# Gate rule v0: a run passes when p95 is strictly below the limit and the
# retry-free rate is strictly above the minimum.
DEFAULT_P95_LIMIT = 1.2
DEFAULT_MIN_RETRY_FREE_RATE = 0.9

# Number of events written to the debug artifact unless the caller overrides it.
DEFAULT_TOP_N = 10


class InputValidationError(Exception):
    """Raised when input validation fails."""


class SummaryData(dict):
    """Dict-based container for the two summary metrics.

    The instance is a plain ``dict`` with exactly two keys,
    ``mischfenster_p95`` and ``retry_free_in_window_rate``, so it can be
    passed anywhere a dictionary is expected.
    """

    def __init__(self, mischfenster_p95: float, retry_free_in_window_rate: float):
        super().__init__(
            mischfenster_p95=mischfenster_p95,
            retry_free_in_window_rate=retry_free_in_window_rate,
        )

    def __repr__(self) -> str:
        return (
            f"SummaryData(p95={self['mischfenster_p95']:.4f}, "
            f"retry_free_rate={self['retry_free_in_window_rate']:.4f})"
        )


def _validate_raw_events(raw_events: List[Dict[str, Any]]) -> None:
    """Check that ``raw_events`` is a list of well-formed event dictionaries.

    Every event must be a dict containing a numeric ``metric_value`` and a
    string ``corr_id``; additional keys are allowed.

    Raises:
        InputValidationError: on the first violation found.
    """
    if not isinstance(raw_events, list):
        raise InputValidationError("raw_events must be a list of dictionaries.")
    for i, evt in enumerate(raw_events):
        if not isinstance(evt, dict):
            raise InputValidationError(f"Event {i} is not a dictionary.")
        if 'metric_value' not in evt or 'corr_id' not in evt:
            raise InputValidationError(f"Missing required keys in event {i}.")
        if not isinstance(evt['metric_value'], (int, float)):
            raise InputValidationError(f"metric_value in event {i} must be numeric.")
        if not isinstance(evt['corr_id'], str):
            raise InputValidationError(f"corr_id in event {i} must be a string.")


def _percentile_95(values: List[float]) -> float:
    """Return the 95th percentile of ``values`` (0.0 when empty)."""
    if not values:
        return 0.0
    if len(values) == 1:
        # A single observation is its own percentile; statistics.quantiles
        # rejects one-element input on Python < 3.13.
        return float(values[0])
    # 'inclusive' interpolates between observed values only. The default
    # 'exclusive' method extrapolates past the maximum on small samples
    # (e.g. [1, 2] -> 2.85), which could fail the gate although every
    # observed value is below the limit.
    return float(quantiles(values, n=100, method='inclusive')[94])


def generate_summary(raw_events: List[Dict[str, Any]]) -> Dict[str, float]:
    """Analyse raw event data and build a structured run summary.

    Args:
        raw_events: list of event dicts; each needs a numeric
            ``metric_value`` and a string ``corr_id``. An optional ``retry``
            key is used for the retry-free rate when any event carries it.

    Returns:
        A ``SummaryData`` dict with ``mischfenster_p95`` (95th percentile of
        the non-NaN metric values) and ``retry_free_in_window_rate``. Both
        are 0.0 for an empty event list.

    Raises:
        InputValidationError: if ``raw_events`` is malformed.
    """
    _validate_raw_events(raw_events)

    df = pd.DataFrame(raw_events)
    if df.empty:
        return SummaryData(0.0, 0.0)

    mischfenster_p95 = _percentile_95(df['metric_value'].dropna().tolist())

    total = len(df)
    if 'retry' in df.columns:
        # Only an explicit falsy flag counts as retry-free; events without a
        # ``retry`` value become NaN in the frame and compare unequal to False.
        retry_free = int(df['retry'].eq(False).sum())
    else:
        # Fallback: simulate retry detection via duplicated corr_id values.
        retry_free = int(df['corr_id'].nunique())
    retry_free_rate = retry_free / total if total > 0 else 0.0

    summary = SummaryData(mischfenster_p95, retry_free_rate)
    assert 0.0 <= summary['retry_free_in_window_rate'] <= 1.0, "Invalid retry_free rate"
    return summary


def make_gate_decision(
    summary: Dict[str, float],
    *,
    p95_limit: float = DEFAULT_P95_LIMIT,
    min_retry_free_rate: float = DEFAULT_MIN_RETRY_FREE_RATE,
) -> bool:
    """Make a gate decision based on the run summary.

    Args:
        summary: dict with ``mischfenster_p95`` and
            ``retry_free_in_window_rate``.
        p95_limit: the run passes only if p95 is strictly below this value.
        min_retry_free_rate: the run passes only if the retry-free rate is
            strictly above this value.

    Returns:
        ``True`` when both conditions hold, otherwise ``False``.

    Raises:
        InputValidationError: if ``summary`` is not a dict or lacks a key.
    """
    if not isinstance(summary, dict):
        raise InputValidationError("summary must be a dictionary.")
    if 'mischfenster_p95' not in summary or 'retry_free_in_window_rate' not in summary:
        raise InputValidationError("summary missing required keys.")

    mischfenster_p95 = summary['mischfenster_p95']
    retry_free_rate = summary['retry_free_in_window_rate']
    return bool(mischfenster_p95 < p95_limit and retry_free_rate > min_retry_free_rate)


def _json_safe(value: Any) -> Any:
    """Map float NaN to ``None`` so the value serialises as valid JSON."""
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def export_debug_artifact(
    raw_events: List[Dict[str, Any]],
    output_path: str,
    top_n: int = DEFAULT_TOP_N,
) -> str:
    """Write a JSON debug file with the ``top_n`` highest-metric events.

    Args:
        raw_events: list of event dicts (same contract as
            ``generate_summary``).
        output_path: destination file; missing parent directories are
            created.
        top_n: maximum number of events to export, ordered by descending
            ``metric_value``.

    Returns:
        The path of the written file as a string.

    Raises:
        InputValidationError: if ``raw_events`` is malformed or ``top_n`` is
            negative.
    """
    _validate_raw_events(raw_events)
    if top_n < 0:
        raise InputValidationError("top_n must be non-negative.")

    df = pd.DataFrame(raw_events)
    if df.empty:
        top_records: List[Dict[str, Any]] = []
    else:
        top_df = df.sort_values('metric_value', ascending=False).head(top_n)
        # Events with differing keys leave NaN holes in the frame; json.dump
        # would emit them as the non-standard token NaN, so write null instead.
        top_records = [
            {key: _json_safe(value) for key, value in record.items()}
            for record in top_df.to_dict(orient='records')
        ]

    output_file = Path(output_path)
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with output_file.open('w', encoding='utf-8') as f:
        json.dump(top_records, f, indent=2, ensure_ascii=False)

    return str(output_file)