Add data_collection_tool/src/data_collection_tool/core.py

This commit is contained in:
Mika 2026-03-14 17:17:10 +00:00
commit 57167a4867

View file

@ -0,0 +1,74 @@
import argparse
import json
import os
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime
import random
@dataclass
class PerformanceData:
"""Datenmodell zur Darstellung gesammelter Performance-Metriken."""
run_id: str
p50: float
p95: float
max_alerts: int
total_overhead: float
def collect_performance_data(run_id: str) -> dict:
"""Sammelt Performance-Metriken für einen gegebenen Run und speichert sie als JSON-Datei.
Args:
run_id: Eindeutige Kennung des Replikationsruns.
Returns:
dict: Dictionary mit zusammengefassten Performance-Metriken.
"""
# Input validation
assert isinstance(run_id, str) and run_id.strip(), "run_id must be a non-empty string"
# Simuliere Messdaten (in realer Anwendung: echte Messung)
p50 = round(random.uniform(0.5, 2.0), 4)
p95 = round(p50 + random.uniform(0.1, 0.8), 4)
max_alerts = random.randint(0, 10)
total_overhead = round(random.uniform(0.0, 1.5), 4)
data = PerformanceData(
run_id=run_id,
p50=p50,
p95=p95,
max_alerts=max_alerts,
total_overhead=total_overhead,
)
# Ausgabe-Verzeichnis vorbereiten
output_dir = Path("output")
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f"{run_id}_performance.json"
with output_path.open("w", encoding="utf-8") as f:
json.dump(asdict(data), f, indent=2, ensure_ascii=False)
# Sicherheitsprüfung (CI-ready)
assert output_path.exists(), f"Output file was not created: {output_path}"
return asdict(data)
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Collect performance metrics for a given run.")
parser.add_argument("--run-id", required=True, help="Die Run-ID, für die die Performance-Daten erfasst werden sollen.")
return parser.parse_args()
def _main():
args = _parse_args()
data = collect_performance_data(args.run_id)
print(json.dumps(data, indent=2, ensure_ascii=False))
if __name__ == "__main__":
_main()