diff --git a/frozen_runs_analysis/src/frozen_runs_analysis/core.py b/frozen_runs_analysis/src/frozen_runs_analysis/core.py new file mode 100644 index 0000000..543b74b --- /dev/null +++ b/frozen_runs_analysis/src/frozen_runs_analysis/core.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass, asdict +from typing import Dict, Any, List +import pandas as pd +from statistics import mean, pstdev, quantiles + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class RunDataValidationError(Exception): + """Raised when RunData validation fails.""" + pass + + +@dataclass +class RunData: + run_id: str + status: str + sanity_checks: Dict[str, Any] + config_hash: str + + @classmethod + def validate(cls, data: Dict[str, Any]) -> 'RunData': + required_fields = {"run_id", "status", "sanity_checks", "config_hash"} + missing = required_fields - data.keys() + if missing: + raise RunDataValidationError(f"Missing fields in RunData: {', '.join(missing)}") + if not isinstance(data.get("sanity_checks"), dict): + raise RunDataValidationError("Field 'sanity_checks' must be a dict.") + if not isinstance(data.get("run_id"), str): + raise RunDataValidationError("Field 'run_id' must be a str.") + if not isinstance(data.get("status"), str): + raise RunDataValidationError("Field 'status' must be a str.") + if not isinstance(data.get("config_hash"), str): + raise RunDataValidationError("Field 'config_hash' must be a str.") + return cls( + run_id=data["run_id"], + status=data["status"], + sanity_checks=data["sanity_checks"], + config_hash=data["config_hash"] + ) + + +def analyse_frozen_runs(data: List[RunData]) -> Dict[str, Any]: + """Analysiert Frozen-Run-Daten, berechnet Kennzahlen zur Stabilität und Häufigkeiten.""" + # Validierung + if not data: + logger.warning("Keine Daten zur Analyse übergeben.") + return {} + + valid_runs: List[RunData] = [] + for item in data: + if isinstance(item, dict): + try: + valid_runs.append(RunData.validate(item)) + except RunDataValidationError as e: + logger.error(f"Ungültiger Dateneintrag ignoriert: {e}") + elif isinstance(item, RunData): + valid_runs.append(item) + else: + logger.error(f"Unbekannter Datentyp in Analyse: {type(item).__name__}") + + if not valid_runs: + logger.warning("Keine validen Läufe nach Validierung.") + return {} + + df = pd.DataFrame([asdict(r) for r in valid_runs]) + + # Einfacher Sanity-Score: Anzahl Fehler in sanity_checks + df['sanity_error_count'] = df['sanity_checks'].apply(lambda d: sum(1 for v in d.values() if v not in (None, True, False) and not v)) + + # Flip-Flop-Frequenz per config_hash und status + hash_status = df.groupby('config_hash')['status'].nunique() + flip_flop_rate = (hash_status > 1).mean() + + # Statistische Verteilungen + sanity_counts = df['sanity_error_count'].tolist() + avg_errors = mean(sanity_counts) + std_errors = pstdev(sanity_counts) if len(sanity_counts) > 1 else 0.0 + p95 = quantiles(sanity_counts, n=100)[94] if len(sanity_counts) >= 20 else max(sanity_counts) + + result = { + "total_runs": len(df), + "unique_hashes": df['config_hash'].nunique(), + "flip_flop_rate": round(float(flip_flop_rate), 4), + "avg_sanity_errors": round(avg_errors, 4), + "std_sanity_errors": round(std_errors, 4), + "p95_sanity_errors": round(float(p95), 4), + } + + logger.info(f"Analyse abgeschlossen: {result}") + return result