From b1d3675653fb4909da5c88a46808e3641ea674d9 Mon Sep 17 00:00:00 2001 From: Mika Date: Fri, 6 Feb 2026 15:31:50 +0000 Subject: [PATCH] Add dry_run_mode/src/dry_run_mode/core.py --- dry_run_mode/src/dry_run_mode/core.py | 94 +++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 dry_run_mode/src/dry_run_mode/core.py diff --git a/dry_run_mode/src/dry_run_mode/core.py b/dry_run_mode/src/dry_run_mode/core.py new file mode 100644 index 0000000..0a2988e --- /dev/null +++ b/dry_run_mode/src/dry_run_mode/core.py @@ -0,0 +1,94 @@ +import json +import hashlib +import os +from typing import Dict, Any, List, TypedDict + + +class DryRunResult(TypedDict): + stratum: str + dry_decision: str + reason: str + + +def _validate_drift_report(drift_report: Dict[str, Any]) -> None: + if not isinstance(drift_report, dict): + raise ValueError("drift_report must be a dict") + if "strata" not in drift_report: + raise ValueError("drift_report missing 'strata' key") + if not isinstance(drift_report["strata"], list): + raise ValueError("'strata' must be a list in drift_report") + + +def _load_policy_constants(path: str) -> Dict[str, Any]: + if not os.path.exists(path): + raise FileNotFoundError(f"Policy constants file not found: {path}") + with open(path, "r", encoding="utf-8") as f: + try: + constants = json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in policy constants: {e}") + if not all(k in constants for k in ("alpha", "offset")): + raise ValueError("Policy constants file must contain 'alpha' and 'offset'") + return constants + + +def _compute_policy_hash(constants: Dict[str, Any]) -> str: + constants_str = json.dumps(constants, sort_keys=True) + return hashlib.sha256(constants_str.encode("utf-8")).hexdigest() + + +def dry_run_evaluation(drift_report: Dict[str, Any]) -> Dict[str, Any]: + """Führt eine Dry-Run-Evaluierung des Drift-Reports durch und gibt Entscheidungen mit Begründung und Policy-Hash zurück.""" + _validate_drift_report(drift_report) + + constants_path = drift_report.get("policy_constants_path", "config/policy_constants.json") + constants = _load_policy_constants(constants_path) + policy_hash = _compute_policy_hash(constants) + + results: List[DryRunResult] = [] + alpha = constants["alpha"] + offset = constants["offset"] + + for stratum_info in drift_report["strata"]: + stratum_id = stratum_info.get("name") or "unknown" + metric_value = stratum_info.get("metric", 0.0) + + if not isinstance(metric_value, (int, float)): + results.append( + DryRunResult( + stratum=stratum_id, + dry_decision="FAIL", + reason="Invalid metric type" + ) + ) + continue + + if metric_value < alpha - offset: + decision = "PASS" + reason = f"Metric {metric_value} < threshold {alpha - offset}" + elif metric_value < alpha + offset: + decision = "WARN" + reason = f"Metric {metric_value} near threshold (±{offset})" + else: + decision = "FAIL" + reason = f"Metric {metric_value} exceeds threshold {alpha + offset}" + + results.append( + DryRunResult( + stratum=stratum_id, + dry_decision=decision, + reason=reason + ) + ) + + output: Dict[str, Any] = { + "policy_hash": policy_hash, + "evaluations": results, + } + + assert isinstance(output, dict), "Dry run output must be a dict" + assert all(set(r.keys()) == {"stratum", "dry_decision", "reason"} for r in results), ( + "Each result must contain required fields" + ) + + return output