Add ebpf_trace_instrumentation/src/ebpf_trace_instrumentation/core.py

This commit is contained in:
Mika 2026-01-15 10:47:37 +00:00
commit b8161a6ba5

View file

@ -0,0 +1,88 @@
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Any
__all__ = ["PublishEvent", "add_event"]
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class EventValidationError(Exception):
"""Wird ausgelöst, wenn ein Event nicht alle erforderlichen Felder enthält oder Typen fehlerhaft sind."""
pass
@dataclass
class PublishEvent:
"""Repräsentiert ein eBPF Publish Event mit Feldern laut Spezifikation."""
ktime_ns: int
CPU: int
field_tag: str
new_value_hash: int
corr_id: str
def __post_init__(self) -> None:
# Typvalidierung für CI und Datenintegrität
if not isinstance(self.ktime_ns, int):
raise EventValidationError("Expected ktime_ns to be int")
if not isinstance(self.CPU, int):
raise EventValidationError("Expected CPU to be int")
if not isinstance(self.field_tag, str):
raise EventValidationError("Expected field_tag to be str")
if not isinstance(self.new_value_hash, int):
raise EventValidationError("Expected new_value_hash to be int")
if not isinstance(self.corr_id, str):
raise EventValidationError("Expected corr_id to be str")
# Globale Event-Sammlung
_event_collection: List[PublishEvent] = []
def add_event(event_type: str, event_data: Dict[str, Any]) -> None:
"""Fügt ein neues PublishEvent zur globalen Event-Liste hinzu.
Args:
event_type (str): Typbezeichner des Events.
event_data (dict): Rohdaten aus eBPF Instrumentation.
Raises:
EventValidationError: Wenn Validierungen fehlschlagen.
"""
required_fields = {"ktime_ns", "CPU", "field_tag", "new_value_hash", "corr_id"}
logger.debug("Attempting to add event of type %s with data %s", event_type, event_data)
# Validierung der Eingabedaten
if not isinstance(event_data, dict):
raise EventValidationError("event_data must be a dictionary")
missing = required_fields - set(event_data.keys())
if missing:
raise EventValidationError(f"Missing required fields: {', '.join(missing)}")
try:
event = PublishEvent(
ktime_ns=int(event_data["ktime_ns"]),
CPU=int(event_data["CPU"]),
field_tag=str(event_data["field_tag"]),
new_value_hash=int(event_data["new_value_hash"]),
corr_id=str(event_data["corr_id"]),
)
except (ValueError, TypeError) as e:
raise EventValidationError(f"Invalid field types in event_data: {e}") from e
_event_collection.append(event)
logger.info("Added PublishEvent (type=%s, corr_id=%s) to collection", event_type, event.corr_id)
def _serialize_events() -> str:
"""Hilfsfunktion für mögliche spätere Speicherung als JSON."""
return json.dumps([e.__dict__ for e in _event_collection], indent=2)