From f59c9cb08ffb9ec12d779d61d51f86d6800c7de1 Mon Sep 17 00:00:00 2001 From: Mika Date: Fri, 13 Mar 2026 16:22:58 +0000 Subject: [PATCH] Add max_alert_logging/src/max_alert_logging/core.py --- .../src/max_alert_logging/core.py | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 max_alert_logging/src/max_alert_logging/core.py diff --git a/max_alert_logging/src/max_alert_logging/core.py b/max_alert_logging/src/max_alert_logging/core.py new file mode 100644 index 0000000..9ab7f5d --- /dev/null +++ b/max_alert_logging/src/max_alert_logging/core.py @@ -0,0 +1,104 @@ +import json +import logging +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Dict, Tuple + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +@dataclass(frozen=True) +class LogEntry: + corr_id: str + stratum: str + job_parallelism: int + expires_at_dist_hours: float + t_gate_read: float + t_index_visible: float + retry_taken: int + retry_total_overhead_ms: float + policy_hash: str + setup_fingerprint: str + + +_DEDUP_CACHE: Dict[Tuple[str, str], datetime] = {} + + +def _validate_input_type(name: str, value, expected_type) -> None: + if not isinstance(value, expected_type): + raise TypeError(f"Parameter '{name}' muss Typ {expected_type.__name__} haben, erhielt {type(value).__name__}.") + + +def log_alert( + corr_id: str, + stratum: str, + job_parallelism: int, + expires_at_dist_hours: float, + t_gate_read: float, + t_index_visible: float, + retry_taken: int, + retry_total_overhead_ms: float, + policy_hash: str, + setup_fingerprint: str, +) -> bool: + """Erzeugt einen deduplizierten Logeintrag für einen Max-only Alert. + + Jeder Alert wird pro (corr_id, stratum) nur einmal erzeugt. + Rückgabe: True bei erfolgreichem Log, False, wenn Duplikat.""" + + try: + # Eingabevalidierung (CI ready + input_validation_required) + _validate_input_type("corr_id", corr_id, str) + _validate_input_type("stratum", stratum, str) + _validate_input_type("job_parallelism", job_parallelism, int) + _validate_input_type("expires_at_dist_hours", expires_at_dist_hours, float) + _validate_input_type("t_gate_read", t_gate_read, float) + _validate_input_type("t_index_visible", t_index_visible, float) + _validate_input_type("retry_taken", retry_taken, int) + _validate_input_type("retry_total_overhead_ms", retry_total_overhead_ms, float) + _validate_input_type("policy_hash", policy_hash, str) + _validate_input_type("setup_fingerprint", setup_fingerprint, str) + + key = (corr_id, stratum) + if key in _DEDUP_CACHE: + logger.debug(f"Duplikat erkannt für {key}; kein neuer Alert wird geschrieben.") + return False + + entry = LogEntry( + corr_id=corr_id, + stratum=stratum, + job_parallelism=job_parallelism, + expires_at_dist_hours=expires_at_dist_hours, + t_gate_read=t_gate_read, + t_index_visible=t_index_visible, + retry_taken=retry_taken, + retry_total_overhead_ms=retry_total_overhead_ms, + policy_hash=policy_hash, + setup_fingerprint=setup_fingerprint, + ) + + output_path = Path("output/max_alerts.json") + output_path.parent.mkdir(parents=True, exist_ok=True) + + if output_path.exists(): + with output_path.open("r", encoding="utf-8") as f: + try: + data = json.load(f) + except json.JSONDecodeError: + data = [] + else: + data = [] + + data.append(entry.__dict__) + with output_path.open("w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + _DEDUP_CACHE[key] = datetime.utcnow() + logger.info(f"Neuer Max-only Alert geschrieben: {corr_id}, {stratum}.") + return True + except Exception as e: + logger.exception(f"Fehler beim Loggen des Alerts: {e}") + return False