"""Aggregate raw clocksource_switch events into summary statistics."""
from __future__ import annotations

import logging
import statistics
from collections.abc import Mapping
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Sequence


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Fields every event must carry, mapped to the type each value must have.
_REQUIRED_FIELDS: Dict[str, type] = {
    'timestamp': float,
    'cpu_id': int,
    'seqcount_retry_count': int,
    'clocksource': str,
}


@dataclass
class AggregatedClocksourceMetrics:
    """Summary statistics computed over the retry counts of valid events."""

    mean: float
    p50: float
    p95: float
    p99: float
    retry_free_rate: float


def _matches_type(value: Any, expected_type: type) -> bool:
    """Return True if *value* is acceptable for a field of *expected_type*."""
    # bool is a subclass of int; a True/False must not pass as a CPU id,
    # a retry count or a timestamp.
    if isinstance(value, bool):
        return False
    # Whole-number timestamps arrive as int; accept them for float fields.
    if expected_type is float:
        return isinstance(value, (int, float))
    return isinstance(value, expected_type)


def _validate_event(event: Dict[str, Any]) -> bool:
    """Check that *event* is a mapping with all required, correctly typed fields.

    Logs an error describing the first problem found and returns False;
    returns True when the event is usable. Never raises for malformed input.
    """
    if not isinstance(event, Mapping):
        # Without this guard `key not in event` raises TypeError for values
        # such as None or an int, aborting the whole aggregation.
        logger.error("Event is not a mapping: got %s", type(event).__name__)
        return False
    for key, expected_type in _REQUIRED_FIELDS.items():
        if key not in event:
            logger.error("Missing key in event: %s", key)
            return False
        if not _matches_type(event[key], expected_type):
            logger.error(
                "Invalid type for key '%s': expected %s, got %s",
                key,
                expected_type.__name__,
                type(event[key]).__name__,
            )
            return False
    return True


def _percentile(sorted_values: Sequence[float], pct: float) -> float:
    """Return the *pct*-th percentile of ascending *sorted_values*.

    Uses linear interpolation between the two closest ranks; returns 0.0
    for an empty sequence.
    """
    if not sorted_values:
        return 0.0
    last = len(sorted_values) - 1
    rank = last * pct / 100
    lower = int(rank)
    upper = min(lower + 1, last)
    if lower == upper:
        return sorted_values[lower]
    return (
        sorted_values[lower] * (upper - rank)
        + sorted_values[upper] * (rank - lower)
    )


def _empty_metrics() -> Dict[str, float]:
    """Return the all-zero result used when there is nothing to aggregate."""
    return asdict(AggregatedClocksourceMetrics(0.0, 0.0, 0.0, 0.0, 0.0))


def aggregate_clocksource_data(raw_data: List[Dict[str, Any]]) -> Dict[str, float]:
    """Aggregate raw clocksource_switch events into statistical metrics.

    Events that fail validation are logged and skipped.

    Args:
        raw_data: List of event dicts. Each needs 'timestamp' (number),
            'cpu_id' (int), 'seqcount_retry_count' (int) and
            'clocksource' (str).

    Returns:
        Dict with the float values 'mean', 'p50', 'p95', 'p99' (computed
        over 'seqcount_retry_count') and 'retry_free_rate' (share of valid
        events whose retry count is 0). All values are 0.0 when *raw_data*
        is empty or contains no valid event.

    Raises:
        AssertionError: If *raw_data* is not a list.
    """
    # NOTE: kept as an assert so callers still see AssertionError; be aware
    # that the check is stripped when Python runs with -O.
    assert isinstance(raw_data, list), "raw_data must be a list of dicts"
    if not raw_data:
        logger.warning("No data provided for aggregation.")
        return _empty_metrics()

    valid_events = [event for event in raw_data if _validate_event(event)]
    if not valid_events:
        logger.error("All provided events are invalid.")
        return _empty_metrics()

    # Compute statistical metrics (latencies): the retry count is used as a
    # proxy metric here.
    retry_counts = [event['seqcount_retry_count'] for event in valid_events]
    ordered = sorted(retry_counts)

    # float() keeps the result consistent with the declared Dict[str, float];
    # median and the percentiles would otherwise return ints in some cases.
    metrics = AggregatedClocksourceMetrics(
        mean=statistics.fmean(retry_counts),
        p50=float(statistics.median(retry_counts)),
        p95=float(_percentile(ordered, 95)),
        p99=float(_percentile(ordered, 99)),
        retry_free_rate=retry_counts.count(0) / len(retry_counts),
    )
    return asdict(metrics)