Add max_alert_logging/src/max_alert_logging/core.py

This commit is contained in:
Mika 2026-03-13 16:22:58 +00:00
parent a3987811ee
commit f59c9cb08f

View file

@@ -0,0 +1,104 @@
import json
import logging
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Tuple
# Module-level logger for alert emission.
# NOTE(review): setting the level at import time overrides whatever the
# application configures on this logger — confirm this is intended.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@dataclass(frozen=True)
class LogEntry:
    """Immutable record of a single max-only alert.

    Instances are serialized to JSON and appended to the alert log file
    by ``log_alert``. Fields mirror the ``log_alert`` parameters 1:1.
    """

    # Correlation id; together with `stratum` it forms the dedup key.
    corr_id: str
    stratum: str
    job_parallelism: int
    # presumably hours until expiry — TODO confirm unit with producer
    expires_at_dist_hours: float
    # timing measurements; units not evident from this file — confirm (s vs ms)
    t_gate_read: float
    t_index_visible: float
    # number of retries taken and their cumulative overhead in milliseconds
    retry_taken: int
    retry_total_overhead_ms: float
    # fingerprints identifying the policy/setup that produced this alert
    policy_hash: str
    setup_fingerprint: str
# Process-local dedup cache: (corr_id, stratum) -> time the alert was first
# logged. Entries are never evicted, so it grows for the process lifetime.
_DEDUP_CACHE: Dict[Tuple[str, str], datetime] = {}
def _validate_input_type(name: str, value, expected_type) -> None:
if not isinstance(value, expected_type):
raise TypeError(f"Parameter '{name}' muss Typ {expected_type.__name__} haben, erhielt {type(value).__name__}.")
def log_alert(
    corr_id: str,
    stratum: str,
    job_parallelism: int,
    expires_at_dist_hours: float,
    t_gate_read: float,
    t_index_visible: float,
    retry_taken: int,
    retry_total_overhead_ms: float,
    policy_hash: str,
    setup_fingerprint: str,
) -> bool:
    """Write a deduplicated log entry for a max-only alert.

    Each alert is emitted at most once per ``(corr_id, stratum)`` pair for
    the lifetime of the process (dedup state lives in ``_DEDUP_CACHE``).
    Entries are appended to ``output/max_alerts.json`` relative to the
    current working directory.

    Args:
        corr_id: Correlation id; part of the dedup key.
        stratum: Stratum name; part of the dedup key.
        job_parallelism: Parallelism of the originating job.
        expires_at_dist_hours: Hours until expiry (int or float accepted).
        t_gate_read: Timing measurement (int or float accepted).
        t_index_visible: Timing measurement (int or float accepted).
        retry_taken: Number of retries taken.
        retry_total_overhead_ms: Cumulative retry overhead in ms.
        policy_hash: Fingerprint of the active policy.
        setup_fingerprint: Fingerprint of the setup.

    Returns:
        True when a new alert entry was written; False for duplicates or on
        any error (this function never raises — failures are logged).
    """
    try:
        # Input validation; plain ints are accepted wherever a float is
        # expected, since e.g. ``expires_at_dist_hours=5`` is legitimate.
        _validate_input_type("corr_id", corr_id, str)
        _validate_input_type("stratum", stratum, str)
        _validate_input_type("job_parallelism", job_parallelism, int)
        _validate_input_type("expires_at_dist_hours", expires_at_dist_hours, (int, float))
        _validate_input_type("t_gate_read", t_gate_read, (int, float))
        _validate_input_type("t_index_visible", t_index_visible, (int, float))
        _validate_input_type("retry_taken", retry_taken, int)
        _validate_input_type("retry_total_overhead_ms", retry_total_overhead_ms, (int, float))
        _validate_input_type("policy_hash", policy_hash, str)
        _validate_input_type("setup_fingerprint", setup_fingerprint, str)

        key = (corr_id, stratum)
        if key in _DEDUP_CACHE:
            # Lazy %-formatting: the message is only built if DEBUG is enabled.
            logger.debug("Duplikat erkannt für %s; kein neuer Alert wird geschrieben.", key)
            return False

        entry = LogEntry(
            corr_id=corr_id,
            stratum=stratum,
            job_parallelism=job_parallelism,
            expires_at_dist_hours=expires_at_dist_hours,
            t_gate_read=t_gate_read,
            t_index_visible=t_index_visible,
            retry_taken=retry_taken,
            retry_total_overhead_ms=retry_total_overhead_ms,
            policy_hash=policy_hash,
            setup_fingerprint=setup_fingerprint,
        )

        output_path = Path("output/max_alerts.json")
        output_path.parent.mkdir(parents=True, exist_ok=True)
        if output_path.exists():
            with output_path.open("r", encoding="utf-8") as f:
                try:
                    data = json.load(f)
                except json.JSONDecodeError:
                    # Corrupt or partially-written file: start over rather
                    # than fail the alert.
                    data = []
        else:
            data = []

        # asdict() is the supported dataclass serialization; __dict__ is an
        # implementation detail.
        data.append(asdict(entry))
        with output_path.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        # Record the dedup timestamp only after a successful write, using a
        # timezone-aware UTC time (datetime.utcnow() is deprecated and naive).
        _DEDUP_CACHE[key] = datetime.now(timezone.utc)
        logger.info("Neuer Max-only Alert geschrieben: %s, %s.", corr_id, stratum)
        return True
    except Exception as e:
        # This function is a best-effort sink: never propagate, log the full
        # traceback instead and signal failure via the return value.
        logger.exception("Fehler beim Loggen des Alerts: %s", e)
        return False