Add statistical_analysis/src/statistical_analysis/core.py

This commit is contained in:
Mika 2026-02-16 15:27:11 +00:00
parent 268db267bc
commit 207086773d

View file

@ -0,0 +1,102 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import List, Dict, Any
import pandas as pd
import numpy as np
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class OutlierDetectionError(Exception):
    """Signal that input validation or outlier detection could not complete."""
@dataclass
class OutlierAnalysis:
    """Record holding the four values reported by an outlier analysis."""

    # Name of the column this result refers to.
    column_name: str
    # Numeric value reported for that column.
    outlier_value: float
    # Label describing the drift state.
    drift_signature: str
    # Number of timeouts associated with the result.
    timeout_counts: int

    def to_dict(self) -> Dict[str, Any]:
        """Return the four fields as a new dict keyed by field name."""
        field_names = (
            "column_name",
            "outlier_value",
            "drift_signature",
            "timeout_counts",
        )
        return {name: getattr(self, name) for name in field_names}
def _validate_input_data(log_data: List[Dict[str, Any]]) -> pd.DataFrame:
if not isinstance(log_data, list) or not all(isinstance(entry, dict) for entry in log_data):
raise OutlierDetectionError("Input log_data must be a list of dictionaries.")
if len(log_data) == 0:
raise OutlierDetectionError("Input log_data is empty.")
df = pd.DataFrame(log_data)
if df.empty:
raise OutlierDetectionError("Converted DataFrame is empty.")
return df
def analyze_outliers(log_data: List[Dict[str, Any]]) -> OutlierAnalysis:
    """Analyze log records and report the numeric column with the highest p99.

    The records are validated and loaded into a DataFrame. For every numeric
    column the 99th percentile of its non-null values is computed, and the
    column with the largest p99 is reported as the outlier column.

    Args:
        log_data: List of dict records. Two keys get special treatment when
            present: ``"drift_signature"`` (its most frequent value is
            reported) and ``"timeout_counts"`` (its values are summed).

    Returns:
        An ``OutlierAnalysis`` with the selected column name (always a
        ``str``), its p99 value, the dominant drift signature (``"stable"``
        when the column is absent or has no non-null values) and the total
        timeout count (``0`` when the column is absent).

    Raises:
        OutlierDetectionError: If validation fails, there are no numeric
            columns, or every numeric column contains only null values.
    """
    logger.debug("Starting outlier analysis.")
    df = _validate_input_data(log_data)

    numeric_cols = df.select_dtypes(include=["number"]).columns.tolist()
    if not numeric_cols:
        raise OutlierDetectionError("No numeric columns found for outlier analysis.")

    # Compute p99 per column, skipping columns that hold no usable values.
    outlier_scores = {}
    for col in numeric_cols:
        series = df[col].dropna()
        if series.empty:
            continue
        p99_value = np.percentile(series, 99)
        outlier_scores[col] = p99_value
        logger.debug("Computed p99 for %s: %f", col, p99_value)

    if not outlier_scores:
        raise OutlierDetectionError("No valid numeric data available for outlier computation.")

    outlier_col = max(outlier_scores, key=outlier_scores.get)
    outlier_value = outlier_scores[outlier_col]

    # Simple drift signature and timeout correlation heuristic.
    drift_signature = "stable"
    if "drift_signature" in df.columns:
        # value_counts() ignores nulls, so an all-null column keeps the default.
        sig_counts = df["drift_signature"].value_counts()
        if not sig_counts.empty:
            drift_signature = sig_counts.idxmax()

    timeout_counts = 0
    if "timeout_counts" in df.columns:
        raw_counts = df["timeout_counts"]
        # Coerce to numbers first: summing a string-typed column would
        # concatenate ("3" + "4" -> "34") and yield a wrong total or a bare
        # ValueError from int().
        numeric_counts = pd.to_numeric(raw_counts, errors="coerce")
        ignored = int(raw_counts.notna().sum()) - int(numeric_counts.notna().sum())
        if ignored:
            logger.warning(
                "Ignored %d non-numeric timeout_counts value(s).", ignored
            )
        # sum() skips NaN; an all-NaN column sums to 0.
        timeout_counts = int(numeric_counts.sum())

    # Explicit conversions guarantee the declared field types, so no
    # assert-based checks are needed (asserts vanish under ``python -O``).
    # Column labels may be non-string (e.g. int dict keys), hence str().
    result = OutlierAnalysis(
        column_name=str(outlier_col),
        outlier_value=float(outlier_value),
        drift_signature=str(drift_signature),
        timeout_counts=timeout_counts,
    )

    logger.info(
        "Outlier analysis completed: column=%s, value=%f, drift=%s, timeouts=%d",
        result.column_name,
        result.outlier_value,
        result.drift_signature,
        result.timeout_counts,
    )
    return result