diff --git a/ebpf_trace_instrumentation/src/ebpf_trace_instrumentation/cli.py b/ebpf_trace_instrumentation/src/ebpf_trace_instrumentation/cli.py new file mode 100644 index 0000000..b4454ca --- /dev/null +++ b/ebpf_trace_instrumentation/src/ebpf_trace_instrumentation/cli.py @@ -0,0 +1,91 @@ +import argparse +import json +import os +from pathlib import Path +from datetime import datetime +from typing import Any, Dict, List + + +class PublishEvent: + """Datenklasse zur Repräsentation eines eBPF Publish-Events.""" + + def __init__(self, ktime_ns: int, CPU: int, field_tag: str, new_value_hash: int, corr_id: str) -> None: + if not isinstance(ktime_ns, int) or ktime_ns < 0: + raise ValueError("ktime_ns muss eine nichtnegative ganze Zahl sein.") + if not isinstance(CPU, int) or CPU < 0: + raise ValueError("CPU muss eine nichtnegative ganze Zahl sein.") + if not isinstance(field_tag, str): + raise ValueError("field_tag muss ein String sein.") + if not isinstance(new_value_hash, int): + raise ValueError("new_value_hash muss eine ganze Zahl sein.") + if not isinstance(corr_id, str): + raise ValueError("corr_id muss ein String sein.") + + self.ktime_ns = ktime_ns + self.CPU = CPU + self.field_tag = field_tag + self.new_value_hash = new_value_hash + self.corr_id = corr_id + + def to_dict(self) -> Dict[str, Any]: + return { + "ktime_ns": self.ktime_ns, + "CPU": self.CPU, + "field_tag": self.field_tag, + "new_value_hash": self.new_value_hash, + "corr_id": self.corr_id, + } + + +def _validate_event_dict(event: Dict[str, Any]) -> bool: + required_fields = {"ktime_ns", "CPU", "field_tag", "new_value_hash", "corr_id"} + missing = required_fields - set(event) + if missing: + raise ValueError(f"Fehlende Felder im Event: {missing}") + return True + + +def process_events(input_path: Path, output_path: Path) -> None: + if not input_path.exists(): + raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_path}") + with input_path.open("r", encoding="utf-8") as f: + data = json.load(f) + + if not isinstance(data, list): + raise ValueError("Erwartet wurde eine Liste von Event-Objekten im JSON.") + + processed: List[Dict[str, Any]] = [] + for evt in data: + if _validate_event_dict(evt): + event_obj = PublishEvent(**evt) + processed.append(event_obj.to_dict()) + + output_data = { + "metadata": { + "generated_at": datetime.utcnow().isoformat() + "Z", + "source_file": str(input_path), + "event_count": len(processed), + }, + "events": processed, + } + + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("w", encoding="utf-8") as o: + json.dump(output_data, o, indent=2) + + +def main() -> None: + parser = argparse.ArgumentParser(description="eBPF Publish Event Instrumentation CLI") + parser.add_argument("--input", required=True, help="Pfad zur JSON-Datei mit eBPF-Publish-Eventdaten.") + parser.add_argument("--output", required=True, help="Zielpfad für die Ausgabe der angereicherten Eventdaten.") + + args = parser.parse_args() + + input_path = Path(args.input) + output_path = Path(args.output) + + process_events(input_path, output_path) + + +if __name__ == "__main__": + main() \ No newline at end of file