Add retry_mechanism_analysis/src/retry_mechanism_analysis/cli.py

This commit is contained in:
Mika 2026-03-12 11:51:46 +00:00
parent 6408ac9b7f
commit 9154bbd593

View file

@ -0,0 +1,73 @@
import argparse
import json
import sys
from pathlib import Path
from typing import Any, List, Dict
from retry_mechanism_analysis.core import compare_retry_overhead
def _validate_input_file(file_path: Path) -> None:
if not file_path.exists():
raise FileNotFoundError(f"Eingabedatei nicht gefunden: {file_path}")
if not file_path.is_file():
raise ValueError(f"Ungültiger Eingabepfad (keine Datei): {file_path}")
def _load_json(file_path: Path) -> List[Dict[str, Any]]:
with file_path.open("r", encoding="utf-8") as f:
try:
data = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Fehler beim Parsen der JSON-Datei: {file_path}: {e}") from e
if not isinstance(data, list):
raise TypeError("Eingabedaten müssen eine Liste von Logeinträgen sein.")
return data
def _write_json(file_path: Path, content: Any) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
with file_path.open("w", encoding="utf-8") as f:
json.dump(content, f, indent=2, ensure_ascii=False)
def main() -> None:
parser = argparse.ArgumentParser(
description="Analyse der Retry-Overheads über verschiedene Parallelitätslevels."
)
parser.add_argument(
"--input",
required=True,
help="Pfad zur Eingabedatei mit Logdaten (JSON)."
)
parser.add_argument(
"--output",
required=True,
help="Pfad zur Ausgabedatei, in die die Analyseergebnisse geschrieben werden."
)
args = parser.parse_args()
input_path = Path(args.input)
output_path = Path(args.output)
try:
_validate_input_file(input_path)
log_data = _load_json(input_path)
result = compare_retry_overhead(log_data)
# Validierungs-Check gemäß ci_ready
assert isinstance(result, dict) and all(
key in result for key in ("parallelism_level", "p50", "p95", "p99")
), "Ungültige Struktur der Analyseergebnisse."
_write_json(output_path, result)
print(f"Analyse abgeschlossen. Ergebnis wurde gespeichert unter: {output_path}")
except Exception as exc: # generischer Catch für CLI
print(f"Fehler: {exc}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()