commit 7854afa6141ee64f674bf0c18093ea59fb20745c Author: Mika Date: Thu Jan 29 16:23:27 2026 +0000 Add drift_analysis/src/drift_analysis/core.py diff --git a/drift_analysis/src/drift_analysis/core.py b/drift_analysis/src/drift_analysis/core.py new file mode 100644 index 0000000..1a8dfe1 --- /dev/null +++ b/drift_analysis/src/drift_analysis/core.py @@ -0,0 +1,108 @@ +import argparse +import json +import os +from pathlib import Path +from dataclasses import dataclass +from typing import List, Tuple, Dict + + +@dataclass +class FrozenRun: + """Datenmodell für einen einzelnen Frozen-Run.""" + run_id: str + status: str + is_pinned: bool + + def __post_init__(self): + if not isinstance(self.run_id, str): + raise ValueError("run_id muss ein String sein") + if not isinstance(self.status, str): + raise ValueError("status muss ein String sein") + if not isinstance(self.is_pinned, bool): + raise ValueError("is_pinned muss ein bool sein") + + +def load_frozen_runs(path: str) -> List[FrozenRun]: + """Lädt FrozenRun-Daten aus einer JSON-Datei und validiert sie.""" + file_path = Path(path) + if not file_path.exists() or not file_path.is_file(): + raise FileNotFoundError(f"Datei nicht gefunden: {path}") + + try: + with file_path.open('r', encoding='utf-8') as f: + data = json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Ungültiges JSON-Format in {path}: {e}") from e + + if not isinstance(data, list): + raise ValueError("Eingabedatei muss eine Liste von Objekten enthalten") + + runs = [] + for entry in data: + try: + run = FrozenRun( + run_id=entry["run_id"], + status=entry["status"], + is_pinned=entry["is_pinned"] + ) + runs.append(run) + except KeyError as e: + raise ValueError(f"Fehlendes Feld in Eintrag: {e}") from e + except ValueError as e: + raise ValueError(f"Ungültiger Eintrag: {e}") from e + + return runs + + +def calculate_warn_rate(frozen_runs: List[FrozenRun], threshold: float) -> Tuple[int, int]: + """Berechnet die Anzahl der WARN-Ergebnisse und Gesamtanzahl.""" + if not isinstance(threshold, (float, int)): + raise ValueError("threshold muss eine Zahl sein") + + total_runs = len(frozen_runs) + warn_count = sum(1 for run in frozen_runs if run.status.upper() == "WARN") + + return warn_count, total_runs + + +def generate_report(warn_count: int, total_runs: int, threshold: float) -> Dict[str, object]: + """Erzeugt einen Bericht über Warnquote und Bewertung im JSON-Format.""" + if total_runs <= 0: + warn_rate = 0.0 + else: + warn_rate = warn_count / total_runs + + status = "OK" if warn_rate <= threshold else "WARN" + + return { + "warn_rate": round(warn_rate, 4), + "threshold": threshold, + "status": status, + } + + +def main(): + parser = argparse.ArgumentParser(description="Analyse der Warnquote bei Drift-Checks.") + parser.add_argument("--input", required=True, help="Pfad zur JSON-Datei mit Frozen-Runs") + parser.add_argument("--threshold", type=float, default=0.3, help="Warnschwelle für Drift-Erkennung") + parser.add_argument("--output", default="output/report.json", help="Ausgabedatei für den JSON-Report") + args = parser.parse_args() + + try: + frozen_runs = load_frozen_runs(args.input) + warn_count, total_runs = calculate_warn_rate(frozen_runs, args.threshold) + report = generate_report(warn_count, total_runs, args.threshold) + + os.makedirs(os.path.dirname(args.output), exist_ok=True) + with open(args.output, 'w', encoding='utf-8') as f: + json.dump(report, f, indent=2) + + print(json.dumps(report, indent=2)) + + except Exception as e: + print(f"Fehler: {e}") + exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file