commit ff2a729aa811760b12daa48581f2f558f986972f Author: Mika Date: Tue Mar 17 11:07:02 2026 +0000 Add heatmap_visualization/src/heatmap_visualization/core.py diff --git a/heatmap_visualization/src/heatmap_visualization/core.py b/heatmap_visualization/src/heatmap_visualization/core.py new file mode 100644 index 0000000..dbb07ac --- /dev/null +++ b/heatmap_visualization/src/heatmap_visualization/core.py @@ -0,0 +1,81 @@ +from __future__ import annotations +import logging +from typing import Union, List, Dict +import pandas as pd +import seaborn as sns +import matplotlib.pyplot as plt +from pydantic import BaseModel, ValidationError, confloat + + +logger = logging.getLogger(__name__) +if not logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) +logger.setLevel(logging.INFO) + + +class LogData(BaseModel): + """Datenmodell für Logeinträge eines Worker-Laufs.""" + worker_start_offset: confloat(ge=-10000, le=10000) + expires_at_dist_hours: confloat(ge=-10000, le=10000) + retry_total_overhead_ms: confloat(ge=0, le=1e8) + + +def _validate_input(data: Union[pd.DataFrame, List[Dict]]) -> pd.DataFrame: + """Validiert Eingabedaten streng und konvertiert sie in DataFrame.""" + if isinstance(data, pd.DataFrame): + df = data.copy() + elif isinstance(data, list): + df = pd.DataFrame(data) + else: + raise TypeError("Parameter 'data' muss pd.DataFrame oder list[dict] sein.") + + expected_cols = {"worker_start_offset", "expires_at_dist_hours", "retry_total_overhead_ms"} + missing = expected_cols - set(df.columns) + if missing: + raise ValueError(f"Fehlende Spalten in Eingabedaten: {missing}") + + # Validierung jedes Eintrags mittels Pydantic + valid_rows = [] + for index, row in df.iterrows(): + try: + entry = LogData(**row.to_dict()) + valid_rows.append(entry.dict()) + except ValidationError as e: + logger.warning(f"Eintrag {index} ungültig – wird übersprungen: {e.errors()}") + + if not valid_rows: + raise ValueError("Keine gültigen Logdaten nach Validierung.") + + return pd.DataFrame(valid_rows) + + +def generate_heatmap(data: Union[pd.DataFrame, List[Dict]]) -> plt.Figure: + """Erzeugt eine Heatmap der Korrelation zwischen Startoffsets und Latenzmetriken. + + Args: + data: Strukturierte Logdaten als pd.DataFrame oder Liste von Dictionaries. + + Returns: + Matplotlib-Figure mit Heatmap. + """ + logger.info("Starte Heatmap-Erzeugung.") + df = _validate_input(data) + + logger.debug(f"Validierte Datenform: {df.shape}") + corr = df[["worker_start_offset", "expires_at_dist_hours", "retry_total_overhead_ms"]].corr(method="pearson") + logger.debug(f"Korrelationsmatrix: +{corr}") + + fig, ax = plt.subplots(figsize=(8, 6)) + sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm", ax=ax) + ax.set_title("Korrelation zwischen Startoffsets, Expiry und Retry-Latenz") + plt.tight_layout() + + logger.info("Heatmap erfolgreich erstellt.") + return fig + + +__all__ = ["generate_heatmap", "LogData"]