Add trace_agg/src/trace_agg/cli.py

This commit is contained in:
Mika 2025-12-21 11:58:05 +00:00
parent ffcba03735
commit 3664b2f22b

View file

@ -0,0 +1,48 @@
import argparse
import json
from pathlib import Path
from trace_agg.core import aggregate_traces
def load_trace_data(input_path: Path):
if not input_path.exists():
raise FileNotFoundError(f"Input file not found: {input_path}")
with input_path.open('r', encoding='utf-8') as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError("Expected a list of TraceData objects in JSON input.")
required_fields = {"entry_time", "first_read_time", "baseline_recalc_time"}
for item in data:
if not required_fields.issubset(item.keys()):
raise ValueError(f"Missing fields in trace element: {item}")
return data
def write_output(output_path: Path, delta_chain):
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open('w', encoding='utf-8') as f:
json.dump(delta_chain, f, indent=2)
def main():
parser = argparse.ArgumentParser(
description="Aggregate BPF trace data and compute delta chains."
)
parser.add_argument(
"--input", required=True, help="Path to input JSON file with TraceData events."
)
parser.add_argument(
"--output", default="output/delta_chain.json", help="Output path for delta chain JSON."
)
args = parser.parse_args()
input_path = Path(args.input)
output_path = Path(args.output)
trace_data = load_trace_data(input_path)
delta_chain = aggregate_traces(trace_data)
write_output(output_path, delta_chain)
if __name__ == "__main__":
main()