Add trace_agg/src/trace_agg/core.py

This commit is contained in:
Mika 2026-01-14 15:28:04 +00:00
commit 893d98bbc4

View file

@ -0,0 +1,108 @@
import json
import logging
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List
# Configure module-level logger
logger = logging.getLogger(__name__)
if not logger.handlers:
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
@dataclass
class TraceData:
"""Data model representing aggregated trace event information."""
had_sched_switch: int
had_irq: int
had_softirq: int
cpu_ids_seen: List[int]
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class TraceDataValidationError(Exception):
"""Raised when the input trace data validation fails."""
def _validate_trace_entry(entry: Dict[str, Any]) -> None:
required_fields = ["had_sched_switch", "had_irq", "had_softirq", "cpu_ids_seen"]
for field in required_fields:
if field not in entry:
raise TraceDataValidationError(f"Missing required field: {field}")
if not isinstance(entry["cpu_ids_seen"], list):
raise TraceDataValidationError("Field 'cpu_ids_seen' must be a list")
for f in ["had_sched_switch", "had_irq", "had_softirq"]:
if not isinstance(entry[f], int):
raise TraceDataValidationError(f"Field '{f}' must be of type int")
def _classify_trace_data(trace: TraceData) -> str:
"""Classify trace data based on event patterns."""
if trace.had_sched_switch == 0 and trace.had_irq == 0 and trace.had_softirq == 0:
return "no_activity"
if trace.had_sched_switch == 0 and (trace.had_irq or trace.had_softirq):
return "possible_hidden_switch"
if trace.had_sched_switch == 1 and (trace.had_irq + trace.had_softirq) > 1:
return "publish_race"
return "normal"
def aggregate_trace_data(correlation_id: str) -> Dict[str, Any]:
"""Aggregates eBPF trace data for a given correlation_id.
Args:
correlation_id: Identifier used to locate trace event file.
Returns:
A JSON-compatible dictionary summarizing the trace data.
"""
assert isinstance(correlation_id, str) and correlation_id.strip(), "correlation_id must be a non-empty string"
# Locate input file
input_file = Path(f"data/raw_trace_{correlation_id}.json")
if not input_file.exists():
raise FileNotFoundError(f"Trace file not found: {input_file}")
logger.info(f"Aggregating trace data from {input_file} for correlation_id={correlation_id}")
# Load and validate input JSON
try:
with input_file.open('r', encoding='utf-8') as f:
raw_data = json.load(f)
except json.JSONDecodeError as e:
raise TraceDataValidationError(f"Invalid JSON format in {input_file}: {e}") from e
if isinstance(raw_data, dict):
entries = [raw_data]
elif isinstance(raw_data, list):
entries = raw_data
else:
raise TraceDataValidationError(f"Unexpected JSON structure in {input_file}")
aggregate = TraceData(had_sched_switch=0, had_irq=0, had_softirq=0, cpu_ids_seen=[])
for entry in entries:
_validate_trace_entry(entry)
aggregate.had_sched_switch |= int(entry["had_sched_switch"])
aggregate.had_irq |= int(entry["had_irq"])
aggregate.had_softirq |= int(entry["had_softirq"])
aggregate.cpu_ids_seen.extend(int(cpu) for cpu in entry["cpu_ids_seen"] if isinstance(cpu, int))
aggregate.cpu_ids_seen = sorted(set(aggregate.cpu_ids_seen))
classification = _classify_trace_data(aggregate)
result = aggregate.to_dict()
result["correlation_id"] = correlation_id
result["classification"] = classification
logger.info(f"Aggregation complete for correlation_id={correlation_id}: {classification}")
return result