From b8161a6ba55ce524c1cd7275cc16a683821e1885 Mon Sep 17 00:00:00 2001 From: Mika Date: Thu, 15 Jan 2026 10:47:37 +0000 Subject: [PATCH] Add ebpf_trace_instrumentation/src/ebpf_trace_instrumentation/core.py --- .../src/ebpf_trace_instrumentation/core.py | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 ebpf_trace_instrumentation/src/ebpf_trace_instrumentation/core.py diff --git a/ebpf_trace_instrumentation/src/ebpf_trace_instrumentation/core.py b/ebpf_trace_instrumentation/src/ebpf_trace_instrumentation/core.py new file mode 100644 index 0000000..eca8a70 --- /dev/null +++ b/ebpf_trace_instrumentation/src/ebpf_trace_instrumentation/core.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass, field +from typing import Dict, List, Any + + +__all__ = ["PublishEvent", "add_event"] + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class EventValidationError(Exception): + """Wird ausgelöst, wenn ein Event nicht alle erforderlichen Felder enthält oder Typen fehlerhaft sind.""" + pass + + +@dataclass +class PublishEvent: + """Repräsentiert ein eBPF Publish Event mit Feldern laut Spezifikation.""" + + ktime_ns: int + CPU: int + field_tag: str + new_value_hash: int + corr_id: str + + def __post_init__(self) -> None: + # Typvalidierung für CI und Datenintegrität + if not isinstance(self.ktime_ns, int): + raise EventValidationError("Expected ktime_ns to be int") + if not isinstance(self.CPU, int): + raise EventValidationError("Expected CPU to be int") + if not isinstance(self.field_tag, str): + raise EventValidationError("Expected field_tag to be str") + if not isinstance(self.new_value_hash, int): + raise EventValidationError("Expected new_value_hash to be int") + if not isinstance(self.corr_id, str): + raise EventValidationError("Expected corr_id to be str") + + +# Globale Event-Sammlung +_event_collection: List[PublishEvent] = [] + + +def add_event(event_type: str, event_data: Dict[str, Any]) -> None: + """Fügt ein neues PublishEvent zur globalen Event-Liste hinzu. + + Args: + event_type (str): Typbezeichner des Events. + event_data (dict): Rohdaten aus eBPF Instrumentation. + + Raises: + EventValidationError: Wenn Validierungen fehlschlagen. + """ + required_fields = {"ktime_ns", "CPU", "field_tag", "new_value_hash", "corr_id"} + + logger.debug("Attempting to add event of type %s with data %s", event_type, event_data) + + # Validierung der Eingabedaten + if not isinstance(event_data, dict): + raise EventValidationError("event_data must be a dictionary") + + missing = required_fields - set(event_data.keys()) + if missing: + raise EventValidationError(f"Missing required fields: {', '.join(missing)}") + + try: + event = PublishEvent( + ktime_ns=int(event_data["ktime_ns"]), + CPU=int(event_data["CPU"]), + field_tag=str(event_data["field_tag"]), + new_value_hash=int(event_data["new_value_hash"]), + corr_id=str(event_data["corr_id"]), + ) + except (ValueError, TypeError) as e: + raise EventValidationError(f"Invalid field types in event_data: {e}") from e + + _event_collection.append(event) + logger.info("Added PublishEvent (type=%s, corr_id=%s) to collection", event_type, event.corr_id) + + +def _serialize_events() -> str: + """Hilfsfunktion für mögliche spätere Speicherung als JSON.""" + return json.dumps([e.__dict__ for e in _event_collection], indent=2)