Add max_outlier_analysis_script/src/max_outlier_analysis_script/core.py

This commit is contained in:
Mika 2026-03-11 12:43:10 +00:00
parent cf75341bdb
commit bd3d545b1b

View file

@@ -0,0 +1,110 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import List, Dict, Any
import pandas as pd
import statistics
__all__ = ["analyze_max_outliers"]
# Configure logging for CI diagnostics
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class DataValidationError(Exception):
"""Raised when input data validation fails."""
pass
@dataclass
class OutlierRecord:
corr_id: str
stratum: str
job_parallelism: int
expires_at_dist_hours: float
retry_total_overhead_ms: float
latency_max: float
@dataclass
class AnalysisResults:
max_above_p99_count: int
near_expiry_cluster_percentage: float
retry_overhead_variance: float
_DEF_NEAR_EXPIRY_KEY = "near-expiry-unpinned"
def _validate_data(data: List[Dict[str, Any]]) -> None:
required_fields = {f.name for f in OutlierRecord.__dataclass_fields__.values()}
if not isinstance(data, list):
raise DataValidationError("Input data must be a list of dictionaries.")
for i, record in enumerate(data):
if not isinstance(record, dict):
raise DataValidationError(f"Element {i} is not a dict.")
missing = required_fields - record.keys()
if missing:
raise DataValidationError(f"Record {i} missing fields: {missing}")
def analyze_max_outliers(data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analysiert Max-Outlier-Daten und berechnet Summary-Kennzahlen.
Args:
data: Liste von Messwert-Dictionaries aus CI-Lasttest-Exports.
Returns:
dict: Ergebnisse mit Kennzahlen für Max-Ausreißer.
"""
logger.info("Starting max outlier analysis on %d records", len(data))
_validate_data(data)
df = pd.DataFrame(data)
if df.empty:
return {
"max_above_p99_count": 0,
"near_expiry_cluster_percentage": 0.0,
"retry_overhead_variance": 0.0,
}
# Compute percentiles
p99 = df["latency_max"].quantile(0.99)
above_p99 = df[df["latency_max"] > p99]
max_above_p99_count = int(above_p99.shape[0])
# Cluster analysis for 'near-expiry-unpinned'
total_count = df.shape[0]
near_expiry_count = df[df["stratum"] == _DEF_NEAR_EXPIRY_KEY].shape[0]
near_expiry_cluster_percentage = (
(near_expiry_count / total_count) * 100.0 if total_count > 0 else 0.0
)
# Retry overhead variance
try:
retry_overhead_variance = statistics.variance(df["retry_total_overhead_ms"].tolist())
except statistics.StatisticsError:
retry_overhead_variance = 0.0
result = AnalysisResults(
max_above_p99_count=max_above_p99_count,
near_expiry_cluster_percentage=near_expiry_cluster_percentage,
retry_overhead_variance=retry_overhead_variance,
)
logger.info(
"Analysis complete: max_above_p99_count=%d, near_expiry_cluster_percentage=%.2f, retry_overhead_variance=%.3f",
result.max_above_p99_count,
result.near_expiry_cluster_percentage,
result.retry_overhead_variance,
)
return {
"max_above_p99_count": result.max_above_p99_count,
"near_expiry_cluster_percentage": result.near_expiry_cluster_percentage,
"retry_overhead_variance": result.retry_overhead_variance,
}