Add drift_detector/src/drift_detector/core.py

This commit is contained in:
Mika 2026-02-14 15:32:01 +00:00
parent 096dc6daba
commit bae262abb7

View file

@ -0,0 +1,76 @@
import json
import re
from pathlib import Path
from typing import Any, Dict, List
class _SignatureTypeError(TypeError, AssertionError):
    """Raised when a ``DriftSignature`` argument has the wrong type.

    It is also an ``AssertionError`` because these checks used to be plain
    ``assert`` statements, so callers catching that type keep working.
    """


class DriftSignature:
    """A single path together with its normalized form and a drift flag.

    Attributes:
        normalized_path: The path after normalization.
        original_path: The path exactly as it was supplied.
        is_drift: Whether the entry is considered drifted.
    """

    def __init__(self, normalized_path: str, original_path: str, is_drift: bool) -> None:
        """Store the three fields after checking their types.

        Raises:
            TypeError: If ``normalized_path`` or ``original_path`` is not a
                ``str`` or ``is_drift`` is not a ``bool``. The raised
                exception is also an ``AssertionError``.
        """
        # Explicit raises instead of ``assert``: assertions are removed when
        # Python runs with -O, which would silently disable the validation.
        if not isinstance(normalized_path, str):
            raise _SignatureTypeError("normalized_path muss ein String sein")
        if not isinstance(original_path, str):
            raise _SignatureTypeError("original_path muss ein String sein")
        if not isinstance(is_drift, bool):
            raise _SignatureTypeError("is_drift muss ein bool sein")
        self.normalized_path = normalized_path
        self.original_path = original_path
        self.is_drift = is_drift

    def __repr__(self) -> str:
        """Return a constructor-style representation for debugging."""
        return (
            f"{type(self).__name__}(normalized_path={self.normalized_path!r}, "
            f"original_path={self.original_path!r}, is_drift={self.is_drift!r})"
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the signature as a plain dictionary (JSON-serializable)."""
        return {
            "normalized_path": self.normalized_path,
            "original_path": self.original_path,
            "is_drift": self.is_drift,
        }
def _normalize_path(path_str: str) -> str:
    """Return *path_str* with typical drift patterns replaced by placeholders.

    Surrounding whitespace is stripped first. The substitutions are then
    applied one after another, each working on the result of the previous one.
    """
    rewrite_rules = (
        (r"/+", "/"),  # collapse runs of slashes into one
        (r"(?i)(version_|v)[0-9]+", "version"),  # neutralize version numbers
        (r"(?i)_?temp_?[0-9]*", "temp"),  # temporary-name patterns
        (r"(?i)date=[0-9]{8}", "date=YYYYMMDD"),  # eight-digit date values
    )
    result = path_str.strip()
    for pattern, placeholder in rewrite_rules:
        result = re.sub(pattern, placeholder, result)
    return result
def detect_drift(path: str, *, report_path: str = "output/drift_report.json") -> bool:
    """Check every path in a JSON Lines log for drift and write a report.

    Each non-blank line of the input must be a JSON object with an
    ``original_path`` key. The value is converted to ``str`` and passed
    through ``_normalize_path``; an entry counts as drifted when the
    normalized path differs from the original. One record per entry is
    written to *report_path* as a JSON array. Only JSON Lines input is
    parsed; CSV is not supported.

    Args:
        path: Location of the JSON Lines log file.
        report_path: Where the JSON report is written. Missing parent
            directories are created. Defaults to ``output/drift_report.json``
            relative to the current working directory.

    Returns:
        True if at least one entry drifted, otherwise False (also when the
        log contains no entries).

    Raises:
        FileNotFoundError: If *path* does not exist or is not a regular file.
        ValueError: If a line is not valid JSON or is not a JSON object.
        KeyError: If an entry has no ``original_path`` key.
    """
    input_path = Path(path)
    if not input_path.is_file():  # also False when the path does not exist
        raise FileNotFoundError(f"Datei nicht gefunden: {path}")

    signatures: List[DriftSignature] = []
    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise make json.loads reject the first line.
    with input_path.open("r", encoding="utf-8-sig") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError as e:
                raise ValueError(f"Ungültiger JSON-Eintrag: {e}") from e
            # Valid JSON that is not an object (number, list, string, null)
            # cannot carry the expected key; reject it explicitly instead of
            # failing later with an unrelated TypeError.
            if not isinstance(entry, dict):
                raise ValueError(
                    "Ungültiger JSON-Eintrag: JSON-Objekt erwartet, "
                    f"nicht {type(entry).__name__}"
                )
            if "original_path" not in entry:
                raise KeyError("Fehlender Schlüssel 'original_path' im JSON-Eintrag")
            original_path = str(entry["original_path"])
            normalized_path = _normalize_path(original_path)
            is_drift = normalized_path != original_path
            signatures.append(DriftSignature(normalized_path, original_path, is_drift))

    # Aggregation: the whole file counts as drifted if any entry drifted.
    any_drift = any(sig.is_drift for sig in signatures)
    report = [sig.to_dict() for sig in signatures]

    report_file = Path(report_path)
    # Opening the report for writing fails if its directory is missing.
    report_file.parent.mkdir(parents=True, exist_ok=True)
    with report_file.open("w", encoding="utf-8") as out_f:
        json.dump(report, out_f, indent=2, ensure_ascii=False)
    return any_drift