Add artifact_unit_tests/src/trace_agg.py

This commit is contained in:
Mika 2025-12-08 15:52:07 +00:00
parent b69af806c0
commit 202d71ca1f

View file

@ -0,0 +1,27 @@
from typing import List, Dict
def aggregate_traces(events: List[dict]) -> Dict[str, int]:
"""Aggregiert Trace-Events nach Typ. Filtert nur 'clocksource'-Events
und bildet die Gesamtsumme ihrer Werte pro Event-Typ.
Args:
events: Liste von Event-Dictionaries mit Schlüsseln
'timestamp', 'event_type', 'value'.
Returns:
dict[str, int]: Mapping von Event-Typ zu Summenwert.
"""
result: Dict[str, int] = {}
for ev in events:
# Nur clocksource-ähnliche Events berücksichtigen
if not isinstance(ev, dict):
continue
evt_type = ev.get('event_type')
if evt_type != 'clocksource':
continue
try:
value = int(ev.get('value', 0))
except (TypeError, ValueError):
value = 0
result[evt_type] = result.get(evt_type, 0) + value
return result