"""Aggregate and analyze read-trace events for a single run.

Raw read-event dicts are validated into :class:`ReadEvent` instances,
filtered by correlation id, counted for retry-free reads, and summarized
into per-field duration statistics plus a JSON-serializable table.
"""

from __future__ import annotations

import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from statistics import mean
from typing import Any, Dict, List

logger = logging.getLogger(__name__)
# NOTE(review): setting a level on a module logger overrides the host
# application's logging configuration; kept as-is for compatibility,
# but consider removing and letting the application configure levels.
logger.setLevel(logging.INFO)


class AnalysisError(Exception):
    """Custom exception for invalid analysis inputs or states."""


@dataclass
class ReadEvent:
    """One validated read event from a trace (built via :meth:`from_dict`)."""

    corr_id: str      # correlation id linking events of one run
    timestamp: float  # event time; unit is whatever the trace recorded
    cpu: int          # CPU the read was observed on
    field_tag: str    # logical field/section the read belongs to
    retry_free: bool  # True if the read completed without retries

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> ReadEvent:
        """Build a ReadEvent from a raw dict, validating presence and types.

        Args:
            data: Raw event mapping with keys ``corr_id``, ``timestamp``,
                ``cpu``, ``field_tag`` and ``retry_free``.

        Returns:
            A normalized ReadEvent (timestamp coerced to float).

        Raises:
            AnalysisError: If a required field is missing or has the wrong
                type. ``bool`` values are rejected for the numeric fields,
                since ``bool`` is an ``int`` subclass and a boolean
                cpu/timestamp is almost certainly caller error.
        """
        required_fields: Dict[str, Any] = {
            "corr_id": str,
            "timestamp": (int, float),
            "cpu": int,
            "field_tag": str,
            "retry_free": bool,
        }
        for key, expected in required_fields.items():
            if key not in data:
                raise AnalysisError(f"Missing field '{key}' in ReadEvent data.")
            value = data[key]
            # isinstance(True, int) is True, so explicitly reject bool
            # wherever bool is not the expected type.
            bad_bool = expected is not bool and isinstance(value, bool)
            if bad_bool or not isinstance(value, expected):
                raise AnalysisError(
                    f"Invalid type for field '{key}': expected {expected}, got {type(value)}."
                )
        return cls(
            corr_id=data["corr_id"],
            timestamp=float(data["timestamp"]),
            cpu=int(data["cpu"]),
            field_tag=str(data["field_tag"]),
            retry_free=bool(data["retry_free"]),
        )


@dataclass
class ReadAnalysisResult:
    """Aggregated outcome of :func:`analyze_reads` for one correlation id."""

    corr_id: str
    retry_free_count: int
    duration_metrics: Dict[str, float] = field(default_factory=dict)
    summary_table: List[Dict[str, Any]] = field(default_factory=list)

    def to_json(self) -> Dict[str, Any]:
        """Serialize the analysis result into a JSON-compatible dict."""
        return {
            "corr_id": self.corr_id,
            "retry_free_count": self.retry_free_count,
            "duration_metrics": self.duration_metrics,
            "summary_table": self.summary_table,
        }


def _compute_duration_metrics(events: List[ReadEvent]) -> Dict[str, float]:
    """Compute the mean timestamp delta between consecutive events per tag.

    Events are sorted by timestamp, grouped by ``field_tag``, and for each
    tag with at least two events the mean of successive-timestamp deltas is
    reported. Tags with a single event are omitted.
    """
    grouped: Dict[str, List[float]] = {}
    for event in sorted(events, key=lambda e: e.timestamp):
        grouped.setdefault(event.field_tag, []).append(event.timestamp)

    metrics: Dict[str, float] = {}
    for tag, timestamps in grouped.items():
        if len(timestamps) > 1:
            # Timestamps are already sorted ascending, so every delta is >= 0.
            deltas = [b - a for a, b in zip(timestamps[:-1], timestamps[1:])]
            metrics[tag] = mean(deltas)
    return metrics


def analyze_reads(corr_id: str, read_data: List[Dict[str, Any]]) -> ReadAnalysisResult:
    """Analyze the read events of a run, aggregating retry-free reads.

    Invalid event dicts are logged and skipped rather than aborting the run.

    Args:
        corr_id: Non-empty correlation id selecting the run to analyze.
        read_data: List of raw event dicts (see :meth:`ReadEvent.from_dict`).

    Returns:
        A :class:`ReadAnalysisResult` with the retry-free count, per-tag
        duration metrics, and a per-tag summary table.

    Raises:
        AnalysisError: If ``corr_id`` is empty/not a string, ``read_data``
            is not a list, or no valid events match ``corr_id``.
    """
    if not isinstance(corr_id, str) or not corr_id:
        raise AnalysisError("corr_id must be a non-empty string.")
    if not isinstance(read_data, list):
        raise AnalysisError("read_data must be a list of dicts.")

    events: List[ReadEvent] = []
    for entry in read_data:
        try:
            events.append(ReadEvent.from_dict(entry))
        except AnalysisError as e:
            # Best-effort ingestion: skip malformed events, keep the rest.
            logger.error("Skipping invalid event: %s", e)

    corr_events = [e for e in events if e.corr_id == corr_id]
    if not corr_events:
        raise AnalysisError(f"No events found for corr_id '{corr_id}'.")

    retry_free_count = sum(1 for e in corr_events if e.retry_free)
    duration_metrics = _compute_duration_metrics(corr_events)

    # Single pass over the events instead of re-scanning corr_events per tag
    # (the previous form was O(tags * events)). per_tag maps
    # field_tag -> [total, retry_free].
    per_tag: Dict[str, List[int]] = {}
    for e in corr_events:
        counts = per_tag.setdefault(e.field_tag, [0, 0])
        counts[0] += 1
        if e.retry_free:
            counts[1] += 1

    summary_table: List[Dict[str, Any]] = [
        {
            "field_tag": tag,
            "total": total,
            "retry_free": free,
            "ratio_retry_free": free / total if total else 0.0,
        }
        for tag, (total, free) in per_tag.items()
    ]

    result = ReadAnalysisResult(
        corr_id=corr_id,
        retry_free_count=retry_free_count,
        duration_metrics=duration_metrics,
        summary_table=summary_table,
    )

    logger.info(
        "Analysis completed for corr_id=%s: %d retry-free reads, %d fields.",
        corr_id,
        retry_free_count,
        len(summary_table),
    )

    return result