Add drift_analysis/src/drift_analysis/core.py

This commit is contained in:
Mika 2026-01-29 16:23:27 +00:00
commit 7854afa614

View file

@ -0,0 +1,108 @@
import argparse
import json
import os
from pathlib import Path
from dataclasses import dataclass
from typing import List, Tuple, Dict
@dataclass
class FrozenRun:
"""Datenmodell für einen einzelnen Frozen-Run."""
run_id: str
status: str
is_pinned: bool
def __post_init__(self):
if not isinstance(self.run_id, str):
raise ValueError("run_id muss ein String sein")
if not isinstance(self.status, str):
raise ValueError("status muss ein String sein")
if not isinstance(self.is_pinned, bool):
raise ValueError("is_pinned muss ein bool sein")
def load_frozen_runs(path: str) -> List[FrozenRun]:
"""Lädt FrozenRun-Daten aus einer JSON-Datei und validiert sie."""
file_path = Path(path)
if not file_path.exists() or not file_path.is_file():
raise FileNotFoundError(f"Datei nicht gefunden: {path}")
try:
with file_path.open('r', encoding='utf-8') as f:
data = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Ungültiges JSON-Format in {path}: {e}") from e
if not isinstance(data, list):
raise ValueError("Eingabedatei muss eine Liste von Objekten enthalten")
runs = []
for entry in data:
try:
run = FrozenRun(
run_id=entry["run_id"],
status=entry["status"],
is_pinned=entry["is_pinned"]
)
runs.append(run)
except KeyError as e:
raise ValueError(f"Fehlendes Feld in Eintrag: {e}") from e
except ValueError as e:
raise ValueError(f"Ungültiger Eintrag: {e}") from e
return runs
def calculate_warn_rate(frozen_runs: List[FrozenRun], threshold: float) -> Tuple[int, int]:
"""Berechnet die Anzahl der WARN-Ergebnisse und Gesamtanzahl."""
if not isinstance(threshold, (float, int)):
raise ValueError("threshold muss eine Zahl sein")
total_runs = len(frozen_runs)
warn_count = sum(1 for run in frozen_runs if run.status.upper() == "WARN")
return warn_count, total_runs
def generate_report(warn_count: int, total_runs: int, threshold: float) -> Dict[str, object]:
"""Erzeugt einen Bericht über Warnquote und Bewertung im JSON-Format."""
if total_runs <= 0:
warn_rate = 0.0
else:
warn_rate = warn_count / total_runs
status = "OK" if warn_rate <= threshold else "WARN"
return {
"warn_rate": round(warn_rate, 4),
"threshold": threshold,
"status": status,
}
def main():
parser = argparse.ArgumentParser(description="Analyse der Warnquote bei Drift-Checks.")
parser.add_argument("--input", required=True, help="Pfad zur JSON-Datei mit Frozen-Runs")
parser.add_argument("--threshold", type=float, default=0.3, help="Warnschwelle für Drift-Erkennung")
parser.add_argument("--output", default="output/report.json", help="Ausgabedatei für den JSON-Report")
args = parser.parse_args()
try:
frozen_runs = load_frozen_runs(args.input)
warn_count, total_runs = calculate_warn_rate(frozen_runs, args.threshold)
report = generate_report(warn_count, total_runs, args.threshold)
os.makedirs(os.path.dirname(args.output), exist_ok=True)
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2)
print(json.dumps(report, indent=2))
except Exception as e:
print(f"Fehler: {e}")
exit(1)
if __name__ == "__main__":
main()