Add trace_agg.py/src/trace_agg/cli.py

This commit is contained in:
Mika 2026-01-18 17:11:10 +00:00
parent e114968ed0
commit 7345fde6da

View file

@ -0,0 +1,86 @@
import argparse
import json
import logging
from pathlib import Path
from typing import Any
import pandas as pd
from trace_agg.core import analyze_trace, TraceData
def _load_trace_data(path: Path) -> list[TraceData]:
if not path.exists() or not path.is_file():
raise FileNotFoundError(f"Trace input file not found: {path}")
try:
with path.open('r', encoding='utf-8') as f:
data = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in input file: {e}") from e
if not isinstance(data, list):
raise ValueError("Trace input must be a JSON array of objects.")
trace_items: list[TraceData] = []
for idx, item in enumerate(data):
if not isinstance(item, dict):
raise ValueError(f"Trace item at index {idx} is not a JSON object.")
try:
corr_id = int(item['corr_id'])
event_type = str(item['event_type'])
timestamp = float(item['timestamp'])
except (KeyError, TypeError, ValueError) as e:
raise ValueError(f"Invalid or missing fields in trace item {idx}: {e}") from e
trace_items.append(TraceData(corr_id, event_type, timestamp))
assert all(isinstance(t, TraceData) for t in trace_items), "All items must be TraceData instances."
return trace_items
def _save_results(results: dict[str, Any], output_path: Path) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open('w', encoding='utf-8') as f:
json.dump(results, f, indent=2)
def main() -> None:
parser = argparse.ArgumentParser(
description="CLI for eBPF Trace Aggregation Analysis."
)
parser.add_argument(
"--input",
type=str,
required=True,
help="Pfad zur JSON-Datei mit eBPF-Trace-Daten."
)
parser.add_argument(
"--output",
type=str,
required=True,
help="Pfad zur Ausgabe der Analyseergebnisse."
)
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
input_path = Path(args.input)
output_path = Path(args.output)
logging.info(f"Loading trace data from {input_path}")
trace_data = _load_trace_data(input_path)
logging.info("Analyzing trace data ...")
results = analyze_trace(trace_data)
if not isinstance(results, dict):
raise TypeError("analyze_trace must return a dict with analysis results.")
logging.info(f"Saving analysis results to {output_path}")
_save_results(results, output_path)
logging.info("Analysis completed successfully.")
if __name__ == "__main__":
main()