Add policy_eval_script/src/policy_eval_script/core.py

This commit is contained in:
Mika 2026-02-06 15:31:48 +00:00
commit e187b0b3b0

View file

@ -0,0 +1,87 @@
import json
import hashlib
from pathlib import Path
from typing import Dict, Any
class EvaluationResult(dict):
"""Container für das Evaluationsergebnis mit festen Feldern."""
REQUIRED_FIELDS = {"stratum", "decision", "reason", "policy_hash"}
def __init__(self, stratum: str, decision: str, reason: str, policy_hash: str):
super().__init__(
stratum=stratum,
decision=decision,
reason=reason,
policy_hash=policy_hash,
)
self._validate()
def _validate(self) -> None:
assert set(self.keys()) == self.REQUIRED_FIELDS, (
f"EvaluationResult muss Felder {self.REQUIRED_FIELDS} enthalten"
)
for k, v in self.items():
if not isinstance(v, str):
raise TypeError(f"Feld {k} muss ein String sein, erhielt {type(v)}")
def _compute_policy_hash(policy_constants: Dict[str, Any]) -> str:
"""Berechnet einen Hash über den Inhalt der Policy-Konstanten."""
encoded = json.dumps(policy_constants, sort_keys=True).encode()
return hashlib.sha256(encoded).hexdigest()
def _validate_drift_report(drift_report: Dict[str, Any]) -> None:
if not isinstance(drift_report, dict):
raise TypeError("drift_report muss ein Dictionary sein")
if not drift_report:
raise ValueError("drift_report darf nicht leer sein")
def evaluate_policy(drift_report: Dict[str, Any]) -> Dict[str, Any]:
"""Berechnet ein Policy-Evaluationsergebnis für einen gegebenen Drift-Report."""
_validate_drift_report(drift_report)
constants_path = Path("policy_constants.json")
if not constants_path.exists():
raise FileNotFoundError("policy_constants.json nicht gefunden")
with constants_path.open("r", encoding="utf-8") as f:
policy_constants = json.load(f)
policy_hash = _compute_policy_hash(policy_constants)
results = []
thresholds = policy_constants.get("thresholds", {})
default_threshold = thresholds.get("default", 0.1)
for stratum, metrics in drift_report.items():
if not isinstance(metrics, dict):
raise TypeError(f"Metrics für {stratum} muss ein Dictionary sein")
max_drift = max(metrics.values()) if metrics else 0.0
threshold = thresholds.get(stratum, default_threshold)
if max_drift > 2 * threshold:
decision = "FAIL"
reason = f"Drift {max_drift:.3f} über doppeltem Grenzwert {threshold:.3f}"
elif max_drift > threshold:
decision = "WARN"
reason = f"Drift {max_drift:.3f} über Grenzwert {threshold:.3f}"
else:
decision = "PASS"
reason = f"Drift {max_drift:.3f} innerhalb Grenzwert {threshold:.3f}"
result = EvaluationResult(
stratum=stratum,
decision=decision,
reason=reason,
policy_hash=policy_hash,
)
results.append(result)
# Wenn nur ein Stratum, gebe das eine Dict zurück, sonst Kopie für alle.
if len(results) == 1:
return results[0]
return {r["stratum"]: dict(r) for r in results}