Add bpf_logging/src/bpf_logging/core.py

This commit is contained in:
Mika 2025-12-27 16:53:26 +00:00
parent 2ce9be97f4
commit d8507ca74b

View file

@ -0,0 +1,55 @@
import json
import os
import time
from pathlib import Path
from bcc import BPF
_LOG_DIR = Path("output")
_LOG_FILE = _LOG_DIR / "log_entries.json"
_bpf_instance = None
def setup_bpf_logging(correlation_id: str) -> None:
"""Initialisiert die BPF-Logging-Struktur für eine gegebene Correlation-ID."""
if not isinstance(correlation_id, str) or not correlation_id.strip():
raise ValueError("correlation_id muss ein nicht-leerer String sein.")
global _bpf_instance
_LOG_DIR.mkdir(parents=True, exist_ok=True)
# Beispielhaftes BPF-Setup (Stub für tatsächliches eBPF-Programm)
prog = "int kprobe__sched_switch(void *ctx) { return 0; }"
_bpf_instance = BPF(text=prog)
# Optional: Schreibe Initialeintrag
init_entry = {
"correlation_id": correlation_id,
"event_type": "setup",
"timestamp": time.time()
}
_append_log(init_entry)
def log_timekeeping_event(event_type: str, correlation_id: str) -> None:
"""Protokolliert ein Zeitmessungsereignis als JSON-Zeile."""
if not isinstance(event_type, str) or not event_type.strip():
raise ValueError("event_type muss ein nicht-leerer String sein.")
if not isinstance(correlation_id, str) or not correlation_id.strip():
raise ValueError("correlation_id muss ein nicht-leerer String sein.")
entry = {
"correlation_id": correlation_id,
"event_type": event_type,
"timestamp": time.time()
}
_append_log(entry)
def _append_log(entry: dict) -> None:
"""Hilfsfunktion zum Anhängen eines JSON-Logeintrags an die Logdatei."""
if not isinstance(entry, dict):
raise TypeError("Ein Logeintrag muss ein Dictionary sein.")
_LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(_LOG_FILE, "a", encoding="utf-8") as f:
json.dump(entry, f, ensure_ascii=False)
f.write("\n")