Add drift_report_parser/src/drift_report_parser/core.py

This commit is contained in:
Mika 2026-01-31 13:07:38 +00:00
commit bcb578b3e2

View file

@ -0,0 +1,91 @@
from __future__ import annotations
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Dict, Any
from datetime import datetime
@dataclass
class DriftReportData:
"""Strukturierte Repräsentation eines einzelnen Drift-Report-Eintrags."""
timestamp: datetime
pinned: bool
unpinned: bool
decision: str
rolling_warn_rate: float
counts: str
label: str
comment: str | None = None
class DriftReportParseError(Exception):
    """Raised when a drift report is malformed or cannot be read."""
def _validate_file_path(file_path: str | Path) -> Path:
path = Path(file_path)
if not path.exists():
raise DriftReportParseError(f"Drift report file not found: {path}")
if not path.is_file():
raise DriftReportParseError(f"Path is not a file: {path}")
return path
def _safe_get(data: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Look up *key* in *data*, returning *default* when the key is absent.

    A key that is present with a ``None`` value yields ``None``, not
    *default*.
    """
    return data.get(key, default)
def _parse_timestamp(value: Any) -> datetime:
    """Convert a raw timestamp value into a ``datetime``.

    Args:
        value: A ``datetime`` (returned unchanged), an ISO-8601 string, or
            anything else.

    Returns:
        The parsed ``datetime``. ``datetime.min`` (a naive value) is returned
        as a sentinel when *value* is not a ``datetime``/``str`` or cannot be
        parsed.
    """
    if isinstance(value, datetime):
        return value
    if not isinstance(value, str):
        return datetime.min

    try:
        return datetime.fromisoformat(value)
    except ValueError:
        pass

    # Before Python 3.11, fromisoformat() rejects the common UTC designator
    # "Z"; surrounding whitespace is rejected as well. Retry with a
    # normalised string so such timestamps are not silently replaced by the
    # sentinel. This only runs after the raw parse failed, so every input
    # that parsed before still yields the same result.
    normalized = value.strip()
    if normalized.endswith(("Z", "z")):
        normalized = f"{normalized[:-1]}+00:00"
    if normalized == value:
        return datetime.min
    try:
        return datetime.fromisoformat(normalized)
    except ValueError:
        return datetime.min
def _entry_to_record(index: int, entry: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one raw report entry into a ``DriftReportData`` dictionary.

    Args:
        index: Position of the entry in the report list (used in errors).
        entry: The raw JSON object for this entry.

    Raises:
        DriftReportParseError: If ``rolling_warn_rate`` cannot be converted
            to ``float``.
    """
    raw_rate = _safe_get(entry, 'rolling_warn_rate', 0.0)
    try:
        rolling_warn_rate = float(raw_rate)
    except (TypeError, ValueError, OverflowError) as e:
        # A null or non-numeric rate is a malformed report, not a
        # programming error, so report it through the module's own exception.
        raise DriftReportParseError(
            f"Invalid rolling_warn_rate in entry {index}: {raw_rate!r}"
        ) from e

    # NOTE(review): str() turns an explicit JSON null for decision/counts/
    # label into the text "None"; kept as-is to preserve existing output.
    record = DriftReportData(
        timestamp=_parse_timestamp(_safe_get(entry, 'timestamp')),
        pinned=bool(_safe_get(entry, 'pinned', False)),
        unpinned=bool(_safe_get(entry, 'unpinned', False)),
        decision=str(_safe_get(entry, 'decision', 'UNKNOWN')),
        rolling_warn_rate=rolling_warn_rate,
        counts=str(_safe_get(entry, 'counts', '')),
        label=str(_safe_get(entry, 'label', '')),
        comment=_safe_get(entry, 'comment'),
    )
    return asdict(record)


def parse_drift_report(file_path: str | Path) -> List[Dict[str, Any]]:
    """Read a JSON drift report and extract the drift-signal fields.

    The file must contain a JSON list. Each JSON object in that list is
    converted to a ``DriftReportData`` and returned as a dictionary; list
    items that are not objects are skipped.

    Args:
        file_path: Path to the input file (e.g. ``drift_report.json``).

    Returns:
        A list of ``DriftReportData`` dictionaries, in input order.

    Raises:
        DriftReportParseError: If the path is missing or not a file, the
            file cannot be read or decoded, the content is not valid JSON,
            the top-level value is not a list, or an entry holds a
            ``rolling_warn_rate`` that is not convertible to ``float``.
    """
    path = _validate_file_path(file_path)
    try:
        with path.open('r', encoding='utf-8') as f:
            raw_data = json.load(f)
    except json.JSONDecodeError as e:
        raise DriftReportParseError(f"JSON decoding failed: {e}") from e
    except (OSError, UnicodeDecodeError) as e:
        # Permission problems, a file removed after validation, or invalid
        # UTF-8 all mean the report could not be read.
        raise DriftReportParseError(
            f"Could not read drift report {path}: {e}"
        ) from e

    if not isinstance(raw_data, list):
        raise DriftReportParseError("Expected a list of drift report entries.")

    return [
        _entry_to_record(index, entry)
        for index, entry in enumerate(raw_data)
        if isinstance(entry, dict)
    ]