From 09b1a74da6d2e16432dbcbe5483494810352d12d Mon Sep 17 00:00:00 2001 From: Mika Date: Mon, 23 Feb 2026 14:48:38 +0000 Subject: [PATCH] Add metrics_reporting/src/metrics_reporting/core.py --- .../src/metrics_reporting/core.py | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 metrics_reporting/src/metrics_reporting/core.py diff --git a/metrics_reporting/src/metrics_reporting/core.py b/metrics_reporting/src/metrics_reporting/core.py new file mode 100644 index 0000000..98383f7 --- /dev/null +++ b/metrics_reporting/src/metrics_reporting/core.py @@ -0,0 +1,74 @@ +from __future__ import annotations +import json +import logging +from typing import List, Dict, Any +from dataclasses import dataclass, asdict +from statistics import mean +from pathlib import Path +import pandas as pd + +logger = logging.getLogger(__name__) + + +@dataclass +class MetricSummary: + warn_rate: float + drift_detected: bool + performance: float + + +def report_metrics(logs: List[Dict[str, Any]], threshold: float) -> Dict[str, Any]: + """Berechnet aggregierte Metriken aus Log-Daten zur Driftbewertung. + + Args: + logs: Liste von Log-Einträgen (dicts) mit mindestens 'level' und 'duration' Feldern. + threshold: Schwellwert, ab dem eine Drift als signifikant gilt. + + Returns: + Eine JSON-kompatible dict-Zusammenfassung der berechneten Kennzahlen. + """ + assert isinstance(logs, list), "logs muss eine Liste von Dicts sein" + assert all(isinstance(l, dict) for l in logs), "Jeder Log-Eintrag muss ein Dict sein" + assert isinstance(threshold, (int, float)), "threshold muss float oder int sein" + + if not logs: + logger.warning("Leere Log-Liste übergeben; gebe neutrale Metriken zurück.") + result = MetricSummary(warn_rate=0.0, drift_detected=False, performance=0.0) + return asdict(result) + + try: + df = pd.DataFrame(logs) + except Exception as e: + logger.error(f"Fehler beim Erstellen eines DataFrame aus Logs: {e}") + raise ValueError("Ungültige Log-Daten übergeben.") from e + + if 'level' not in df.columns: + raise ValueError("Log-Einträge müssen ein 'level'-Feld enthalten.") + + # Warn-Rate berechnen + warn_count = (df['level'] == 'WARN').sum() + total_count = len(df) + warn_rate = warn_count / total_count if total_count > 0 else 0.0 + + # Performance (z. B. Durchschnitts-Dauer pro Event) + performance_metric = 0.0 + if 'duration' in df.columns: + durations = df['duration'].dropna() + if not durations.empty: + performance_metric = float(mean(durations)) + + # Drift-Erkennung basierend auf Schwellwert und Warnrate + drift_detected = warn_rate > threshold + + result = MetricSummary( + warn_rate=round(float(warn_rate), 5), + drift_detected=drift_detected, + performance=round(float(performance_metric), 5) + ) + + # CI-sichere Validierung + assert 0.0 <= result.warn_rate <= 1.0, "warn_rate muss zwischen 0 und 1 liegen" + assert isinstance(result.drift_detected, bool), "drift_detected muss bool sein" + assert result.performance >= 0.0, "performance darf nicht negativ sein" + + return asdict(result)