Add rerun_analysis_tool/src/rerun_analysis_tool/core.py

This commit is contained in:
Mika 2026-02-01 17:56:58 +00:00
commit a760dc9964

View file

@ -0,0 +1,101 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import List, Dict, Any
import pandas as pd
logger = logging.getLogger(__name__)
@dataclass
class RunResult:
run_id: str
status: str
unknown_rate: float
rerun_helps: int
rerun_shifts: int
rerun_hurts: int
class DataValidationError(Exception):
"""Raised when the input data does not meet validation requirements."""
pass
def _validate_runs_data(runs_data: List[Dict[str, Any]]) -> None:
required_fields = {"run_id", "status", "unknown_rate", "rerun_helps", "rerun_shifts", "rerun_hurts"}
for idx, entry in enumerate(runs_data):
if not isinstance(entry, dict):
raise DataValidationError(f"Entry {idx} is not a dict: {entry}")
missing = required_fields - set(entry.keys())
if missing:
raise DataValidationError(f"Entry {idx} missing fields: {missing}")
if not isinstance(entry["run_id"], str):
raise DataValidationError(f"run_id must be str, got {type(entry['run_id'])}")
if entry["status"] not in {"WARN", "PASS", "FAIL"}:
raise DataValidationError(f"Invalid status value: {entry['status']}")
for int_field in ["rerun_helps", "rerun_shifts", "rerun_hurts"]:
if not isinstance(entry[int_field], int):
raise DataValidationError(f"{int_field} must be int, got {type(entry[int_field])}")
if not isinstance(entry["unknown_rate"], (float, int)):
raise DataValidationError(f"unknown_rate must be float, got {type(entry['unknown_rate'])}")
def analyze_runs(runs_data: List[Dict[str, Any]], threshold: float, rerun_budget: int) -> Dict[str, Any]:
"""
Analysiert Replay-Daten zur Bestimmung der Auswirkungen von rerun_budget auf WARN-Verteilungen.
Args:
runs_data: Liste von Run-Dictionaries mit Feldern wie in RunResult.
threshold: WARN-Schwelle (z. B. 0.3 für 30%).
rerun_budget: Anzahl erlaubter Wiederholungen pro Run.
Returns:
Dictionary mit aggregierten Kennzahlen für rerun_helps, rerun_shifts und rerun_hurts.
"""
logger.debug("Starting analyze_runs with threshold=%s and rerun_budget=%s", threshold, rerun_budget)
assert isinstance(runs_data, list), "runs_data must be a list"
assert isinstance(threshold, float), "threshold must be float"
assert isinstance(rerun_budget, int), "rerun_budget must be int"
# Validate input
_validate_runs_data(runs_data)
if not runs_data:
logger.warning("Empty runs_data received - returning zeros.")
return {"rerun_helps": 0, "rerun_shifts": 0, "rerun_hurts": 0}
# Convert to DataFrame for analysis
df = pd.DataFrame(runs_data)
# Compute fractions and apply threshold logic
df_filtered = df[df["unknown_rate"] <= threshold].copy()
rerun_helps_count = int((df_filtered["rerun_helps"] > 0).sum())
rerun_shifts_count = int((df_filtered["rerun_shifts"] > 0).sum())
rerun_hurts_count = int((df_filtered["rerun_hurts"] > 0).sum())
total = len(df_filtered)
logger.info(
"Rerun summary (filtered %s of %s): helps=%s, shifts=%s, hurts=%s",
total,
len(df),
rerun_helps_count,
rerun_shifts_count,
rerun_hurts_count,
)
result = {
"rerun_helps": rerun_helps_count,
"rerun_shifts": rerun_shifts_count,
"rerun_hurts": rerun_hurts_count,
"total_analyzed": total,
"threshold": threshold,
"rerun_budget": rerun_budget,
}
assert all(isinstance(v, (int, float)) for v in result.values() if v is not None), "All numeric fields must be valid"
return result