From e68e24731e11ca2d9e6f7c680f63f6e04fe0ff04 Mon Sep 17 00:00:00 2001 From: Mika Date: Wed, 25 Mar 2026 11:11:47 +0000 Subject: [PATCH] Add hotspot_logging/src/hotspot_logging/cli.py --- hotspot_logging/src/hotspot_logging/cli.py | 52 ++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 hotspot_logging/src/hotspot_logging/cli.py diff --git a/hotspot_logging/src/hotspot_logging/cli.py b/hotspot_logging/src/hotspot_logging/cli.py new file mode 100644 index 0000000..5d0e638 --- /dev/null +++ b/hotspot_logging/src/hotspot_logging/cli.py @@ -0,0 +1,52 @@ +import argparse +import json +from pathlib import Path +from typing import Any +from hotspot_logging.core import log_hotspot_ratio + +def _validate_input_data(data: Any) -> bool: + if not isinstance(data, list): + raise ValueError("Input JSON must be a list of objects.") + for entry in data: + if not isinstance(entry, dict): + raise ValueError("Each entry must be a dictionary.") + for field in ("time_window", "near_expiry_jobs", "total_jobs"): + if field not in entry: + raise ValueError(f"Missing required field '{field}' in entry.") + return True + +def main() -> None: + parser = argparse.ArgumentParser(description="Log hotspot ratios per time window.") + parser.add_argument("--input", required=True, help="Path to the input JSON file with job metrics.") + parser.add_argument("--output", required=True, help="Path to the output JSON log file.") + args = parser.parse_args() + + input_path = Path(args.input) + output_path = Path(args.output) + + if not input_path.exists(): + raise FileNotFoundError(f"Input file not found: {input_path}") + + with input_path.open("r", encoding="utf-8") as f: + data = json.load(f) + + _validate_input_data(data) + + # Ensure output directory exists + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Redirect log output to the output file via a context + import sys + from contextlib import redirect_stdout + + with output_path.open("a", encoding="utf-8") as outfile, redirect_stdout(outfile): + for entry in data: + time_window = float(entry["time_window"]) + near_expiry_jobs = int(entry["near_expiry_jobs"]) + total_jobs = int(entry["total_jobs"]) + assert total_jobs >= 0, "Total jobs must be non-negative." + assert near_expiry_jobs >= 0, "Near-expiry jobs must be non-negative." + log_hotspot_ratio(time_window, near_expiry_jobs, total_jobs) + +if __name__ == "__main__": + main()