diff --git a/statistical_analysis/src/statistical_analysis/core.py b/statistical_analysis/src/statistical_analysis/core.py new file mode 100644 index 0000000..9940e75 --- /dev/null +++ b/statistical_analysis/src/statistical_analysis/core.py @@ -0,0 +1,102 @@ +from __future__ import annotations +import logging +from dataclasses import dataclass +from typing import List, Dict, Any +import pandas as pd +import numpy as np + + +logger = logging.getLogger(__name__) + + +class OutlierDetectionError(Exception): + """Custom exception raised when data validation or detection fails.""" + pass + + +@dataclass +class OutlierAnalysis: + column_name: str + outlier_value: float + drift_signature: str + timeout_counts: int + + def to_dict(self) -> Dict[str, Any]: + return { + "column_name": self.column_name, + "outlier_value": self.outlier_value, + "drift_signature": self.drift_signature, + "timeout_counts": self.timeout_counts, + } + + +def _validate_input_data(log_data: List[Dict[str, Any]]) -> pd.DataFrame: + if not isinstance(log_data, list) or not all(isinstance(entry, dict) for entry in log_data): + raise OutlierDetectionError("Input log_data must be a list of dictionaries.") + if len(log_data) == 0: + raise OutlierDetectionError("Input log_data is empty.") + + df = pd.DataFrame(log_data) + if df.empty: + raise OutlierDetectionError("Converted DataFrame is empty.") + return df + + +def analyze_outliers(log_data: List[Dict[str, Any]]) -> OutlierAnalysis: + """Analysiert Logdaten und identifiziert Ausreißer mit Fokus auf p99-Region und Latenzverteilungen.""" + logger.debug("Starting outlier analysis.") + df = _validate_input_data(log_data) + + numeric_cols = df.select_dtypes(include=["number"]).columns.tolist() + if not numeric_cols: + raise OutlierDetectionError("No numeric columns found for outlier analysis.") + + # Compute p99 per column + outlier_scores = {} + for col in numeric_cols: + series = df[col].dropna() + if series.empty: + continue + p99_value = np.percentile(series, 99) + outlier_scores[col] = p99_value + logger.debug("Computed p99 for %s: %f", col, p99_value) + + if not outlier_scores: + raise OutlierDetectionError("No valid numeric data available for outlier computation.") + + outlier_col = max(outlier_scores, key=outlier_scores.get) + outlier_value = outlier_scores[outlier_col] + + # Simple drift signature and timeout correlation heuristic + drift_signature = "stable" + timeout_counts = 0 + + if "drift_signature" in df.columns: + sig_counts = df["drift_signature"].value_counts() + if not sig_counts.empty: + drift_signature = sig_counts.idxmax() + + if "timeout_counts" in df.columns: + timeout_counts = int(df["timeout_counts"].sum()) + + result = OutlierAnalysis( + column_name=outlier_col, + outlier_value=float(outlier_value), + drift_signature=str(drift_signature), + timeout_counts=timeout_counts, + ) + + # CI-ready validation + assert isinstance(result.column_name, str) + assert isinstance(result.outlier_value, float) + assert isinstance(result.drift_signature, str) + assert isinstance(result.timeout_counts, int) + + logger.info( + "Outlier analysis completed: column=%s, value=%f, drift=%s, timeouts=%d", + result.column_name, + result.outlier_value, + result.drift_signature, + result.timeout_counts, + ) + return result