From 893d98bbc435caf9399b95cfb42eebcc3a6e2735 Mon Sep 17 00:00:00 2001 From: Mika Date: Wed, 14 Jan 2026 15:28:04 +0000 Subject: [PATCH] Add trace_agg/src/trace_agg/core.py --- trace_agg/src/trace_agg/core.py | 108 ++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 trace_agg/src/trace_agg/core.py diff --git a/trace_agg/src/trace_agg/core.py b/trace_agg/src/trace_agg/core.py new file mode 100644 index 0000000..d21f6f5 --- /dev/null +++ b/trace_agg/src/trace_agg/core.py @@ -0,0 +1,108 @@ +import json +import logging +from dataclasses import dataclass, asdict +from pathlib import Path +from typing import Any, Dict, List + + +# Configure module-level logger +logger = logging.getLogger(__name__) +if not logger.handlers: + logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s') + + +@dataclass +class TraceData: + """Data model representing aggregated trace event information.""" + + had_sched_switch: int + had_irq: int + had_softirq: int + cpu_ids_seen: List[int] + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +class TraceDataValidationError(Exception): + """Raised when the input trace data validation fails.""" + + + +def _validate_trace_entry(entry: Dict[str, Any]) -> None: + required_fields = ["had_sched_switch", "had_irq", "had_softirq", "cpu_ids_seen"] + for field in required_fields: + if field not in entry: + raise TraceDataValidationError(f"Missing required field: {field}") + if not isinstance(entry["cpu_ids_seen"], list): + raise TraceDataValidationError("Field 'cpu_ids_seen' must be a list") + for f in ["had_sched_switch", "had_irq", "had_softirq"]: + if not isinstance(entry[f], int): + raise TraceDataValidationError(f"Field '{f}' must be of type int") + + + +def _classify_trace_data(trace: TraceData) -> str: + """Classify trace data based on event patterns.""" + if trace.had_sched_switch == 0 and trace.had_irq == 0 and trace.had_softirq == 0: + return "no_activity" + if trace.had_sched_switch == 0 and (trace.had_irq or trace.had_softirq): + return "possible_hidden_switch" + if trace.had_sched_switch == 1 and (trace.had_irq + trace.had_softirq) > 1: + return "publish_race" + return "normal" + + + +def aggregate_trace_data(correlation_id: str) -> Dict[str, Any]: + """Aggregates eBPF trace data for a given correlation_id. + + Args: + correlation_id: Identifier used to locate trace event file. + + Returns: + A JSON-compatible dictionary summarizing the trace data. + """ + assert isinstance(correlation_id, str) and correlation_id.strip(), "correlation_id must be a non-empty string" + + # Locate input file + input_file = Path(f"data/raw_trace_{correlation_id}.json") + if not input_file.exists(): + raise FileNotFoundError(f"Trace file not found: {input_file}") + + logger.info(f"Aggregating trace data from {input_file} for correlation_id={correlation_id}") + + # Load and validate input JSON + try: + with input_file.open('r', encoding='utf-8') as f: + raw_data = json.load(f) + except json.JSONDecodeError as e: + raise TraceDataValidationError(f"Invalid JSON format in {input_file}: {e}") from e + + if isinstance(raw_data, dict): + entries = [raw_data] + elif isinstance(raw_data, list): + entries = raw_data + else: + raise TraceDataValidationError(f"Unexpected JSON structure in {input_file}") + + aggregate = TraceData(had_sched_switch=0, had_irq=0, had_softirq=0, cpu_ids_seen=[]) + + for entry in entries: + _validate_trace_entry(entry) + aggregate.had_sched_switch |= int(entry["had_sched_switch"]) + aggregate.had_irq |= int(entry["had_irq"]) + aggregate.had_softirq |= int(entry["had_softirq"]) + aggregate.cpu_ids_seen.extend(int(cpu) for cpu in entry["cpu_ids_seen"] if isinstance(cpu, int)) + + aggregate.cpu_ids_seen = sorted(set(aggregate.cpu_ids_seen)) + + classification = _classify_trace_data(aggregate) + + result = aggregate.to_dict() + result["correlation_id"] = correlation_id + result["classification"] = classification + + logger.info(f"Aggregation complete for correlation_id={correlation_id}: {classification}") + + return result \ No newline at end of file