Add outlier_analysis/src/outlier_analysis/core.py

This commit is contained in:
Mika 2026-03-13 16:23:00 +00:00
parent 4b052da98a
commit 85fd92bba5

View file

@ -0,0 +1,76 @@
from __future__ import annotations
import json
import statistics
from pathlib import Path
from typing import List, Dict, Any
import pandas as pd
class InputValidationError(ValueError):
"""Raised when input validation fails for log entries."""
pass
def _validate_log_entries(log_entries: List[Dict[str, Any]]) -> None:
if not isinstance(log_entries, list):
raise InputValidationError("log_entries must be a list of dicts.")
for entry in log_entries:
if not isinstance(entry, dict):
raise InputValidationError("Each log entry must be a dict.")
required_fields = [
"corr_id",
"stratum",
"retry_total_overhead_ms",
]
for field in required_fields:
if field not in entry:
raise InputValidationError(f"Missing required field '{field}' in log entry.")
def analyze_outliers(log_entries: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analysiert Log-Einträge und erstellt statistische Kennzahlen zu Outliern.
Args:
log_entries (List[Dict[str, Any]]): Liste von Log-Einträgen.
Returns:
Dict[str, Any]: Statistik- und Clusterinformationen im JSON-kompatiblen Format.
"""
_validate_log_entries(log_entries)
df = pd.DataFrame(log_entries)
if df.empty or 'retry_total_overhead_ms' not in df.columns:
raise InputValidationError("No retry_total_overhead_ms data found.")
values = df['retry_total_overhead_ms'].dropna().astype(float).tolist()
if not values:
raise InputValidationError("No valid numerical values for retry_total_overhead_ms.")
report: Dict[str, Any] = {
"mean": float(statistics.fmean(values)),
"median": float(statistics.median(values)),
"p90": float(df['retry_total_overhead_ms'].quantile(0.90)),
"p95": float(df['retry_total_overhead_ms'].quantile(0.95)),
"p99": float(df['retry_total_overhead_ms'].quantile(0.99)),
"max": float(df['retry_total_overhead_ms'].max()),
"clusters": [],
}
high_threshold = report["p95"]
clusters = []
high_outliers = df[df['retry_total_overhead_ms'] >= high_threshold]
if not high_outliers.empty:
grouped = high_outliers.groupby('stratum')
for name, group in grouped:
clusters.append({
"stratum": name,
"count": int(len(group)),
"mean_overhead": float(group['retry_total_overhead_ms'].mean()),
"max_overhead": float(group['retry_total_overhead_ms'].max()),
})
report["clusters"] = clusters
# CI assertions for data integrity
assert set(report.keys()) == {"mean", "median", "p90", "p95", "p99", "max", "clusters"}, "Unexpected report format"
return report