# Recovered from a whitespace-collapsed git patch: commit d65baef6,
# "Add data_analysis_script/src/data_analysis_script/core.py".
"""Core log analysis: read a CSV log and summarise ``expires_at_dist_hours``."""

from __future__ import annotations

import json
import logging
import statistics
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict

import pandas as pd


logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

# Columns that must be present in the CSV header for the file to be accepted.
_REQUIRED_COLUMNS = frozenset(
    {"worker_start_offset", "expires_at_dist_hours", "retry_total_overhead_ms"}
)


class LogAnalysisError(Exception):
    """Custom exception for log analysis errors."""


class AnalysisValidationError(LogAnalysisError, AssertionError):
    """Raised when the computed metrics fail the built-in sanity checks.

    These checks used to be ``assert`` statements.  The class derives from
    ``AssertionError`` as well so that callers which caught the old assertion
    failures keep working, while the checks no longer vanish under
    ``python -O``.
    """


@dataclass
class AnalysisResults:
    """Holds the results of a log analysis.

    Attributes:
        max_outlier: Maximum of the ``expires_at_dist_hours`` column.
        band_center: Arithmetic mean of the ``expires_at_dist_hours`` column.
        band_width: Sample standard deviation of ``expires_at_dist_hours``
            (``0.0`` when fewer than two rows are available).
    """

    max_outlier: float
    band_center: float
    band_width: float

    def to_json(self) -> Dict[str, Any]:
        """Return the results as a JSON-compatible dict (field name -> value)."""
        return asdict(self)


def _validate_log_dataframe(df: pd.DataFrame) -> None:
    """Check that *df* has every required column and at least one row.

    Raises:
        LogAnalysisError: If a required column is missing (checked first) or
            the frame is empty.
    """
    missing = _REQUIRED_COLUMNS.difference(df.columns)
    if missing:
        # Sort so the message is deterministic; set iteration order is not.
        raise LogAnalysisError(
            f"Missing required columns: {', '.join(sorted(missing))}"
        )
    if df.empty:
        raise LogAnalysisError("Input log file is empty.")


def analyze_logs(log_file: str) -> AnalysisResults:
    """Analyse a CSV log file and return structured summary metrics.

    Args:
        log_file: Path of the CSV file to read.

    Returns:
        An ``AnalysisResults`` with the maximum, mean and sample standard
        deviation of the ``expires_at_dist_hours`` column.

    Raises:
        LogAnalysisError: If the file does not exist, cannot be parsed as CSV,
            lacks a required column, or contains no rows.
        AnalysisValidationError: If the maximum or the standard deviation is
            negative or NaN.  This is a subclass of both ``LogAnalysisError``
            and ``AssertionError``.

    Non-numeric values in ``expires_at_dist_hours`` or ``worker_start_offset``
    make the float conversion fail; that pandas error (typically a
    ``ValueError``) propagates unchanged.
    """
    path = Path(log_file)
    if not path.exists():
        raise LogAnalysisError(f"Log file not found: {log_file}")

    try:
        df = pd.read_csv(path)
    except Exception as exc:
        # Boundary with pandas: any read/parse failure becomes a domain error,
        # with the original exception kept as the explicit cause.
        logger.exception("Fehler beim Einlesen der CSV-Datei")
        raise LogAnalysisError(
            f"Fehler beim Einlesen der CSV-Datei: {exc}"
        ) from exc

    _validate_log_dataframe(df)

    latencies = df["expires_at_dist_hours"].astype(float)
    # The converted offsets are not used by any metric; the conversion is kept
    # only because it rejects non-numeric data in this column.
    df["worker_start_offset"].astype(float)

    max_outlier = float(latencies.max())
    band_center = float(latencies.mean())
    try:
        # Plain Python floats keep the statistics module away from numpy
        # scalar types.
        band_width = float(statistics.stdev(latencies.tolist()))
    except statistics.StatisticsError:
        # stdev needs at least two data points.
        band_width = 0.0

    # Explicit checks instead of ``assert`` so they also run under ``-O``.
    # ``not x >= 0`` (rather than ``x < 0``) also rejects NaN.
    if not max_outlier >= 0:
        raise AnalysisValidationError("max_outlier muss nicht-negativ sein")
    if not band_width >= 0:
        raise AnalysisValidationError("band_width muss nicht-negativ sein")

    result = AnalysisResults(
        max_outlier=max_outlier,
        band_center=band_center,
        band_width=band_width,
    )

    logger.debug("Analyse abgeschlossen: %s", result)
    return result