Add metrics_reporting/src/metrics_reporting/core.py
This commit is contained in:
parent
7266062547
commit
09b1a74da6
1 changed files with 74 additions and 0 deletions
74
metrics_reporting/src/metrics_reporting/core.py
Normal file
74
metrics_reporting/src/metrics_reporting/core.py
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
from __future__ import annotations
|
||||
import json
|
||||
import logging
|
||||
from typing import List, Dict, Any
|
||||
from dataclasses import dataclass, asdict
|
||||
from statistics import mean
|
||||
from pathlib import Path
|
||||
import pandas as pd
|
||||
|
||||
# Module-level logger named after this module, per stdlib logging convention.
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class MetricSummary:
    """Aggregated drift metrics computed from a batch of log entries."""

    # Fraction of entries whose 'level' equals 'WARN'; always in [0.0, 1.0].
    warn_rate: float
    # True when warn_rate exceeds the caller-supplied drift threshold.
    drift_detected: bool
    # Mean of the 'duration' field across entries; 0.0 when no durations exist.
    performance: float
|
||||
|
||||
|
||||
def report_metrics(logs: List[Dict[str, Any]], threshold: float) -> Dict[str, Any]:
    """Compute aggregated drift-assessment metrics from log data.

    Args:
        logs: List of log entries (dicts) with at least a 'level' field and
            optionally a 'duration' field.
        threshold: Warn-rate threshold above which drift counts as significant.

    Returns:
        A JSON-compatible dict summary with keys 'warn_rate',
        'drift_detected' and 'performance' (all plain Python types).

    Raises:
        TypeError: If ``logs`` is not a list of dicts or ``threshold`` is not
            numeric.
        ValueError: If the log data cannot be tabulated or lacks a 'level'
            field.
    """
    # Validate inputs with real exceptions: the original `assert` statements
    # are silently stripped when Python runs with -O, leaving no validation.
    if not isinstance(logs, list):
        raise TypeError("logs muss eine Liste von Dicts sein")
    if not all(isinstance(entry, dict) for entry in logs):
        raise TypeError("Jeder Log-Eintrag muss ein Dict sein")
    if not isinstance(threshold, (int, float)):
        raise TypeError("threshold muss float oder int sein")

    if not logs:
        logger.warning("Leere Log-Liste übergeben; gebe neutrale Metriken zurück.")
        return asdict(MetricSummary(warn_rate=0.0, drift_detected=False, performance=0.0))

    try:
        df = pd.DataFrame(logs)
    except Exception as e:
        # Lazy %-formatting: the message is only rendered if actually emitted.
        logger.error("Fehler beim Erstellen eines DataFrame aus Logs: %s", e)
        raise ValueError("Ungültige Log-Daten übergeben.") from e

    if 'level' not in df.columns:
        raise ValueError("Log-Einträge müssen ein 'level'-Feld enthalten.")

    # Warn rate: share of WARN-level entries. Mean of the boolean mask equals
    # sum/len; cast to a plain float so the result is JSON-serializable.
    warn_rate = float((df['level'] == 'WARN').mean())

    # Performance: mean event duration; 0.0 when the column is absent or
    # holds no non-NaN values.
    performance_metric = 0.0
    if 'duration' in df.columns:
        durations = df['duration'].dropna()
        if not durations.empty:
            # Series.mean() runs in C; statistics.mean is pure Python and
            # needlessly slow on a pandas Series.
            performance_metric = float(durations.mean())

    # Drift detection: numpy comparison yields numpy.bool_, which is NOT a
    # subclass of Python bool (the original code's own isinstance assert
    # failed on every non-empty input) and is not JSON-serializable — coerce.
    drift_detected = bool(warn_rate > threshold)

    result = MetricSummary(
        warn_rate=round(warn_rate, 5),
        drift_detected=drift_detected,
        performance=round(performance_metric, 5),
    )
    # Post-conditions hold by construction: warn_rate is a mean of a 0/1
    # mask (so in [0, 1]), drift_detected is coerced to bool, and
    # performance is a mean of durations or 0.0.
    return asdict(result)
|
||||
Loading…
Reference in a new issue