Add trace_agg/src/trace_agg/core.py

This commit is contained in:
Mika 2025-12-21 11:58:05 +00:00
commit ffcba03735

View file

@ -0,0 +1,53 @@
import statistics
from typing import List, Dict, Any
def aggregate_traces(trace_data: List[Dict[str, Any]]) -> List[Dict[str, float]]:
"""Aggregiert Trace-Daten und berechnet Delta-Werte zwischen Ereignissen.
Args:
trace_data: Liste von TraceData-Objekten oder ähnlichen Dicts.
Returns:
Liste von Dictionaries mit berechneten Delta-Ketten.
"""
if not isinstance(trace_data, list):
raise ValueError("trace_data muss eine Liste sein.")
deltas = []
for entry in trace_data:
if not isinstance(entry, dict):
continue
entry_time = entry.get("entry_time")
first_read_time = entry.get("first_read_time")
baseline_time = entry.get("baseline_recalc_time")
# Validierung alle notwendigen Felder müssen vorhanden und numerisch sein
if not all(isinstance(v, (int, float)) for v in [entry_time, first_read_time, baseline_time] if v is not None):
continue
if None in (entry_time, first_read_time, baseline_time):
continue
delta_1 = first_read_time - entry_time
delta_2 = baseline_time - first_read_time
total_delta = baseline_time - entry_time
deltas.append({
"entry_time": entry_time,
"first_read_time": first_read_time,
"baseline_recalc_time": baseline_time,
"delta_entry_to_first_read": delta_1,
"delta_first_to_baseline": delta_2,
"total_delta": total_delta
})
# Optional: Durchschnittliche Offsets als Übersicht am Ende anfügen
if deltas:
summary = {
"avg_delta_entry_to_first_read": statistics.mean(d["delta_entry_to_first_read"] for d in deltas),
"avg_delta_first_to_baseline": statistics.mean(d["delta_first_to_baseline"] for d in deltas),
"avg_total_delta": statistics.mean(d["total_delta"] for d in deltas)
}
deltas.append({"summary": summary})
return deltas