# Provenance (from the originating commit):
#   commit 626a8abff881ac6b96f849da5621a6cd37b9488e
#   Author: Mika
#   Date:   Mon Feb 9 16:01:10 2026 +0000
#   Add policy_eval.py/src/policy_eval/core.py
#
# Policy evaluation helpers: validate and fingerprint policy constants,
# detect changes between runs, and run a backtest over a directory of audit
# CSV files that writes delta artefacts into ./output.
from __future__ import annotations

import csv
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict

import pandas as pd


logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s')


class PolicyChangeError(Exception):
    """Custom exception raised for invalid policy constants input."""


def _validate_policy_constants(policy_constants_json: Dict[str, Any]) -> None:
    """Check that the policy constants mapping has the expected shape.

    Args:
        policy_constants_json: Mapping that must contain a string ``version``
            and a numeric ``constant_value``.

    Raises:
        PolicyChangeError: If the input is not a dict, a required key is
            missing, ``version`` is not a string, or ``constant_value`` is not
            an int/float.
    """
    required_keys = {"version", "constant_value"}
    if not isinstance(policy_constants_json, dict):
        raise PolicyChangeError("policy_constants_json must be a dict")
    missing = required_keys - policy_constants_json.keys()
    if missing:
        raise PolicyChangeError(f"Missing required keys in policy_constants_json: {missing}")
    if not isinstance(policy_constants_json["version"], str):
        raise PolicyChangeError("version must be a string")
    constant_value = policy_constants_json["constant_value"]
    # bool is a subclass of int, so it has to be excluded explicitly;
    # otherwise a JSON `true`/`false` would pass as "numeric".
    if isinstance(constant_value, bool) or not isinstance(constant_value, (int, float)):
        raise PolicyChangeError("constant_value must be numeric")


def _compute_policy_hash(policy_constants_json: Dict[str, Any]) -> str:
    """Return the SHA-256 hex digest of the key-sorted JSON serialization."""
    # NOTE: the serialization parameters define the hash value; changing them
    # would invalidate any previously recorded hash.
    serialized = json.dumps(policy_constants_json, sort_keys=True).encode('utf-8')
    return hashlib.sha256(serialized).hexdigest()


def check_policy_changes(policy_constants_json: Dict[str, Any]) -> bool:
    """Validate the policy constants and report whether they changed.

    The constants are hashed and compared with the hash recorded in
    ``output/.last_policy_hash`` (relative to the current working directory).
    The record file is then (re)written with the current hash, so a second
    call with the same constants returns ``False``.

    Args:
        policy_constants_json: Policy constants mapping (see
            ``_validate_policy_constants`` for the required shape).

    Returns:
        ``True`` if no hash was recorded yet or the recorded hash differs
        from the current one, otherwise ``False``.

    Raises:
        PolicyChangeError: If the constants fail validation.
    """
    _validate_policy_constants(policy_constants_json)
    current_hash = _compute_policy_hash(policy_constants_json)
    hash_record_path = Path("output/.last_policy_hash")

    if hash_record_path.exists():
        # Explicit encoding so the read does not depend on the locale default.
        last_hash = hash_record_path.read_text(encoding='utf-8').strip()
        has_changed = last_hash != current_hash
    else:
        has_changed = True

    hash_record_path.parent.mkdir(parents=True, exist_ok=True)
    hash_record_path.write_text(current_hash, encoding='utf-8')

    logger.info("Policy change detected: %s", has_changed)
    return has_changed


def _load_audit_cases(audit_path: Path, audit_set: str) -> pd.DataFrame:
    """Read every ``*.csv`` directly under *audit_path* into one DataFrame."""
    # Sorted so the row order of the output does not depend on the
    # filesystem's directory listing order.
    all_csvs = sorted(audit_path.glob('*.csv'))
    if not all_csvs:
        raise FileNotFoundError(f"No CSV audit files found in {audit_set}")

    df_list = []
    for csv_file in all_csvs:
        df = pd.read_csv(csv_file)
        df['source_file'] = csv_file.name
        df_list.append(df)
    return pd.concat(df_list, ignore_index=True)


def _current_policy_hash(constants_path: Path) -> str:
    """Return the hash of the constants file, or ``'unknown'`` if unavailable."""
    policy_hash = 'unknown'
    if constants_path.exists():
        with open(constants_path, 'r', encoding='utf-8') as f:
            try:
                constants_data = json.load(f)
                _validate_policy_constants(constants_data)
                policy_hash = _compute_policy_hash(constants_data)
            except Exception as exc:
                # Deliberate best effort: a broken constants file must not
                # abort the backtest; the hash simply stays 'unknown'.
                logger.warning("Could not compute policy hash: %s", exc)
    return policy_hash


def run_backtest(audit_set: str) -> Dict[str, str]:
    """Run a backtest against the fixed audit set and write delta artefacts.

    Reads all ``*.csv`` files directly inside *audit_set*, and writes
    ``output/delta_cases.csv`` and ``output/delta_summary.json`` relative to
    the current working directory. The policy hash is taken from
    ``config/policy_constants.json`` when that file exists and is valid,
    otherwise it is ``'unknown'``.

    Args:
        audit_set: Path to the directory containing the audit CSV files.

    Returns:
        Dict with the keys ``delta_summary_path`` and ``delta_cases_path``.

    Raises:
        FileNotFoundError: If *audit_set* does not exist or contains no CSV
            files.
        AssertionError: If an output file is missing after writing.
    """
    audit_path = Path(audit_set)
    if not audit_path.exists():
        raise FileNotFoundError(f"Audit set path not found: {audit_set}")

    full_df = _load_audit_cases(audit_path, audit_set)

    # Simulated delta generation. This is a placeholder, not a real
    # comparison: both decision columns are copies of the first column of
    # the audit data, so old and new decisions are always identical.
    full_df['reason'] = 'Policy-Update'
    full_df['old_decision'] = full_df.iloc[:, 0].astype(str)
    full_df['new_decision'] = full_df.iloc[:, 0].astype(str)

    policy_hash = _current_policy_hash(Path('config/policy_constants.json'))
    full_df['policy_hash'] = policy_hash

    output_dir = Path('output')
    output_dir.mkdir(parents=True, exist_ok=True)

    delta_cases_path = output_dir / 'delta_cases.csv'
    full_df[['reason', 'old_decision', 'new_decision', 'policy_hash']].to_csv(delta_cases_path, index=False)

    # Summary. The timestamp comes from an aware UTC clock (datetime.utcnow()
    # is deprecated); tzinfo is dropped before formatting so the emitted
    # string keeps the "<naive ISO-8601>Z" form.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    delta_summary = {
        'timestamp': now_utc.isoformat() + 'Z',
        'total_cases': len(full_df),
        'unique_policy_hashes': [policy_hash],
        'reason_counts': {'Policy-Update': len(full_df)},
    }
    delta_summary_path = output_dir / 'delta_summary.json'
    with open(delta_summary_path, 'w', encoding='utf-8') as f:
        json.dump(delta_summary, f, indent=2)

    # CI check. Raised explicitly instead of using an `assert` statement so
    # it still runs under `python -O`; the exception type is unchanged.
    if not (delta_cases_path.exists() and delta_summary_path.exists()):
        raise AssertionError("Backtest output files missing.")
    logger.info("Backtest completed: %s cases written.", len(full_df))

    return {
        'delta_summary_path': str(delta_summary_path),
        'delta_cases_path': str(delta_cases_path),
    }