Add frozen_runs_analysis/src/frozen_runs_analysis/core.py

Mika 2026-01-26 12:23:44 +00:00
parent 9fab7ffa8c
commit c67cc23016


@@ -0,0 +1,96 @@
from __future__ import annotations

import logging
from dataclasses import dataclass, asdict
from statistics import mean, pstdev, quantiles
from typing import Dict, Any, List

import pandas as pd

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class RunDataValidationError(Exception):
    """Raised when RunData validation fails."""


@dataclass
class RunData:
    run_id: str
    status: str
    sanity_checks: Dict[str, Any]
    config_hash: str

    @classmethod
    def validate(cls, data: Dict[str, Any]) -> 'RunData':
        """Build a RunData from a raw dict, raising on missing or mistyped fields."""
        required_fields = {"run_id", "status", "sanity_checks", "config_hash"}
        missing = required_fields - data.keys()
        if missing:
            raise RunDataValidationError(f"Missing fields in RunData: {', '.join(missing)}")
        if not isinstance(data.get("sanity_checks"), dict):
            raise RunDataValidationError("Field 'sanity_checks' must be a dict.")
        if not isinstance(data.get("run_id"), str):
            raise RunDataValidationError("Field 'run_id' must be a str.")
        if not isinstance(data.get("status"), str):
            raise RunDataValidationError("Field 'status' must be a str.")
        if not isinstance(data.get("config_hash"), str):
            raise RunDataValidationError("Field 'config_hash' must be a str.")
        return cls(
            run_id=data["run_id"],
            status=data["status"],
            sanity_checks=data["sanity_checks"],
            config_hash=data["config_hash"],
        )
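
# Hypothetical round-trip, with made-up field values:
#   RunData.validate({"run_id": "r1", "status": "ok",
#                     "sanity_checks": {"loss_finite": True},
#                     "config_hash": "abc123"})
# returns a RunData instance; a missing or mistyped field raises
# RunDataValidationError instead.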


def analyse_frozen_runs(data: List[RunData]) -> Dict[str, Any]:
    """Analyse frozen-run data, computing stability metrics and frequencies."""
    # Validation: accept raw dicts and RunData instances, skip everything else.
    if not data:
        logger.warning("No data passed for analysis.")
        return {}
    valid_runs: List[RunData] = []
    for item in data:
        if isinstance(item, dict):
            try:
                valid_runs.append(RunData.validate(item))
            except RunDataValidationError as e:
                logger.error("Ignoring invalid data entry: %s", e)
        elif isinstance(item, RunData):
            valid_runs.append(item)
        else:
            logger.error("Unknown data type in analysis: %s", type(item).__name__)
    if not valid_runs:
        logger.warning("No valid runs left after validation.")
        return {}
    df = pd.DataFrame([asdict(r) for r in valid_runs])
    # Simple sanity score: number of failed checks in sanity_checks.
    # A value counts as a failure when it is falsy but not None
    # (None is treated as "check not run" rather than as a failure).
    df['sanity_error_count'] = df['sanity_checks'].apply(
        lambda d: sum(1 for v in d.values() if v is not None and not v)
    )
    # Flip-flop rate: share of config hashes observed with more than one status.
    hash_status = df.groupby('config_hash')['status'].nunique()
    flip_flop_rate = (hash_status > 1).mean()
    # Distribution statistics over the per-run sanity error counts.
    sanity_counts = df['sanity_error_count'].tolist()
    avg_errors = mean(sanity_counts)
    std_errors = pstdev(sanity_counts) if len(sanity_counts) > 1 else 0.0
    # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile.
    p95 = quantiles(sanity_counts, n=100)[94] if len(sanity_counts) >= 20 else max(sanity_counts)
    result = {
        "total_runs": len(df),
        "unique_hashes": df['config_hash'].nunique(),
        "flip_flop_rate": round(float(flip_flop_rate), 4),
        "avg_sanity_errors": round(avg_errors, 4),
        "std_sanity_errors": round(std_errors, 4),
        "p95_sanity_errors": round(float(p95), 4),
    }
    logger.info("Analysis finished: %s", result)
    return result
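
For context, a minimal usage sketch of the new module, assuming it is importable as frozen_runs_analysis.core (matching the file path above); run IDs, statuses, and check names are made-up placeholders:

from frozen_runs_analysis.core import RunData, analyse_frozen_runs

# Two hypothetical runs that share a config hash but disagree on status,
# so flip_flop_rate comes out above zero.
runs = [
    {
        "run_id": "r1",
        "status": "frozen",
        "sanity_checks": {"loss_finite": True, "grads_finite": False},
        "config_hash": "abc123",
    },
    RunData(run_id="r2", status="ok",
            sanity_checks={"loss_finite": True}, config_hash="abc123"),
]

stats = analyse_frozen_runs(runs)
# stats carries: total_runs, unique_hashes, flip_flop_rate,
# avg_sanity_errors, std_sanity_errors, p95_sanity_errors
print(stats)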