From 85fd92bba5af2dcc4a382d2dc4908be9308dfff6 Mon Sep 17 00:00:00 2001 From: Mika Date: Fri, 13 Mar 2026 16:23:00 +0000 Subject: [PATCH] Add outlier_analysis/src/outlier_analysis/core.py --- outlier_analysis/src/outlier_analysis/core.py | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 outlier_analysis/src/outlier_analysis/core.py diff --git a/outlier_analysis/src/outlier_analysis/core.py b/outlier_analysis/src/outlier_analysis/core.py new file mode 100644 index 0000000..3baddef --- /dev/null +++ b/outlier_analysis/src/outlier_analysis/core.py @@ -0,0 +1,76 @@ +from __future__ import annotations +import json +import statistics +from pathlib import Path +from typing import List, Dict, Any +import pandas as pd + + +class InputValidationError(ValueError): + """Raised when input validation fails for log entries.""" + pass + + +def _validate_log_entries(log_entries: List[Dict[str, Any]]) -> None: + if not isinstance(log_entries, list): + raise InputValidationError("log_entries must be a list of dicts.") + for entry in log_entries: + if not isinstance(entry, dict): + raise InputValidationError("Each log entry must be a dict.") + required_fields = [ + "corr_id", + "stratum", + "retry_total_overhead_ms", + ] + for field in required_fields: + if field not in entry: + raise InputValidationError(f"Missing required field '{field}' in log entry.") + + +def analyze_outliers(log_entries: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analysiert Log-Einträge und erstellt statistische Kennzahlen zu Outliern. + + Args: + log_entries (List[Dict[str, Any]]): Liste von Log-Einträgen. + + Returns: + Dict[str, Any]: Statistik- und Clusterinformationen im JSON-kompatiblen Format. + """ + _validate_log_entries(log_entries) + + df = pd.DataFrame(log_entries) + if df.empty or 'retry_total_overhead_ms' not in df.columns: + raise InputValidationError("No retry_total_overhead_ms data found.") + + values = df['retry_total_overhead_ms'].dropna().astype(float).tolist() + if not values: + raise InputValidationError("No valid numerical values for retry_total_overhead_ms.") + + report: Dict[str, Any] = { + "mean": float(statistics.fmean(values)), + "median": float(statistics.median(values)), + "p90": float(df['retry_total_overhead_ms'].quantile(0.90)), + "p95": float(df['retry_total_overhead_ms'].quantile(0.95)), + "p99": float(df['retry_total_overhead_ms'].quantile(0.99)), + "max": float(df['retry_total_overhead_ms'].max()), + "clusters": [], + } + + high_threshold = report["p95"] + clusters = [] + high_outliers = df[df['retry_total_overhead_ms'] >= high_threshold] + if not high_outliers.empty: + grouped = high_outliers.groupby('stratum') + for name, group in grouped: + clusters.append({ + "stratum": name, + "count": int(len(group)), + "mean_overhead": float(group['retry_total_overhead_ms'].mean()), + "max_overhead": float(group['retry_total_overhead_ms'].max()), + }) + report["clusters"] = clusters + + # CI assertions for data integrity + assert set(report.keys()) == {"mean", "median", "p90", "p95", "p99", "max", "clusters"}, "Unexpected report format" + + return report