Add ebpf_trace_instrumentation/src/ebpf_trace_instrumentation/cli.py

This commit is contained in:
Mika 2026-01-15 10:47:38 +00:00
parent b8161a6ba5
commit 68ec49022e

View file

@ -0,0 +1,91 @@
import argparse
import json
import os
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, List
class PublishEvent:
"""Datenklasse zur Repräsentation eines eBPF Publish-Events."""
def __init__(self, ktime_ns: int, CPU: int, field_tag: str, new_value_hash: int, corr_id: str) -> None:
if not isinstance(ktime_ns, int) or ktime_ns < 0:
raise ValueError("ktime_ns muss eine nichtnegative ganze Zahl sein.")
if not isinstance(CPU, int) or CPU < 0:
raise ValueError("CPU muss eine nichtnegative ganze Zahl sein.")
if not isinstance(field_tag, str):
raise ValueError("field_tag muss ein String sein.")
if not isinstance(new_value_hash, int):
raise ValueError("new_value_hash muss eine ganze Zahl sein.")
if not isinstance(corr_id, str):
raise ValueError("corr_id muss ein String sein.")
self.ktime_ns = ktime_ns
self.CPU = CPU
self.field_tag = field_tag
self.new_value_hash = new_value_hash
self.corr_id = corr_id
def to_dict(self) -> Dict[str, Any]:
return {
"ktime_ns": self.ktime_ns,
"CPU": self.CPU,
"field_tag": self.field_tag,
"new_value_hash": self.new_value_hash,
"corr_id": self.corr_id,
}
def _validate_event_dict(event: Dict[str, Any]) -> bool:
required_fields = {"ktime_ns", "CPU", "field_tag", "new_value_hash", "corr_id"}
missing = required_fields - set(event)
if missing:
raise ValueError(f"Fehlende Felder im Event: {missing}")
return True
def process_events(input_path: Path, output_path: Path) -> None:
if not input_path.exists():
raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_path}")
with input_path.open("r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError("Erwartet wurde eine Liste von Event-Objekten im JSON.")
processed: List[Dict[str, Any]] = []
for evt in data:
if _validate_event_dict(evt):
event_obj = PublishEvent(**evt)
processed.append(event_obj.to_dict())
output_data = {
"metadata": {
"generated_at": datetime.utcnow().isoformat() + "Z",
"source_file": str(input_path),
"event_count": len(processed),
},
"events": processed,
}
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8") as o:
json.dump(output_data, o, indent=2)
def main() -> None:
parser = argparse.ArgumentParser(description="eBPF Publish Event Instrumentation CLI")
parser.add_argument("--input", required=True, help="Pfad zur JSON-Datei mit eBPF-Publish-Eventdaten.")
parser.add_argument("--output", required=True, help="Zielpfad für die Ausgabe der angereicherten Eventdaten.")
args = parser.parse_args()
input_path = Path(args.input)
output_path = Path(args.output)
process_events(input_path, output_path)
if __name__ == "__main__":
main()