Add clocksource_switch_analysis/src/clocksource_switch_analysis/core.py

This commit is contained in:
Mika 2026-01-22 11:58:37 +00:00
parent a5a503ee8b
commit b678427be8

View file

@@ -0,0 +1,107 @@
from __future__ import annotations
import statistics
import logging
from dataclasses import dataclass
from typing import List, Dict, Any
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class AggregatedClocksourceMetrics:
mean: float
p50: float
p95: float
p99: float
retry_free_rate: float
def _validate_event(event: Dict[str, Any]) -> bool:
required_fields = {
'timestamp': float,
'cpu_id': int,
'seqcount_retry_count': int,
'clocksource': str,
}
for key, expected_type in required_fields.items():
if key not in event:
logger.error(f"Missing key in event: {key}")
return False
if not isinstance(event[key], expected_type):
logger.error(f"Invalid type for key '{key}': expected {expected_type.__name__}, got {type(event[key]).__name__}")
return False
return True
def aggregate_clocksource_data(raw_data: List[Dict[str, Any]]) -> Dict[str, float]:
    """Aggregate raw clocksource_switch events into statistical metrics.

    Each valid event's ``seqcount_retry_count`` is used as the measured
    value (a stand-in metric for latency, per the original design note).

    Args:
        raw_data: List of event dicts; events failing ``_validate_event``
            are skipped.

    Returns:
        Dict with keys ``mean``, ``p50``, ``p95``, ``p99`` and
        ``retry_free_rate``. All values are 0.0 when there is no usable
        input.

    Raises:
        TypeError: if *raw_data* is not a list.
    """
    if not isinstance(raw_data, list):
        # Explicit raise instead of `assert`: asserts vanish under `python -O`.
        raise TypeError("raw_data must be a list of dicts")

    # Single definition of the all-zero fallback result (was duplicated
    # verbatim in two early-return branches).
    zero_result = {
        'mean': 0.0,
        'p50': 0.0,
        'p95': 0.0,
        'p99': 0.0,
        'retry_free_rate': 0.0,
    }

    if not raw_data:
        logger.warning("No data provided for aggregation.")
        return zero_result

    valid_events = [e for e in raw_data if _validate_event(e)]
    if not valid_events:
        logger.error("All provided events are invalid.")
        return zero_result

    # Retry counts are the aggregated metric values.
    values = [e['seqcount_retry_count'] for e in valid_events]
    sorted_vals = sorted(values)

    def percentile(data: List[float], pct: float) -> float:
        """Linear-interpolation percentile of pre-sorted *data*."""
        if not data:
            return 0.0
        k = (len(data) - 1) * pct / 100
        lower = int(k)
        upper = min(lower + 1, len(data) - 1)
        if lower == upper:
            # k landed exactly on the last sample; no interpolation needed.
            return data[lower]
        # Weighted blend of the two neighbouring samples.
        return data[lower] * (upper - k) + data[upper] * (k - lower)

    retry_free = sum(1 for v in values if v == 0)

    metrics = AggregatedClocksourceMetrics(
        mean=statistics.fmean(values),
        p50=statistics.median(values),
        p95=percentile(sorted_vals, 95),
        p99=percentile(sorted_vals, 99),
        retry_free_rate=retry_free / len(values),
    )
    return {
        'mean': metrics.mean,
        'p50': metrics.p50,
        'p95': metrics.p95,
        'p99': metrics.p99,
        'retry_free_rate': metrics.retry_free_rate,
    }