diff --git a/policy_evaluation/src/policy_evaluation/core.py b/policy_evaluation/src/policy_evaluation/core.py new file mode 100644 index 0000000..2d1e1f4 --- /dev/null +++ b/policy_evaluation/src/policy_evaluation/core.py @@ -0,0 +1,116 @@ +from __future__ import annotations +import json +import argparse +import logging +from pathlib import Path +from dataclasses import dataclass, asdict +from typing import List, Dict, Any +import numpy as np +import pandas as pd + +logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s') +logger = logging.getLogger(__name__) + + +@dataclass +class LogEntry: + t_publish: float + t_gate_read: float + t_index_visible: float + pinned_flag: bool + timeouts: int + drift_signature: str + + @classmethod + def validate(cls, entry: Dict[str, Any]) -> 'LogEntry': + required = { + 't_publish': float, + 't_gate_read': float, + 't_index_visible': float, + 'pinned_flag': bool, + 'timeouts': int, + 'drift_signature': str, + } + for k, typ in required.items(): + if k not in entry: + raise ValueError(f"Missing field '{k}' in log entry") + if not isinstance(entry[k], typ): + # allow np types for numeric + if typ in (float, int) and isinstance(entry[k], (np.floating, np.integer)): + continue + raise TypeError(f"Field '{k}' must be of type {typ.__name__}, got {type(entry[k]).__name__}") + return cls(**{k: entry[k] for k in required}) + + +@dataclass +class PolicyResults: + p99_coverage: float + remaining_missing_cases: int + conversion_rates: float + max_wait_time: float + + def to_json(self) -> Dict[str, Any]: + return asdict(self) + + +def evaluate_policies(log_data: List[Dict[str, Any]]) -> PolicyResults: + if not isinstance(log_data, list): + raise TypeError("log_data must be a list of dictionaries") + validated_entries = [LogEntry.validate(entry) for entry in log_data] + if not validated_entries: + return PolicyResults(0.0, 0, 0.0, 0.0) + + df = pd.DataFrame([asdict(e) for e in validated_entries]) + df['wait_time'] = df['t_index_visible'] - df['t_publish'] + + try: + p99 = np.percentile(df['wait_time'], 99) + except IndexError: + p99 = 0.0 + + missing_cases = int((df['drift_signature'] != 'OK').sum()) + + # Conversion: approximate via success ratio (no timeouts) + conversions = len(df[df['timeouts'] == 0]) / max(len(df), 1) + + max_wait = float(df['wait_time'].max()) if not df.empty else 0.0 + + result = PolicyResults( + p99_coverage=float(p99), + remaining_missing_cases=missing_cases, + conversion_rates=float(conversions), + max_wait_time=max_wait, + ) + logger.info("Policy evaluation complete: %s", result) + return result + + +def _load_jsonl(path: Path) -> List[Dict[str, Any]]: + with path.open('r', encoding='utf-8') as f: + return [json.loads(line.strip()) for line in f if line.strip()] + + +def main(): + parser = argparse.ArgumentParser(description="Evaluate storage policies from log data.") + parser.add_argument('--input', required=True, help='Pfad zur JSONL-Logdatei mit Messdaten') + parser.add_argument('--output', required=True, help='Pfad zur Ausgabedatei mit Evaluierungsergebnissen') + args = parser.parse_args() + + input_path = Path(args.input) + output_path = Path(args.output) + + if not input_path.exists(): + raise FileNotFoundError(f"Input file not found: {input_path}") + + log_data = _load_jsonl(input_path) + result = evaluate_policies(log_data) + + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open('w', encoding='utf-8') as f: + json.dump(result.to_json(), f, indent=2) + + logger.info("Results written to %s", output_path) + + +if __name__ == '__main__': + main() \ No newline at end of file