commit a760dc9964cf56664bf639c6316a0dbae54dc501 Author: Mika Date: Sun Feb 1 17:56:58 2026 +0000 Add rerun_analysis_tool/src/rerun_analysis_tool/core.py diff --git a/rerun_analysis_tool/src/rerun_analysis_tool/core.py b/rerun_analysis_tool/src/rerun_analysis_tool/core.py new file mode 100644 index 0000000..8015adb --- /dev/null +++ b/rerun_analysis_tool/src/rerun_analysis_tool/core.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import List, Dict, Any +import pandas as pd + + +logger = logging.getLogger(__name__) + + +@dataclass +class RunResult: + run_id: str + status: str + unknown_rate: float + rerun_helps: int + rerun_shifts: int + rerun_hurts: int + + +class DataValidationError(Exception): + """Raised when the input data does not meet validation requirements.""" + pass + + +def _validate_runs_data(runs_data: List[Dict[str, Any]]) -> None: + required_fields = {"run_id", "status", "unknown_rate", "rerun_helps", "rerun_shifts", "rerun_hurts"} + for idx, entry in enumerate(runs_data): + if not isinstance(entry, dict): + raise DataValidationError(f"Entry {idx} is not a dict: {entry}") + missing = required_fields - set(entry.keys()) + if missing: + raise DataValidationError(f"Entry {idx} missing fields: {missing}") + if not isinstance(entry["run_id"], str): + raise DataValidationError(f"run_id must be str, got {type(entry['run_id'])}") + if entry["status"] not in {"WARN", "PASS", "FAIL"}: + raise DataValidationError(f"Invalid status value: {entry['status']}") + for int_field in ["rerun_helps", "rerun_shifts", "rerun_hurts"]: + if not isinstance(entry[int_field], int): + raise DataValidationError(f"{int_field} must be int, got {type(entry[int_field])}") + if not isinstance(entry["unknown_rate"], (float, int)): + raise DataValidationError(f"unknown_rate must be float, got {type(entry['unknown_rate'])}") + + +def analyze_runs(runs_data: List[Dict[str, Any]], threshold: float, rerun_budget: int) -> Dict[str, Any]: + """ + Analysiert Replay-Daten zur Bestimmung der Auswirkungen von rerun_budget auf WARN-Verteilungen. + + Args: + runs_data: Liste von Run-Dictionaries mit Feldern wie in RunResult. + threshold: WARN-Schwelle (z. B. 0.3 für 30%). + rerun_budget: Anzahl erlaubter Wiederholungen pro Run. + + Returns: + Dictionary mit aggregierten Kennzahlen für rerun_helps, rerun_shifts und rerun_hurts. + """ + logger.debug("Starting analyze_runs with threshold=%s and rerun_budget=%s", threshold, rerun_budget) + assert isinstance(runs_data, list), "runs_data must be a list" + assert isinstance(threshold, float), "threshold must be float" + assert isinstance(rerun_budget, int), "rerun_budget must be int" + + # Validate input + _validate_runs_data(runs_data) + + if not runs_data: + logger.warning("Empty runs_data received - returning zeros.") + return {"rerun_helps": 0, "rerun_shifts": 0, "rerun_hurts": 0} + + # Convert to DataFrame for analysis + df = pd.DataFrame(runs_data) + + # Compute fractions and apply threshold logic + df_filtered = df[df["unknown_rate"] <= threshold].copy() + + rerun_helps_count = int((df_filtered["rerun_helps"] > 0).sum()) + rerun_shifts_count = int((df_filtered["rerun_shifts"] > 0).sum()) + rerun_hurts_count = int((df_filtered["rerun_hurts"] > 0).sum()) + + total = len(df_filtered) + logger.info( + "Rerun summary (filtered %s of %s): helps=%s, shifts=%s, hurts=%s", + total, + len(df), + rerun_helps_count, + rerun_shifts_count, + rerun_hurts_count, + ) + + result = { + "rerun_helps": rerun_helps_count, + "rerun_shifts": rerun_shifts_count, + "rerun_hurts": rerun_hurts_count, + "total_analyzed": total, + "threshold": threshold, + "rerun_budget": rerun_budget, + } + + assert all(isinstance(v, (int, float)) for v in result.values() if v is not None), "All numeric fields must be valid" + + return result