diff --git a/policy_eval_script/src/policy_eval_script/cli.py b/policy_eval_script/src/policy_eval_script/cli.py new file mode 100644 index 0000000..f85b257 --- /dev/null +++ b/policy_eval_script/src/policy_eval_script/cli.py @@ -0,0 +1,77 @@ +import argparse +import json +import hashlib +import os +from pathlib import Path +from typing import Any, Dict + +from policy_eval_script.core import evaluate_policy + + +def _load_json_file(path: Path) -> Dict[str, Any]: + if not path.exists() or not path.is_file(): + raise FileNotFoundError(f"Datei nicht gefunden: {path}") + with path.open('r', encoding='utf-8') as f: + try: + return json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Ungültiges JSON in {path}: {e}") from e + + +def _save_json_file(path: Path, data: Dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open('w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +def _compute_policy_hash(constants: Dict[str, Any]) -> str: + serialized = json.dumps(constants, sort_keys=True) + return hashlib.sha256(serialized.encode('utf-8')).hexdigest() + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Evaluates policy compliance based on drift report and policy constants." + ) + parser.add_argument('--drift', required=True, help='Pfad zur Drift-Report-JSON-Datei.') + parser.add_argument('--constants', required=True, help='Pfad zu den Policy-Konstanten.') + parser.add_argument('--output', required=False, default='output/evaluation_result.json', + help='Pfad für das Ergebnis-JSON (Standard: output/evaluation_result.json).') + parser.add_argument('--dry-run', action='store_true', help='Nicht-blockierender Modus (FAIL wird nicht als Exit-Error gewertet).') + + args = parser.parse_args() + + drift_path = Path(args.drift) + constants_path = Path(args.constants) + output_path = Path(args.output) + + drift_report = _load_json_file(drift_path) + policy_constants = _load_json_file(constants_path) + + try: + result = evaluate_policy(drift_report) + except Exception as e: + print(f"Fehler bei Policy-Evaluierung: {e}") + exit(2) + + policy_hash = _compute_policy_hash(policy_constants) + + if isinstance(result, dict): + # Sicherstellen, dass alle Felder für ci_ready enthalten sind + result.setdefault('policy_hash', policy_hash) + else: + raise ValueError('evaluate_policy muss ein Dictionary zurückgeben.') + + _save_json_file(output_path, result) + + decision = result.get('decision', '').upper() + print(f"Policy Evaluation: {decision}") + + if not args.dry_run: + if decision == 'FAIL': + print('Policy FAIL erkannt – Abbruch.') + exit(1) + + +if __name__ == '__main__': + main() \ No newline at end of file