import argparse
import json
import logging
from pathlib import Path
from typing import Any

from max_alert_logging import core

# Fields every log entry must carry to be eligible for deduplication.
# Hoisted to a module constant so the sequence is not rebuilt per entry.
REQUIRED_FIELDS = (
    "corr_id", "stratum", "job_parallelism", "expires_at_dist_hours",
    "t_gate_read", "t_index_visible", "retry_taken", "retry_total_overhead_ms",
    "policy_hash", "setup_fingerprint",
)


def _load_entries(input_path: Path) -> list:
    """Load the input JSON file and return its top-level list of log entries.

    Raises:
        FileNotFoundError: if *input_path* is missing or not a regular file.
        json.JSONDecodeError: if the file content is not valid JSON.
        ValueError: if the decoded JSON is not a list.
    """
    # is_file() already implies existence; a separate exists() check is redundant.
    if not input_path.is_file():
        logging.error("Input file not found: %s", input_path)
        raise FileNotFoundError(f"Input file not found: {input_path}")

    with input_path.open("r", encoding="utf-8") as f:
        try:
            data: Any = json.load(f)
        except json.JSONDecodeError as e:
            logging.error("Invalid JSON format: %s", e)
            raise

    if not isinstance(data, list):
        logging.error("Input JSON must be a list of log entries.")
        raise ValueError("Input JSON must be a list of log entries.")
    return data


def _dedupe_and_log(entries: list) -> list:
    """Forward well-formed, first-seen entries to ``core.log_alert``.

    Entries that are not dicts or are missing any of REQUIRED_FIELDS are
    skipped with a warning. Returns the entries core.log_alert accepted,
    in input order.
    """
    deduped: list = []
    seen_keys: set = set()

    for entry in entries:
        if not isinstance(entry, dict):
            logging.warning("Skipping invalid entry (not a dict).")
            continue
        if not all(field in entry for field in REQUIRED_FIELDS):
            logging.warning("Skipping incomplete entry: %s", entry)
            continue

        # An alert's identity: same correlation id under the same policy and
        # setup fingerprint. NOTE(review): the key is recorded as seen even
        # when log_alert fails below, so later duplicates of a failed alert
        # are never retried — original behavior preserved; confirm intent.
        key = (entry["corr_id"], entry["policy_hash"], entry["setup_fingerprint"])
        if key in seen_keys:
            continue
        seen_keys.add(key)

        success = core.log_alert(
            corr_id=entry["corr_id"],
            stratum=entry["stratum"],
            job_parallelism=int(entry["job_parallelism"]),
            expires_at_dist_hours=float(entry["expires_at_dist_hours"]),
            t_gate_read=float(entry["t_gate_read"]),
            t_index_visible=float(entry["t_index_visible"]),
            retry_taken=int(entry["retry_taken"]),
            retry_total_overhead_ms=float(entry["retry_total_overhead_ms"]),
            policy_hash=str(entry["policy_hash"]),
            setup_fingerprint=str(entry["setup_fingerprint"]),
        )
        if success:
            deduped.append(entry)

    return deduped


def main() -> None:
    """CLI entry point for deduplicating Max-only alerts from log files."""
    parser = argparse.ArgumentParser(description="Run Max-only Alert deduplication and logging.")
    parser.add_argument("--input", required=True, help="Path to the input JSON file with log entries.")
    parser.add_argument("--output", required=True, help="Path to the output JSON file for deduplicated alerts.")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    entries = _load_entries(Path(args.input))
    deduped_entries = _dedupe_and_log(entries)

    output_path = Path(args.output)
    # Create parent directories so the write never fails on a fresh output tree.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as f:
        json.dump(deduped_entries, f, ensure_ascii=False, indent=2)

    logging.info("Deduplicated %d alerts written to %s.", len(deduped_entries), output_path)


if __name__ == "__main__":
    main()