"""Core path-drift detection (``drift_detector/src/drift_detector/core.py``).

Normalizes logged paths with a fixed set of regex substitutions and marks an
entry as "drifted" when normalization changes it.
"""
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Union

# Default report location; relative paths resolve against the current working directory.
_DEFAULT_REPORT_PATH = "output/drift_report.json"

# (pattern, replacement) pairs applied in this order by _normalize_path.
# Compiled once at import time instead of on every call.
_NORMALIZATION_RULES = (
    (re.compile(r"/+"), "/"),  # collapse runs of slashes
    (re.compile(r"(?i)(version_|v)[0-9]+"), "version"),  # neutralize version numbers
    (re.compile(r"(?i)_?temp_?[0-9]*"), "temp"),  # temporary-name patterns
    (re.compile(r"(?i)date=[0-9]{8}"), "date=YYYYMMDD"),  # date partitions
)


class DriftSignature:
    """A path together with its normalized form and a drift flag."""

    def __init__(self, normalized_path: str, original_path: str, is_drift: bool) -> None:
        """Store the three fields after checking their types.

        Args:
            normalized_path: The path after normalization.
            original_path: The path as it was read from the input.
            is_drift: Whether the entry counts as drifted.

        Raises:
            AssertionError: If an argument has the wrong type (only when
                assertions are enabled, i.e. not under ``python -O``).
        """
        assert isinstance(normalized_path, str), "normalized_path muss ein String sein"
        assert isinstance(original_path, str), "original_path muss ein String sein"
        assert isinstance(is_drift, bool), "is_drift muss ein bool sein"
        self.normalized_path = normalized_path
        self.original_path = original_path
        self.is_drift = is_drift

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(normalized_path={self.normalized_path!r}, "
            f"original_path={self.original_path!r}, is_drift={self.is_drift!r})"
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the signature as a plain dictionary."""
        return {
            "normalized_path": self.normalized_path,
            "original_path": self.original_path,
            "is_drift": self.is_drift,
        }


def _normalize_path(path_str: str) -> str:
    """Return ``path_str`` stripped and with the normalization rules applied.

    The rules run in the order given in ``_NORMALIZATION_RULES``: slash runs
    are collapsed, then version numbers, temp patterns and ``date=`` values
    are replaced by fixed placeholders.
    """
    # NOTE(review): the slash rule also collapses the "//" of a URI scheme
    # (e.g. "s3://"), and the "v<digits>" / "temp" rules match inside longer
    # words. Confirm against the real input data whether that is intended.
    normalized = path_str.strip()
    for pattern, replacement in _NORMALIZATION_RULES:
        normalized = pattern.sub(replacement, normalized)
    return normalized


def _parse_entry(line: str) -> DriftSignature:
    """Parse one non-blank JSON line into a ``DriftSignature``."""
    try:
        entry = json.loads(line)
    except json.JSONDecodeError as e:
        raise ValueError(f"Ungültiger JSON-Eintrag: {e}") from e

    # A valid JSON line need not be an object (it may be a number, null, a
    # list, ...). Treat every entry without the key the same way instead of
    # leaking a TypeError from the membership test or the subscript.
    if not isinstance(entry, dict) or "original_path" not in entry:
        raise KeyError("Fehlender Schlüssel 'original_path' im JSON-Eintrag")

    original_path = str(entry["original_path"])
    normalized_path = _normalize_path(original_path)
    return DriftSignature(normalized_path, original_path, normalized_path != original_path)


def detect_drift(path: str, *, report_path: Union[str, Path] = _DEFAULT_REPORT_PATH) -> bool:
    """Detect path drift in a JSON Lines log file and write a report.

    Every non-blank line of the file is parsed as a JSON object that must
    contain an ``original_path`` key. An entry counts as drifted when its
    normalized path differs from the original. Only JSON lines are parsed;
    there is no CSV handling in this function.

    Args:
        path: Path to the log file.
        report_path: Where the JSON report (a list with one dictionary per
            entry) is written. Missing parent directories are created.
            Defaults to ``output/drift_report.json``.

    Returns:
        True if at least one entry is drifted, otherwise False.

    Raises:
        FileNotFoundError: If ``path`` is not an existing regular file.
        ValueError: If a non-blank line is not valid JSON.
        KeyError: If an entry is not an object or lacks ``original_path``.
    """
    input_path = Path(path)
    if not input_path.is_file():  # is_file() is already False for missing paths
        raise FileNotFoundError(f"Datei nicht gefunden: {path}")

    signatures: List[DriftSignature] = []
    with input_path.open("r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            signatures.append(_parse_entry(line))

    report = [sig.to_dict() for sig in signatures]
    report_file = Path(report_path)
    # Without this, the write fails when the output directory does not exist yet.
    report_file.parent.mkdir(parents=True, exist_ok=True)
    with report_file.open("w", encoding="utf-8") as out_f:
        json.dump(report, out_f, indent=2, ensure_ascii=False)

    # Aggregation: a single drifted entry marks the whole file as drifted.
    return any(sig.is_drift for sig in signatures)