Add policy_eval.py/src/policy_eval/core.py

This commit is contained in:
Mika 2026-02-09 16:01:10 +00:00
commit 626a8abff8

View file

@ -0,0 +1,118 @@
from __future__ import annotations

import csv
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict

import pandas as pd
# Module-level logger keyed to this module's import path.
logger = logging.getLogger(__name__)
# NOTE(review): basicConfig at import time configures the root logger as a
# side effect of importing this module; libraries normally leave logging
# configuration to the application entry point — confirm this is intentional.
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s')
class PolicyChangeError(Exception):
    """Raised when supplied policy constants fail structural validation."""
def _validate_policy_constants(policy_constants_json: Dict[str, Any]) -> None:
required_keys = {"version", "constant_value"}
if not isinstance(policy_constants_json, dict):
raise PolicyChangeError("policy_constants_json must be a dict")
missing = required_keys - policy_constants_json.keys()
if missing:
raise PolicyChangeError(f"Missing required keys in policy_constants_json: {missing}")
if not isinstance(policy_constants_json["version"], str):
raise PolicyChangeError("version must be a string")
if not isinstance(policy_constants_json["constant_value"], (int, float)):
raise PolicyChangeError("constant_value must be numeric")
def _compute_policy_hash(policy_constants_json: Dict[str, Any]) -> str:
serialized = json.dumps(policy_constants_json, sort_keys=True).encode('utf-8')
return hashlib.sha256(serialized).hexdigest()
def check_policy_changes(
    policy_constants_json: Dict[str, Any],
    hash_record_path: Path = Path("output/.last_policy_hash"),
) -> bool:
    """Validate the policy constants and detect changes via content hashing.

    Compares the SHA-256 hash of *policy_constants_json* against the hash
    persisted by the previous run, then records the current hash for the
    next comparison.

    Args:
        policy_constants_json: Parsed policy constants to validate and hash.
        hash_record_path: File where the last seen hash is stored. Defaults
            to ``output/.last_policy_hash`` for backward compatibility.

    Returns:
        True if the constants changed since the last recorded run (or no
        record exists yet), False otherwise.

    Raises:
        PolicyChangeError: If the constants fail validation.
    """
    _validate_policy_constants(policy_constants_json)
    current_hash = _compute_policy_hash(policy_constants_json)
    if hash_record_path.exists():
        last_hash = hash_record_path.read_text(encoding="utf-8").strip()
        has_changed = last_hash != current_hash
    else:
        # First run: no baseline exists, so treat the policy as changed.
        has_changed = True
    hash_record_path.parent.mkdir(parents=True, exist_ok=True)
    hash_record_path.write_text(current_hash, encoding="utf-8")
    logger.info("Policy change detected: %s", has_changed)
    return has_changed
def _resolve_policy_hash(constants_path: Path) -> str:
    """Best-effort: load, validate, and hash the policy constants file.

    Returns 'unknown' when the file is missing or invalid; failures are
    logged as warnings but never abort the backtest.
    """
    if not constants_path.exists():
        return 'unknown'
    try:
        with open(constants_path, 'r', encoding='utf-8') as f:
            constants_data = json.load(f)
        _validate_policy_constants(constants_data)
        return _compute_policy_hash(constants_data)
    except Exception as exc:
        # Deliberate best-effort: a broken constants file should not stop
        # the backtest, only degrade the hash to 'unknown'.
        logger.warning("Could not compute policy hash: %s", exc)
        return 'unknown'


def run_backtest(
    audit_set: str,
    output_dir: str = 'output',
    constants_path: str = 'config/policy_constants.json',
) -> Dict[str, str]:
    """Run a backtest against the fixed audit set and emit delta artifacts.

    Reads every ``*.csv`` file in *audit_set*, tags each row with a
    (currently simulated) old/new decision delta plus the active policy
    hash, and writes ``delta_cases.csv`` and ``delta_summary.json``.

    Args:
        audit_set: Directory containing the audit CSV files.
        output_dir: Directory for generated artifacts (default 'output',
            matching the previous hard-coded path).
        constants_path: Policy constants JSON file (default
            'config/policy_constants.json', matching the previous path).

    Returns:
        Mapping with keys 'delta_summary_path' and 'delta_cases_path'.

    Raises:
        FileNotFoundError: If the audit directory or its CSV files are missing.
        RuntimeError: If the expected output artifacts were not written.
    """
    audit_path = Path(audit_set)
    if not audit_path.exists():
        raise FileNotFoundError(f"Audit set path not found: {audit_set}")
    all_csvs = list(audit_path.glob('*.csv'))
    if not all_csvs:
        raise FileNotFoundError(f"No CSV audit files found in {audit_set}")
    df_list = []
    for csv_file in all_csvs:
        df = pd.read_csv(csv_file)
        # Keep provenance so delta rows can be traced back to their file.
        df['source_file'] = csv_file.name
        df_list.append(df)
    full_df = pd.concat(df_list, ignore_index=True)
    # Simulated delta generation: old vs. new policy are identical for now;
    # the first column stands in for the decision value.
    full_df['reason'] = 'Policy-Update'
    full_df['old_decision'] = full_df.iloc[:, 0].astype(str)
    full_df['new_decision'] = full_df.iloc[:, 0].astype(str)
    policy_hash = _resolve_policy_hash(Path(constants_path))
    full_df['policy_hash'] = policy_hash
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    delta_cases_path = out_dir / 'delta_cases.csv'
    full_df[['reason', 'old_decision', 'new_decision', 'policy_hash']].to_csv(delta_cases_path, index=False)
    # Summary artifact for downstream CI consumption.
    delta_summary = {
        # Timezone-aware clock; replace('+00:00', 'Z') preserves the
        # original trailing-'Z' format while avoiding the deprecated,
        # naive datetime.utcnow().
        'timestamp': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        'total_cases': len(full_df),
        'unique_policy_hashes': [policy_hash],
        'reason_counts': {'Policy-Update': len(full_df)},
    }
    delta_summary_path = out_dir / 'delta_summary.json'
    with open(delta_summary_path, 'w', encoding='utf-8') as f:
        json.dump(delta_summary, f, indent=2)
    # Explicit check instead of `assert`: asserts are stripped under -O,
    # which would silently disable this CI guard.
    if not (delta_cases_path.exists() and delta_summary_path.exists()):
        raise RuntimeError("Backtest output files missing.")
    logger.info("Backtest completed: %s cases written.", len(full_df))
    return {
        'delta_summary_path': str(delta_summary_path),
        'delta_cases_path': str(delta_cases_path),
    }