Add trace_aggpy/src/trace_aggpy/main.py

Mika 2026-01-20 12:12:23 +00:00
commit 636b3cd76d

@@ -0,0 +1,128 @@
from __future__ import annotations

import json
import logging
from dataclasses import dataclass, field
from statistics import mean
from typing import Any, Dict, List

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class AnalysisError(Exception):
    """Custom exception for invalid analysis inputs or states."""
@dataclass
class ReadEvent:
    corr_id: str
    timestamp: float
    cpu: int
    field_tag: str
    retry_free: bool

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> ReadEvent:
        required_fields = {
            "corr_id": str,
            "timestamp": (int, float),
            "cpu": int,
            "field_tag": str,
            "retry_free": bool,
        }
        for k, t in required_fields.items():
            if k not in data:
                raise AnalysisError(f"Missing field '{k}' in ReadEvent data.")
            if not isinstance(data[k], t):
                raise AnalysisError(
                    f"Invalid type for field '{k}': expected {t}, got {type(data[k])}."
                )
        return cls(
            corr_id=data["corr_id"],
            timestamp=float(data["timestamp"]),
            cpu=int(data["cpu"]),
            field_tag=str(data["field_tag"]),
            retry_free=bool(data["retry_free"]),
        )
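# Illustrative from_dict call (sample values, not from the commit):
#   ReadEvent.from_dict({"corr_id": "run-7", "timestamp": 12.5, "cpu": 2,
#                        "field_tag": "payload", "retry_free": True})
# Note that bool is a subclass of int in Python, so a boolean would also
# pass the isinstance checks for the "cpu" and "timestamp" fields.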
@dataclass
class ReadAnalysisResult:
    corr_id: str
    retry_free_count: int
    duration_metrics: Dict[str, float] = field(default_factory=dict)
    summary_table: List[Dict[str, Any]] = field(default_factory=list)

    def to_json(self) -> Dict[str, Any]:
        """Serialize the analysis result into a JSON-compatible dict."""
        return {
            "corr_id": self.corr_id,
            "retry_free_count": self.retry_free_count,
            "duration_metrics": self.duration_metrics,
            "summary_table": self.summary_table,
        }
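# Example to_json() output shape (values are illustrative only):
# {
#     "corr_id": "run-7",
#     "retry_free_count": 3,
#     "duration_metrics": {"payload": 1.0},
#     "summary_table": [
#         {"field_tag": "payload", "total": 4,
#          "retry_free": 3, "ratio_retry_free": 0.75}
#     ],
# }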
def _compute_duration_metrics(events: List[ReadEvent]) -> Dict[str, float]:
    """Compute basic duration statistics per field_tag based on timestamp deltas."""
    metrics: Dict[str, float] = {}
    grouped: Dict[str, List[float]] = {}
    sorted_events = sorted(events, key=lambda e: e.timestamp)
    for e in sorted_events:
        grouped.setdefault(e.field_tag, []).append(e.timestamp)
    for tag, ts_list in grouped.items():
        if len(ts_list) > 1:
            durations = [t2 - t1 for t1, t2 in zip(ts_list[:-1], ts_list[1:]) if t2 >= t1]
            if durations:
                metrics[tag] = mean(durations)
    return metrics
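# Worked example (illustrative): timestamps [0.0, 0.5, 2.0] for one field_tag
# give consecutive deltas [0.5, 1.5], so the reported mean duration is 1.0.
# A tag that occurs only once yields no delta and is omitted from the result.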
def analyze_reads(corr_id: str, read_data: List[Dict[str, Any]]) -> ReadAnalysisResult:
    """
    Analyze the read events of an N40 run, aggregate the retry-free reads,
    and build a summarized JSON statistic.
    """
    if not isinstance(corr_id, str) or not corr_id:
        raise AnalysisError("corr_id must be a non-empty string.")
    if not isinstance(read_data, list):
        raise AnalysisError("read_data must be a list of dicts.")
    events: List[ReadEvent] = []
    for entry in read_data:
        try:
            events.append(ReadEvent.from_dict(entry))
        except AnalysisError as e:
            logger.error(f"Skipping invalid event: {e}")
    corr_events = [e for e in events if e.corr_id == corr_id]
    if not corr_events:
        raise AnalysisError(f"No events found for corr_id '{corr_id}'.")
    retry_free_events = [e for e in corr_events if e.retry_free]
    retry_free_count = len(retry_free_events)
    duration_metrics = _compute_duration_metrics(corr_events)
    summary_table: List[Dict[str, Any]] = []
    field_tags = {e.field_tag for e in corr_events}
    for tag in sorted(field_tags):  # sorted so the summary order is deterministic
        tagged_events = [e for e in corr_events if e.field_tag == tag]
        count_total = len(tagged_events)
        count_retry_free = sum(1 for e in tagged_events if e.retry_free)
        summary_table.append(
            {
                "field_tag": tag,
                "total": count_total,
                "retry_free": count_retry_free,
                "ratio_retry_free": count_retry_free / count_total if count_total else 0.0,
            }
        )
    result = ReadAnalysisResult(
        corr_id=corr_id,
        retry_free_count=retry_free_count,
        duration_metrics=duration_metrics,
        summary_table=summary_table,
    )
    logger.info(
        f"Analysis completed for corr_id={corr_id}: "
        f"{retry_free_count} retry-free reads, {len(summary_table)} fields."
    )
    return result
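if __name__ == "__main__":
    # Minimal smoke-test sketch (illustrative only; the sample events below
    # are made-up values, not part of any recorded N40 trace).
    logging.basicConfig(level=logging.INFO)
    sample = [
        {"corr_id": "run-7", "timestamp": 0.0, "cpu": 0,
         "field_tag": "payload", "retry_free": True},
        {"corr_id": "run-7", "timestamp": 0.5, "cpu": 1,
         "field_tag": "payload", "retry_free": False},
        {"corr_id": "run-7", "timestamp": 2.0, "cpu": 0,
         "field_tag": "payload", "retry_free": True},
    ]
    print(json.dumps(analyze_reads("run-7", sample).to_json(), indent=2))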