# Provenance: recovered from patch bcb578b3e2672a383fe63cf6539c63ed2a61418a
# (author: Mika, date: Sat, 31 Jan 2026 13:07:38 +0000), subject
# "Add drift_report_parser/src/drift_report_parser/core.py".
from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List


@dataclass
class DriftReportData:
    """Structured representation of a single drift report entry."""

    timestamp: datetime
    pinned: bool
    unpinned: bool
    decision: str
    rolling_warn_rate: float
    counts: str
    label: str
    comment: str | None = None


class DriftReportParseError(Exception):
    """Raised when a drift report is malformed or cannot be read."""


def _validate_file_path(file_path: str | Path) -> Path:
    """Return *file_path* as a ``Path`` after checking it is an existing file.

    Raises:
        DriftReportParseError: If the path does not exist or is not a
            regular file.
    """
    path = Path(file_path)
    if not path.exists():
        raise DriftReportParseError(f"Drift report file not found: {path}")
    if not path.is_file():
        raise DriftReportParseError(f"Path is not a file: {path}")
    return path


def _safe_get(data: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Return ``data[key]``, or *default* when the key is absent."""
    return data.get(key, default)


def _parse_timestamp(value: Any) -> datetime:
    """Convert *value* to a ``datetime``, falling back to ``datetime.min``.

    ``datetime`` instances are returned unchanged and strings are parsed as
    ISO 8601. Anything else, or a string that cannot be parsed, yields
    ``datetime.min`` as the "unknown timestamp" placeholder.
    """
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        # ``fromisoformat`` only accepts a trailing "Z" from Python 3.11 on;
        # rewrite it as an explicit UTC offset so older interpreters parse
        # the timestamp too instead of silently degrading to ``datetime.min``.
        candidate = value[:-1] + "+00:00" if value.endswith("Z") else value
        try:
            return datetime.fromisoformat(candidate)
        except ValueError:
            pass
    return datetime.min


def parse_drift_report(file_path: str | Path) -> List[Dict[str, Any]]:
    """Read a JSON drift report and extract the fields describing drift signals.

    The file must contain a JSON list. List items that are not JSON objects
    are skipped; missing keys are replaced by defaults (``False``,
    ``'UNKNOWN'``, ``0.0``, ``''``, ``None``; ``datetime.min`` for the
    timestamp).

    Args:
        file_path: Path to the input file (e.g. ``drift_report.json``).

    Returns:
        One dictionary per accepted entry, produced by ``asdict`` from a
        ``DriftReportData`` instance. The ``timestamp`` value is a
        ``datetime`` object, not a string.

    Raises:
        DriftReportParseError: If the file is missing, is not a regular
            file, cannot be read or decoded, is not valid JSON, does not
            hold a list at the top level, or contains an entry whose
            ``rolling_warn_rate`` cannot be converted to ``float``.
    """
    path = _validate_file_path(file_path)

    try:
        with path.open('r', encoding='utf-8') as f:
            raw_data = json.load(f)
    except json.JSONDecodeError as e:
        raise DriftReportParseError(f"JSON decoding failed: {e}") from e
    except (OSError, UnicodeDecodeError) as e:
        # Permission problems, races with deletion and non-UTF-8 content are
        # all "cannot be read" conditions and belong to the module's own
        # error type rather than leaking as raw exceptions.
        raise DriftReportParseError(
            f"Could not read drift report file {path}: {e}"
        ) from e

    if not isinstance(raw_data, list):
        raise DriftReportParseError("Expected a list of drift report entries.")

    parsed_entries: List[Dict[str, Any]] = []

    for index, entry in enumerate(raw_data):
        if not isinstance(entry, dict):
            continue

        raw_rate = _safe_get(entry, 'rolling_warn_rate', 0.0)
        try:
            rolling_warn_rate = float(raw_rate)
        except (TypeError, ValueError, OverflowError) as e:
            # e.g. null, "n/a" or an integer too large for a float.
            raise DriftReportParseError(
                f"Entry {index}: invalid rolling_warn_rate {raw_rate!r}"
            ) from e

        data = DriftReportData(
            timestamp=_parse_timestamp(_safe_get(entry, 'timestamp')),
            pinned=bool(_safe_get(entry, 'pinned', False)),
            unpinned=bool(_safe_get(entry, 'unpinned', False)),
            decision=str(_safe_get(entry, 'decision', 'UNKNOWN')),
            rolling_warn_rate=rolling_warn_rate,
            counts=str(_safe_get(entry, 'counts', '')),
            label=str(_safe_get(entry, 'label', '')),
            comment=_safe_get(entry, 'comment'),
        )
        parsed_entries.append(asdict(data))

    return parsed_entries