Add heatmap_visualization/src/heatmap_visualization/core.py

This commit is contained in:
Mika 2026-03-17 11:07:02 +00:00
commit ff2a729aa8

View file

@ -0,0 +1,81 @@
from __future__ import annotations
import logging
from typing import Union, List, Dict
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pydantic import BaseModel, ValidationError, confloat
logger = logging.getLogger(__name__)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class LogData(BaseModel):
"""Datenmodell für Logeinträge eines Worker-Laufs."""
worker_start_offset: confloat(ge=-10000, le=10000)
expires_at_dist_hours: confloat(ge=-10000, le=10000)
retry_total_overhead_ms: confloat(ge=0, le=1e8)
def _validate_input(data: Union[pd.DataFrame, List[Dict]]) -> pd.DataFrame:
"""Validiert Eingabedaten streng und konvertiert sie in DataFrame."""
if isinstance(data, pd.DataFrame):
df = data.copy()
elif isinstance(data, list):
df = pd.DataFrame(data)
else:
raise TypeError("Parameter 'data' muss pd.DataFrame oder list[dict] sein.")
expected_cols = {"worker_start_offset", "expires_at_dist_hours", "retry_total_overhead_ms"}
missing = expected_cols - set(df.columns)
if missing:
raise ValueError(f"Fehlende Spalten in Eingabedaten: {missing}")
# Validierung jedes Eintrags mittels Pydantic
valid_rows = []
for index, row in df.iterrows():
try:
entry = LogData(**row.to_dict())
valid_rows.append(entry.dict())
except ValidationError as e:
logger.warning(f"Eintrag {index} ungültig wird übersprungen: {e.errors()}")
if not valid_rows:
raise ValueError("Keine gültigen Logdaten nach Validierung.")
return pd.DataFrame(valid_rows)
def generate_heatmap(data: Union[pd.DataFrame, List[Dict]]) -> plt.Figure:
"""Erzeugt eine Heatmap der Korrelation zwischen Startoffsets und Latenzmetriken.
Args:
data: Strukturierte Logdaten als pd.DataFrame oder Liste von Dictionaries.
Returns:
Matplotlib-Figure mit Heatmap.
"""
logger.info("Starte Heatmap-Erzeugung.")
df = _validate_input(data)
logger.debug(f"Validierte Datenform: {df.shape}")
corr = df[["worker_start_offset", "expires_at_dist_hours", "retry_total_overhead_ms"]].corr(method="pearson")
logger.debug(f"Korrelationsmatrix:
{corr}")
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm", ax=ax)
ax.set_title("Korrelation zwischen Startoffsets, Expiry und Retry-Latenz")
plt.tight_layout()
logger.info("Heatmap erfolgreich erstellt.")
return fig
__all__ = ["generate_heatmap", "LogData"]