Add policy_evaluation/src/policy_evaluation/core.py

This commit is contained in:
Mika 2026-02-16 15:27:10 +00:00
parent 78610b1f7a
commit 9c890b9304


@@ -0,0 +1,116 @@
from __future__ import annotations

import argparse
import json
import logging
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import List, Dict, Any

import numpy as np
import pandas as pd

logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)


@dataclass
class LogEntry:
    t_publish: float
    t_gate_read: float
    t_index_visible: float
    pinned_flag: bool
    timeouts: int
    drift_signature: str

    @classmethod
    def validate(cls, entry: Dict[str, Any]) -> 'LogEntry':
        required = {
            't_publish': float,
            't_gate_read': float,
            't_index_visible': float,
            'pinned_flag': bool,
            'timeouts': int,
            'drift_signature': str,
        }
        for k, typ in required.items():
            if k not in entry:
                raise ValueError(f"Missing field '{k}' in log entry")
            if not isinstance(entry[k], typ):
                # Allow NumPy scalars for numeric fields.
                if typ in (float, int) and isinstance(entry[k], (np.floating, np.integer)):
                    continue
                # Allow plain ints for float fields: JSON parses 5 as int, not 5.0.
                if typ is float and isinstance(entry[k], int) and not isinstance(entry[k], bool):
                    continue
                raise TypeError(
                    f"Field '{k}' must be of type {typ.__name__}, "
                    f"got {type(entry[k]).__name__}"
                )
        # Extra keys in the raw entry are ignored.
        return cls(**{k: entry[k] for k in required})
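
# Example of LogEntry.validate on a raw dict parsed from JSONL (the values
# below are illustrative, not taken from real measurement logs):
#   raw = {"t_publish": 0.0, "t_gate_read": 0.4, "t_index_visible": 1.2,
#          "pinned_flag": False, "timeouts": 0, "drift_signature": "OK"}
#   LogEntry.validate(raw)  # -> LogEntry(t_publish=0.0, ..., drift_signature='OK')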


@dataclass
class PolicyResults:
    p99_coverage: float
    remaining_missing_cases: int
    conversion_rates: float
    max_wait_time: float

    def to_json(self) -> Dict[str, Any]:
        return asdict(self)
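
# to_json() is just asdict(); e.g. (illustrative values)
#   PolicyResults(1.2, 0, 1.0, 3.4).to_json()
#   # -> {"p99_coverage": 1.2, "remaining_missing_cases": 0,
#   #     "conversion_rates": 1.0, "max_wait_time": 3.4}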


def evaluate_policies(log_data: List[Dict[str, Any]]) -> PolicyResults:
    if not isinstance(log_data, list):
        raise TypeError("log_data must be a list of dictionaries")
    validated_entries = [LogEntry.validate(entry) for entry in log_data]
    if not validated_entries:
        return PolicyResults(0.0, 0, 0.0, 0.0)
    df = pd.DataFrame([asdict(e) for e in validated_entries])
    df['wait_time'] = df['t_index_visible'] - df['t_publish']
    # df is guaranteed non-empty here, so the percentile is well defined.
    p99 = float(np.percentile(df['wait_time'], 99))
    missing_cases = int((df['drift_signature'] != 'OK').sum())
    # Conversion: approximate via success ratio (no timeouts).
    conversions = len(df[df['timeouts'] == 0]) / len(df)
    max_wait = float(df['wait_time'].max())
    result = PolicyResults(
        p99_coverage=p99,
        remaining_missing_cases=missing_cases,
        conversion_rates=conversions,
        max_wait_time=max_wait,
    )
    logger.info("Policy evaluation complete: %s", result)
    return result
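
# Sketch of a call with two synthetic entries (illustrative numbers only):
#   logs = [
#       {"t_publish": 0.0, "t_gate_read": 0.1, "t_index_visible": 0.5,
#        "pinned_flag": True, "timeouts": 0, "drift_signature": "OK"},
#       {"t_publish": 1.0, "t_gate_read": 1.2, "t_index_visible": 3.0,
#        "pinned_flag": False, "timeouts": 2, "drift_signature": "DRIFT"},
#   ]
#   evaluate_policies(logs)
#   # Wait times are 0.5 and 2.0, so with linear interpolation:
#   # -> p99_coverage ~= 1.985, remaining_missing_cases=1,
#   #    conversion_rates=0.5, max_wait_time=2.0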


def _load_jsonl(path: Path) -> List[Dict[str, Any]]:
    # One JSON object per line; blank lines are skipped.
    with path.open('r', encoding='utf-8') as f:
        return [json.loads(line.strip()) for line in f if line.strip()]


def main() -> None:
    parser = argparse.ArgumentParser(description="Evaluate storage policies from log data.")
    parser.add_argument('--input', required=True, help='Path to the JSONL log file with measurement data')
    parser.add_argument('--output', required=True, help='Path to the output file for evaluation results')
    args = parser.parse_args()
    input_path = Path(args.input)
    output_path = Path(args.output)
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")
    log_data = _load_jsonl(input_path)
    result = evaluate_policies(log_data)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open('w', encoding='utf-8') as f:
        json.dump(result.to_json(), f, indent=2)
    logger.info("Results written to %s", output_path)


if __name__ == '__main__':
    main()
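
# Usage sketch (paths are illustrative):
#   python -m policy_evaluation.core \
#       --input logs/policy.jsonl --output out/results.json
# The input must contain one JSON object per line with at least the fields
# checked in LogEntry.validate (extra keys are ignored); the results are
# written as a single JSON object.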