From 7345fde6dac65929edb1b3ddfff2b6c454458179 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 18 Jan 2026 17:11:10 +0000 Subject: [PATCH] Add trace_agg.py/src/trace_agg/cli.py --- trace_agg.py/src/trace_agg/cli.py | 86 +++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 trace_agg.py/src/trace_agg/cli.py diff --git a/trace_agg.py/src/trace_agg/cli.py b/trace_agg.py/src/trace_agg/cli.py new file mode 100644 index 0000000..789345b --- /dev/null +++ b/trace_agg.py/src/trace_agg/cli.py @@ -0,0 +1,86 @@ +import argparse +import json +import logging +from pathlib import Path +from typing import Any + +import pandas as pd + +from trace_agg.core import analyze_trace, TraceData + + +def _load_trace_data(path: Path) -> list[TraceData]: + if not path.exists() or not path.is_file(): + raise FileNotFoundError(f"Trace input file not found: {path}") + + try: + with path.open('r', encoding='utf-8') as f: + data = json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in input file: {e}") from e + + if not isinstance(data, list): + raise ValueError("Trace input must be a JSON array of objects.") + + trace_items: list[TraceData] = [] + for idx, item in enumerate(data): + if not isinstance(item, dict): + raise ValueError(f"Trace item at index {idx} is not a JSON object.") + try: + corr_id = int(item['corr_id']) + event_type = str(item['event_type']) + timestamp = float(item['timestamp']) + except (KeyError, TypeError, ValueError) as e: + raise ValueError(f"Invalid or missing fields in trace item {idx}: {e}") from e + trace_items.append(TraceData(corr_id, event_type, timestamp)) + + assert all(isinstance(t, TraceData) for t in trace_items), "All items must be TraceData instances." + return trace_items + + +def _save_results(results: dict[str, Any], output_path: Path) -> None: + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open('w', encoding='utf-8') as f: + json.dump(results, f, indent=2) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="CLI for eBPF Trace Aggregation Analysis." + ) + parser.add_argument( + "--input", + type=str, + required=True, + help="Pfad zur JSON-Datei mit eBPF-Trace-Daten." + ) + parser.add_argument( + "--output", + type=str, + required=True, + help="Pfad zur Ausgabe der Analyseergebnisse." + ) + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s') + + input_path = Path(args.input) + output_path = Path(args.output) + + logging.info(f"Loading trace data from {input_path}") + trace_data = _load_trace_data(input_path) + + logging.info("Analyzing trace data ...") + results = analyze_trace(trace_data) + + if not isinstance(results, dict): + raise TypeError("analyze_trace must return a dict with analysis results.") + + logging.info(f"Saving analysis results to {output_path}") + _save_results(results, output_path) + + logging.info("Analysis completed successfully.") + + +if __name__ == "__main__": + main()