from __future__ import annotations

import logging
from dataclasses import asdict, dataclass
from typing import Any, Dict, List

import pandas as pd

__all__ = ["analyze_max_outliers"]


# Module-level logger for CI diagnostics. Level/handler configuration is
# deliberately left to the application: a library must never call
# setLevel()/basicConfig() at import time.
logger = logging.getLogger(__name__)


class DataValidationError(Exception):
    """Raised when input data validation fails."""


@dataclass
class OutlierRecord:
    """One measurement row from a CI load-test export.

    The field names double as the set of required keys enforced by
    ``_validate_data``.
    """

    corr_id: str
    stratum: str
    job_parallelism: int
    expires_at_dist_hours: float
    retry_total_overhead_ms: float
    latency_max: float


@dataclass
class AnalysisResults:
    """Summary metrics produced by :func:`analyze_max_outliers`."""

    max_above_p99_count: int
    near_expiry_cluster_percentage: float
    retry_overhead_variance: float


# Stratum label whose share of the population is reported.
_DEF_NEAR_EXPIRY_KEY = "near-expiry-unpinned"


def _validate_data(data: List[Dict[str, Any]]) -> None:
    """Validate that *data* is a list of dicts carrying every OutlierRecord field.

    Args:
        data: Candidate input for :func:`analyze_max_outliers`.

    Raises:
        DataValidationError: If *data* is not a list, an element is not a
            dict, or a record is missing one or more required fields.
    """
    required_fields = {f.name for f in OutlierRecord.__dataclass_fields__.values()}
    if not isinstance(data, list):
        raise DataValidationError("Input data must be a list of dictionaries.")
    for i, record in enumerate(data):
        if not isinstance(record, dict):
            raise DataValidationError(f"Element {i} is not a dict.")
        missing = required_fields - record.keys()
        if missing:
            raise DataValidationError(f"Record {i} missing fields: {missing}")


def analyze_max_outliers(data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyze max-outlier data and compute summary metrics.

    Args:
        data: List of measurement dicts from CI load-test exports; each dict
            must contain every ``OutlierRecord`` field.

    Returns:
        dict with keys ``max_above_p99_count`` (rows whose ``latency_max``
        strictly exceeds the p99 of ``latency_max``),
        ``near_expiry_cluster_percentage`` (share of rows in the
        'near-expiry-unpinned' stratum, in percent) and
        ``retry_overhead_variance`` (sample variance of
        ``retry_total_overhead_ms``; 0.0 when fewer than two rows).

    Raises:
        DataValidationError: If *data* fails validation.
    """
    logger.info("Starting max outlier analysis on %d records", len(data))
    _validate_data(data)

    df = pd.DataFrame(data)
    if df.empty:
        return {
            "max_above_p99_count": 0,
            "near_expiry_cluster_percentage": 0.0,
            "retry_overhead_variance": 0.0,
        }

    # Rows strictly above the 99th percentile of max latency.
    p99 = df["latency_max"].quantile(0.99)
    max_above_p99_count = int((df["latency_max"] > p99).sum())

    # Share of the 'near-expiry-unpinned' stratum in the whole population.
    # df is non-empty here, so the division is safe.
    total_count = df.shape[0]
    near_expiry_count = int((df["stratum"] == _DEF_NEAR_EXPIRY_KEY).sum())
    near_expiry_cluster_percentage = (near_expiry_count / total_count) * 100.0

    # Sample variance (ddof=1), equivalent to statistics.variance() but
    # computed in C by pandas; NaN (fewer than two rows) normalizes to 0.0,
    # matching the previous StatisticsError fallback.
    variance = df["retry_total_overhead_ms"].var(ddof=1)
    retry_overhead_variance = 0.0 if pd.isna(variance) else float(variance)

    result = AnalysisResults(
        max_above_p99_count=max_above_p99_count,
        near_expiry_cluster_percentage=near_expiry_cluster_percentage,
        retry_overhead_variance=retry_overhead_variance,
    )

    logger.info(
        "Analysis complete: max_above_p99_count=%d, near_expiry_cluster_percentage=%.2f, retry_overhead_variance=%.3f",
        result.max_above_p99_count,
        result.near_expiry_cluster_percentage,
        result.retry_overhead_variance,
    )

    # asdict keeps the returned keys in lockstep with the dataclass fields.
    return asdict(result)