Add policy_eval_script/src/policy_eval_script/cli.py

This commit is contained in:
Mika 2026-02-06 15:31:49 +00:00
parent e187b0b3b0
commit 1a1e66701e

View file

@ -0,0 +1,77 @@
import argparse
import json
import hashlib
import os
from pathlib import Path
from typing import Any, Dict
from policy_eval_script.core import evaluate_policy
def _load_json_file(path: Path) -> Dict[str, Any]:
if not path.exists() or not path.is_file():
raise FileNotFoundError(f"Datei nicht gefunden: {path}")
with path.open('r', encoding='utf-8') as f:
try:
return json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Ungültiges JSON in {path}: {e}") from e
def _save_json_file(path: Path, data: Dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open('w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def _compute_policy_hash(constants: Dict[str, Any]) -> str:
serialized = json.dumps(constants, sort_keys=True)
return hashlib.sha256(serialized.encode('utf-8')).hexdigest()
def main() -> None:
parser = argparse.ArgumentParser(
description="Evaluates policy compliance based on drift report and policy constants."
)
parser.add_argument('--drift', required=True, help='Pfad zur Drift-Report-JSON-Datei.')
parser.add_argument('--constants', required=True, help='Pfad zu den Policy-Konstanten.')
parser.add_argument('--output', required=False, default='output/evaluation_result.json',
help='Pfad für das Ergebnis-JSON (Standard: output/evaluation_result.json).')
parser.add_argument('--dry-run', action='store_true', help='Nicht-blockierender Modus (FAIL wird nicht als Exit-Error gewertet).')
args = parser.parse_args()
drift_path = Path(args.drift)
constants_path = Path(args.constants)
output_path = Path(args.output)
drift_report = _load_json_file(drift_path)
policy_constants = _load_json_file(constants_path)
try:
result = evaluate_policy(drift_report)
except Exception as e:
print(f"Fehler bei Policy-Evaluierung: {e}")
exit(2)
policy_hash = _compute_policy_hash(policy_constants)
if isinstance(result, dict):
# Sicherstellen, dass alle Felder für ci_ready enthalten sind
result.setdefault('policy_hash', policy_hash)
else:
raise ValueError('evaluate_policy muss ein Dictionary zurückgeben.')
_save_json_file(output_path, result)
decision = result.get('decision', '').upper()
print(f"Policy Evaluation: {decision}")
if not args.dry_run:
if decision == 'FAIL':
print('Policy FAIL erkannt Abbruch.')
exit(1)
if __name__ == '__main__':
main()