Add trace_analysis/src/trace_analysis/core.py

Mika 2026-01-10 14:23:02 +00:00
parent 10556bd97e
commit 6c4d7d2436


@@ -0,0 +1,57 @@
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict


def analyze_spikes(event_log: str) -> Dict[str, Any]:
    """Analyze eBPF event data and determine the frequency and pattern of performance spikes.

    The JSON file is expected to contain a list of event objects, e.g.:
    [{"timestamp": "2024-01-01T12:00:00Z", "migration_type": "fully_pinned", "latency": 3.2}, ...]
    An event counts as a spike when its 'latency' field exceeds a threshold.
    Returns a dictionary with the fields migration_type, timestamp, and spike_count.
    See the __main__ example at the bottom of this module for a usage sketch.
    """
    path = Path(event_log)
    if not path.exists() or not path.is_file():
        raise FileNotFoundError(f"Event log not found: {event_log}")
    try:
        with path.open("r", encoding="utf-8") as f:
            events = json.load(f)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in event log: {e}") from e
    if not isinstance(events, list):
        raise ValueError("Event log must contain a JSON array of event objects.")
    spike_threshold = 2.0  # simple heuristic, in milliseconds
    spike_summary: Dict[str, int] = {}
    for ev in events:
        if not isinstance(ev, dict):
            continue
        mig_type = ev.get("migration_type")
        latency = ev.get("latency")
        if not isinstance(mig_type, str):
            continue
        if isinstance(latency, (int, float)) and latency > spike_threshold:
            spike_summary[mig_type] = spike_summary.get(mig_type, 0) + 1
    if not spike_summary:
        # No spikes found; return a report with an empty count.
        return {
            "migration_type": None,
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "spike_count": 0,
        }
    # Report the most frequently affected migration type.
    top_type = max(spike_summary, key=spike_summary.get)
    report = {
        "migration_type": top_type,
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "spike_count": spike_summary[top_type],
    }
    return report
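

if __name__ == "__main__":
    # Usage sketch with a hypothetical event log; the sample events and the
    # "numa_balanced" migration type are illustrative assumptions, not taken
    # from real trace output.
    import tempfile

    sample_events = [
        {"timestamp": "2024-01-01T12:00:00Z", "migration_type": "fully_pinned", "latency": 3.2},
        {"timestamp": "2024-01-01T12:00:01Z", "migration_type": "numa_balanced", "latency": 1.1},
        {"timestamp": "2024-01-01T12:00:02Z", "migration_type": "fully_pinned", "latency": 2.5},
    ]
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
        json.dump(sample_events, tmp)
        sample_path = tmp.name

    # Both "fully_pinned" events exceed the 2.0 ms threshold, so the report
    # should name "fully_pinned" with spike_count == 2.
    print(analyze_spikes(sample_path))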