From e187b0b3b0e89ce8f2e53c85a1613ea4c60ce3e5 Mon Sep 17 00:00:00 2001 From: Mika Date: Fri, 6 Feb 2026 15:31:48 +0000 Subject: [PATCH] Add policy_eval_script/src/policy_eval_script/core.py --- .../src/policy_eval_script/core.py | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 policy_eval_script/src/policy_eval_script/core.py diff --git a/policy_eval_script/src/policy_eval_script/core.py b/policy_eval_script/src/policy_eval_script/core.py new file mode 100644 index 0000000..584cbd4 --- /dev/null +++ b/policy_eval_script/src/policy_eval_script/core.py @@ -0,0 +1,87 @@ +import json +import hashlib +from pathlib import Path +from typing import Dict, Any + + +class EvaluationResult(dict): + """Container für das Evaluationsergebnis mit festen Feldern.""" + + REQUIRED_FIELDS = {"stratum", "decision", "reason", "policy_hash"} + + def __init__(self, stratum: str, decision: str, reason: str, policy_hash: str): + super().__init__( + stratum=stratum, + decision=decision, + reason=reason, + policy_hash=policy_hash, + ) + self._validate() + + def _validate(self) -> None: + assert set(self.keys()) == self.REQUIRED_FIELDS, ( + f"EvaluationResult muss Felder {self.REQUIRED_FIELDS} enthalten" + ) + for k, v in self.items(): + if not isinstance(v, str): + raise TypeError(f"Feld {k} muss ein String sein, erhielt {type(v)}") + + +def _compute_policy_hash(policy_constants: Dict[str, Any]) -> str: + """Berechnet einen Hash über den Inhalt der Policy-Konstanten.""" + encoded = json.dumps(policy_constants, sort_keys=True).encode() + return hashlib.sha256(encoded).hexdigest() + + +def _validate_drift_report(drift_report: Dict[str, Any]) -> None: + if not isinstance(drift_report, dict): + raise TypeError("drift_report muss ein Dictionary sein") + if not drift_report: + raise ValueError("drift_report darf nicht leer sein") + + +def evaluate_policy(drift_report: Dict[str, Any]) -> Dict[str, Any]: + """Berechnet ein Policy-Evaluationsergebnis für einen gegebenen Drift-Report.""" + _validate_drift_report(drift_report) + + constants_path = Path("policy_constants.json") + if not constants_path.exists(): + raise FileNotFoundError("policy_constants.json nicht gefunden") + + with constants_path.open("r", encoding="utf-8") as f: + policy_constants = json.load(f) + + policy_hash = _compute_policy_hash(policy_constants) + + results = [] + thresholds = policy_constants.get("thresholds", {}) + default_threshold = thresholds.get("default", 0.1) + + for stratum, metrics in drift_report.items(): + if not isinstance(metrics, dict): + raise TypeError(f"Metrics für {stratum} muss ein Dictionary sein") + max_drift = max(metrics.values()) if metrics else 0.0 + threshold = thresholds.get(stratum, default_threshold) + + if max_drift > 2 * threshold: + decision = "FAIL" + reason = f"Drift {max_drift:.3f} über doppeltem Grenzwert {threshold:.3f}" + elif max_drift > threshold: + decision = "WARN" + reason = f"Drift {max_drift:.3f} über Grenzwert {threshold:.3f}" + else: + decision = "PASS" + reason = f"Drift {max_drift:.3f} innerhalb Grenzwert {threshold:.3f}" + + result = EvaluationResult( + stratum=stratum, + decision=decision, + reason=reason, + policy_hash=policy_hash, + ) + results.append(result) + + # Wenn nur ein Stratum, gebe das eine Dict zurück, sonst Kopie für alle. + if len(results) == 1: + return results[0] + return {r["stratum"]: dict(r) for r in results}