Add artifact.retry_analysis/src/artifact_retry_analysis/core.py

This commit is contained in:
Mika 2026-03-09 13:44:29 +00:00
parent ab42e2fc18
commit 535ba49562

View file

@ -0,0 +1,115 @@
import json
import statistics
from pathlib import Path
from typing import List, Any, Dict
from dataclasses import dataclass, asdict
from datetime import datetime
class RetryAnalysisError(Exception):
"""Custom exception for retry analysis errors."""
pass
@dataclass
class RetryLogEntry:
retry_taken: bool
retry_total_overhead_ms: float
stratum: str
expires_at_dist_hours: float
delta_t: float
@staticmethod
def from_dict(data: Dict[str, Any]) -> 'RetryLogEntry':
required_fields = {
'retry_taken': bool,
'retry_total_overhead_ms': (int, float),
'stratum': str,
'expires_at_dist_hours': (int, float),
'delta_t': (int, float)
}
for key, expected_type in required_fields.items():
if key not in data:
raise RetryAnalysisError(f"Missing required field: {key}")
if not isinstance(data[key], expected_type):
raise RetryAnalysisError(f"Invalid type for field '{key}': expected {expected_type}, got {type(data[key])}")
return RetryLogEntry(
retry_taken=bool(data['retry_taken']),
retry_total_overhead_ms=float(data['retry_total_overhead_ms']),
stratum=str(data['stratum']),
expires_at_dist_hours=float(data['expires_at_dist_hours']),
delta_t=float(data['delta_t'])
)
@dataclass
class RetryAnalysisReport:
total_retries: int
success_rate: float
failure_rate: float
details: str
def to_json(self) -> str:
"""Serialize the report to a JSON string."""
return json.dumps(asdict(self), ensure_ascii=False, indent=2)
def analyze_retry_data(log_file: str) -> RetryAnalysisReport:
"""
Analysiert eine Logdatei mit Retry-Einträgen und erstellt einen strukturierten Bericht über Erfolgs- und Fehlerraten.
"""
path = Path(log_file)
if not path.is_file():
raise RetryAnalysisError(f"Input file not found: {log_file}")
with path.open('r', encoding='utf-8') as f:
try:
data = json.load(f)
except json.JSONDecodeError as e:
raise RetryAnalysisError(f"Invalid JSON format: {e}") from e
if not isinstance(data, list):
raise RetryAnalysisError("Expected a JSON array of retry log entries.")
entries: List[RetryLogEntry] = []
for i, item in enumerate(data):
try:
entry = RetryLogEntry.from_dict(item)
entries.append(entry)
except RetryAnalysisError as e:
# Skip invalid entries but keep a log of occurrence (via console print for simplicity)
print(f"Warning: Skipping invalid entry at index {i}: {e}")
total = len(entries)
if total == 0:
raise RetryAnalysisError("No valid retry entries found.")
success_count = sum(1 for e in entries if e.retry_taken)
failure_count = total - success_count
success_rate = (success_count / total) * 100.0
failure_rate = (failure_count / total) * 100.0
overheads = [e.retry_total_overhead_ms for e in entries]
mean_overhead = statistics.mean(overheads)
details = (
f"Analysis Timestamp: {datetime.utcnow().isoformat()}Z\n"
f"Processed Entries: {total}\n"
f"Average Overhead (ms): {mean_overhead:.2f}\n"
f"Strata Distribution: {{}}, Derived from entries count per stratum."
)
strata_counts = {}
for e in entries:
strata_counts[e.stratum] = strata_counts.get(e.stratum, 0) + 1
distribution_str = ', '.join(f"{k}: {v}" for k, v in strata_counts.items())
details = details.replace("{}", distribution_str)
report = RetryAnalysisReport(
total_retries=total,
success_rate=success_rate,
failure_rate=failure_rate,
details=details
)
return report