Add hotspot_logging/src/hotspot_logging/cli.py

This commit is contained in:
Mika 2026-03-25 11:11:47 +00:00
parent f636177eae
commit e68e24731e

View file

@ -0,0 +1,52 @@
import argparse
import json
from pathlib import Path
from typing import Any
from hotspot_logging.core import log_hotspot_ratio
def _validate_input_data(data: Any) -> bool:
if not isinstance(data, list):
raise ValueError("Input JSON must be a list of objects.")
for entry in data:
if not isinstance(entry, dict):
raise ValueError("Each entry must be a dictionary.")
for field in ("time_window", "near_expiry_jobs", "total_jobs"):
if field not in entry:
raise ValueError(f"Missing required field '{field}' in entry.")
return True
def main() -> None:
parser = argparse.ArgumentParser(description="Log hotspot ratios per time window.")
parser.add_argument("--input", required=True, help="Path to the input JSON file with job metrics.")
parser.add_argument("--output", required=True, help="Path to the output JSON log file.")
args = parser.parse_args()
input_path = Path(args.input)
output_path = Path(args.output)
if not input_path.exists():
raise FileNotFoundError(f"Input file not found: {input_path}")
with input_path.open("r", encoding="utf-8") as f:
data = json.load(f)
_validate_input_data(data)
# Ensure output directory exists
output_path.parent.mkdir(parents=True, exist_ok=True)
# Redirect log output to the output file via a context
import sys
from contextlib import redirect_stdout
with output_path.open("a", encoding="utf-8") as outfile, redirect_stdout(outfile):
for entry in data:
time_window = float(entry["time_window"])
near_expiry_jobs = int(entry["near_expiry_jobs"])
total_jobs = int(entry["total_jobs"])
assert total_jobs >= 0, "Total jobs must be non-negative."
assert near_expiry_jobs >= 0, "Near-expiry jobs must be non-negative."
log_hotspot_ratio(time_window, near_expiry_jobs, total_jobs)
if __name__ == "__main__":
main()