Add artifact.timing_analysis/src/artifact_timing_analysis/cli.py
This commit is contained in:
parent
934712d4d8
commit
6121dba114
1 changed files with 73 additions and 0 deletions
73
artifact.timing_analysis/src/artifact_timing_analysis/cli.py
Normal file
73
artifact.timing_analysis/src/artifact_timing_analysis/cli.py
Normal file
|
|
@ -0,0 +1,73 @@
|
||||||
|
import argparse
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List

import pandas as pd

from artifact_timing_analysis import core
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_timing_record(record: Dict[str, Any]) -> bool:
|
||||||
|
required_fields = {"timestamp": str, "t_gate_read": (int, float), "t_index_visible": (int, float), "offset": (int, float)}
|
||||||
|
for key, expected_type in required_fields.items():
|
||||||
|
if key not in record:
|
||||||
|
raise ValueError(f"Missing required field '{key}' in record: {record}")
|
||||||
|
if not isinstance(record[key], expected_type):
|
||||||
|
raise TypeError(f"Field '{key}' must be {expected_type}, got {type(record[key])}.")
|
||||||
|
try:
|
||||||
|
datetime.fromisoformat(record["timestamp"])
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(f"Invalid timestamp format for record: {record['timestamp']} ({e})")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _load_timing_data(input_path: Path) -> List[Dict[str, Any]]:
    """Read a JSON file of timing records, validate each one, and return them.

    Raises ``FileNotFoundError`` if the file does not exist, ``ValueError``
    if the top-level JSON value is not a list, and whatever
    ``_validate_timing_record`` raises for a malformed record.
    """
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    records = json.loads(input_path.read_text(encoding="utf-8"))

    if not isinstance(records, list):
        raise ValueError("Expected a list of timing data records.")

    # Validation is fail-fast: the first bad record aborts the load.
    for rec in records:
        _validate_timing_record(rec)

    return records
|
||||||
|
|
||||||
|
|
||||||
|
def _save_report(output_path: Path, report: Dict[str, Any]) -> None:
|
||||||
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
with open(output_path, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(report, f, indent=2, ensure_ascii=False)
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv: List[str] | None = None) -> int:
    """Command-line entry point for the Gate V1 timing-analysis tool.

    Parses ``--input``/``--output`` paths, loads and validates the timing
    records, runs the core analysis, and writes a combined JSON report.

    Parameters
    ----------
    argv:
        Argument list for testing; ``None`` means ``sys.argv[1:]``.

    Returns
    -------
    int
        Process exit code: 0 on success, 1 on any failure (the error is
        printed to stderr).
    """
    parser = argparse.ArgumentParser(description="Gate V1 Timing Analysis Tool CLI.")
    parser.add_argument("--input", required=True, help="Pfad zur JSON-Eingabedatei mit Timingdaten")
    parser.add_argument("--output", required=True, help="Pfad zur Ausgabe-Datei mit dem Anomaliebericht")

    args = parser.parse_args(argv)

    input_path = Path(args.input)
    output_path = Path(args.output)

    # CLI boundary: any failure is reported on stderr and mapped to exit code 1.
    try:
        timing_records = _load_timing_data(input_path)
        stats_report = core.analyze_timing_offsets(timing_records)
        # NOTE(review): report_timing_anomalies() is called with no arguments
        # while analyze_timing_offsets() receives the records — confirm the
        # core API really derives the readable report from prior state.
        readable_report = core.report_timing_anomalies()

        # Combine results for final JSON output.
        full_output = {
            "stats": stats_report,
            "readable_report": readable_report,
            # datetime.utcnow() is deprecated (Python 3.12+) and returns a
            # naive datetime; use an aware UTC timestamp instead. The
            # .replace() keeps the original trailing-"Z" output format.
            "generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        }

        _save_report(output_path, full_output)
        print(f"Anomaly report successfully generated: {output_path}")
        return 0
    except Exception as exc:
        print(f"[ERROR] {exc}", file=sys.stderr)
        return 1
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    raise SystemExit(main())
|
||||||
Loading…
Reference in a new issue