Add max_alert_logging/src/max_alert_logging/cli.py

This commit is contained in:
Mika 2026-03-13 16:22:58 +00:00
parent f59c9cb08f
commit 15fbd84b32

View file

@@ -0,0 +1,86 @@
import argparse
import json
import logging
from pathlib import Path
from typing import Any
from max_alert_logging import core
# Fields every log entry must carry to be eligible for deduplication.
# Hoisted to a module constant so it is not rebuilt on every loop iteration.
_REQUIRED_FIELDS = (
    "corr_id", "stratum", "job_parallelism", "expires_at_dist_hours",
    "t_gate_read", "t_index_visible", "retry_taken", "retry_total_overhead_ms",
    "policy_hash", "setup_fingerprint",
)


def _load_entries(input_path: Path) -> list:
    """Load the input JSON file and return its top-level list of entries.

    Raises:
        FileNotFoundError: if *input_path* does not exist or is not a file.
        json.JSONDecodeError: if the file is not valid JSON.
        ValueError: if the parsed JSON is not a list.
    """
    # is_file() is False for missing paths, so a separate exists() check is redundant.
    if not input_path.is_file():
        logging.error("Input file not found: %s", input_path)
        raise FileNotFoundError(f"Input file not found: {input_path}")
    with input_path.open("r", encoding="utf-8") as f:
        try:
            data: Any = json.load(f)
        except json.JSONDecodeError as e:
            logging.error("Invalid JSON format: %s", e)
            raise
    if not isinstance(data, list):
        logging.error("Input JSON must be a list of log entries.")
        raise ValueError("Input JSON must be a list of log entries.")
    return data


def _dedupe(entries: list) -> list:
    """Filter *entries* down to valid, unique alerts that core.log_alert accepts.

    An entry is skipped when it is not a dict, is missing any required field,
    or shares a (corr_id, policy_hash, setup_fingerprint) key with an entry
    already seen. Surviving entries are forwarded to core.log_alert and kept
    only when that call reports success.
    """
    deduped_entries: list = []
    seen_keys: set = set()
    for entry in entries:
        if not isinstance(entry, dict):
            logging.warning("Skipping invalid entry (not a dict).")
            continue
        if not all(field in entry for field in _REQUIRED_FIELDS):
            logging.warning("Skipping incomplete entry: %s", entry)
            continue
        # Identity of an alert for dedup purposes.
        key = (entry["corr_id"], entry["policy_hash"], entry["setup_fingerprint"])
        if key in seen_keys:
            continue
        seen_keys.add(key)
        success = core.log_alert(
            corr_id=entry["corr_id"],
            stratum=entry["stratum"],
            job_parallelism=int(entry["job_parallelism"]),
            expires_at_dist_hours=float(entry["expires_at_dist_hours"]),
            t_gate_read=float(entry["t_gate_read"]),
            t_index_visible=float(entry["t_index_visible"]),
            retry_taken=int(entry["retry_taken"]),
            retry_total_overhead_ms=float(entry["retry_total_overhead_ms"]),
            policy_hash=str(entry["policy_hash"]),
            setup_fingerprint=str(entry["setup_fingerprint"]),
        )
        if success:
            deduped_entries.append(entry)
    return deduped_entries


def main() -> None:
    """CLI entry point for deduplicating Max-only alerts from log files.

    Reads a JSON list of log entries from --input, deduplicates them via
    core.log_alert, and writes the surviving entries to --output.
    """
    parser = argparse.ArgumentParser(description="Run Max-only Alert deduplication and logging.")
    parser.add_argument("--input", required=True, help="Path to the input JSON file with log entries.")
    parser.add_argument("--output", required=True, help="Path to the output JSON file for deduplicated alerts.")
    args = parser.parse_args()
    input_path = Path(args.input)
    output_path = Path(args.output)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    entries = _load_entries(input_path)
    deduped_entries = _dedupe(entries)
    # Ensure the destination directory exists before writing.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as f:
        json.dump(deduped_entries, f, ensure_ascii=False, indent=2)
    logging.info("Deduplicated %d alerts written to %s.", len(deduped_entries), output_path)


if __name__ == "__main__":
    main()