Add dry_run_mode/src/dry_run_mode/core.py

This commit is contained in:
Mika 2026-02-06 15:31:50 +00:00
parent 1bb54018c5
commit b1d3675653

View file

@ -0,0 +1,94 @@
import json
import hashlib
import os
from typing import Dict, Any, List, TypedDict
class DryRunResult(TypedDict):
stratum: str
dry_decision: str
reason: str
def _validate_drift_report(drift_report: Dict[str, Any]) -> None:
if not isinstance(drift_report, dict):
raise ValueError("drift_report must be a dict")
if "strata" not in drift_report:
raise ValueError("drift_report missing 'strata' key")
if not isinstance(drift_report["strata"], list):
raise ValueError("'strata' must be a list in drift_report")
def _load_policy_constants(path: str) -> Dict[str, Any]:
if not os.path.exists(path):
raise FileNotFoundError(f"Policy constants file not found: {path}")
with open(path, "r", encoding="utf-8") as f:
try:
constants = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in policy constants: {e}")
if not all(k in constants for k in ("alpha", "offset")):
raise ValueError("Policy constants file must contain 'alpha' and 'offset'")
return constants
def _compute_policy_hash(constants: Dict[str, Any]) -> str:
constants_str = json.dumps(constants, sort_keys=True)
return hashlib.sha256(constants_str.encode("utf-8")).hexdigest()
def dry_run_evaluation(drift_report: Dict[str, Any]) -> Dict[str, Any]:
"""Führt eine Dry-Run-Evaluierung des Drift-Reports durch und gibt Entscheidungen mit Begründung und Policy-Hash zurück."""
_validate_drift_report(drift_report)
constants_path = drift_report.get("policy_constants_path", "config/policy_constants.json")
constants = _load_policy_constants(constants_path)
policy_hash = _compute_policy_hash(constants)
results: List[DryRunResult] = []
alpha = constants["alpha"]
offset = constants["offset"]
for stratum_info in drift_report["strata"]:
stratum_id = stratum_info.get("name") or "unknown"
metric_value = stratum_info.get("metric", 0.0)
if not isinstance(metric_value, (int, float)):
results.append(
DryRunResult(
stratum=stratum_id,
dry_decision="FAIL",
reason="Invalid metric type"
)
)
continue
if metric_value < alpha - offset:
decision = "PASS"
reason = f"Metric {metric_value} < threshold {alpha - offset}"
elif metric_value < alpha + offset:
decision = "WARN"
reason = f"Metric {metric_value} near threshold (±{offset})"
else:
decision = "FAIL"
reason = f"Metric {metric_value} exceeds threshold {alpha + offset}"
results.append(
DryRunResult(
stratum=stratum_id,
dry_decision=decision,
reason=reason
)
)
output: Dict[str, Any] = {
"policy_hash": policy_hash,
"evaluations": results,
}
assert isinstance(output, dict), "Dry run output must be a dict"
assert all(set(r.keys()) == {"stratum", "dry_decision", "reason"} for r in results), (
"Each result must contain required fields"
)
return output