Add data_analysis_script/src/data_analysis_script/core.py

This commit is contained in:
Mika 2026-03-17 11:07:04 +00:00
parent 17e9f9abf6
commit d65baef617

View file

@ -0,0 +1,81 @@
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict
import pandas as pd
import statistics
# Module-level logger named after this module. The NullHandler is attached so
# that the module itself never produces log output; the importing application
# decides whether and how records are handled.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
class LogAnalysisError(Exception):
    """Signals that a log file could not be loaded or failed validation."""
@dataclass
class AnalysisResults:
    """Holds the metrics produced by a log analysis run."""

    # Largest value found in the analysed metric column.
    max_outlier: float
    # Arithmetic mean of the analysed metric column.
    band_center: float
    # Spread (standard deviation) of the analysed metric column.
    band_width: float

    def to_json(self) -> Dict[str, Any]:
        """Return the stored metrics as a JSON-compatible dictionary.

        Returns:
            A new dict mapping each field name to its value.
        """
        as_mapping: Dict[str, Any] = asdict(self)
        return as_mapping
def _validate_log_dataframe(df: pd.DataFrame) -> None:
    """Check that a parsed log table has the required columns and some rows.

    Args:
        df: Table parsed from the log CSV.

    Raises:
        LogAnalysisError: If any required column is absent, or if the table
            is empty. The column check runs first.
    """
    required_cols = {
        "worker_start_offset",
        "expires_at_dist_hours",
        "retry_total_overhead_ms",
    }
    missing = required_cols - set(df.columns)
    if missing:
        # Sort the names: set iteration order is arbitrary, so an unsorted
        # join would make the message differ from one run to the next.
        raise LogAnalysisError(
            f"Missing required columns: {', '.join(sorted(missing))}"
        )
    if df.empty:
        raise LogAnalysisError("Input log file is empty.")
def analyze_logs(log_file: str) -> AnalysisResults:
    """Read a CSV log file and compute summary metrics from it.

    All three metrics are derived from the ``expires_at_dist_hours`` column:
    its maximum, its mean, and its sample standard deviation.

    Args:
        log_file: Path to the CSV log file.

    Returns:
        An ``AnalysisResults`` with ``max_outlier`` (maximum), ``band_center``
        (mean) and ``band_width`` (sample standard deviation, or ``0.0`` when
        fewer than two rows are available).

    Raises:
        LogAnalysisError: If the file does not exist, cannot be parsed as
            CSV, lacks a required column, has no rows, holds non-numeric
            values in the analysed columns, or yields a ``max_outlier`` or
            ``band_width`` that is not a non-negative number.
    """
    path = Path(log_file)
    if not path.exists():
        raise LogAnalysisError(f"Log file not found: {log_file}")

    try:
        df = pd.read_csv(path)
    except Exception as exc:
        logger.exception("Fehler beim Einlesen der CSV-Datei")
        # Chain the cause so the original parser/OS error stays attached.
        raise LogAnalysisError(
            f"Fehler beim Einlesen der CSV-Datei: {exc}"
        ) from exc

    _validate_log_dataframe(df)

    # Convert the analysed columns to float; report bad cells with the
    # module's own error type instead of leaking a raw conversion error.
    try:
        latencies = df["expires_at_dist_hours"].astype(float)
        # Not used in any metric below, but the conversion is kept so that
        # files with non-numeric start offsets are still rejected.
        df["worker_start_offset"].astype(float)
    except (TypeError, ValueError) as exc:
        raise LogAnalysisError(
            f"Non-numeric value in log metrics: {exc}"
        ) from exc

    # Metric computation.
    max_outlier = float(latencies.max())
    band_center = float(latencies.mean())
    try:
        band_width = float(statistics.stdev(latencies))
    except statistics.StatisticsError:
        # stdev needs at least two data points; a single row has no spread.
        band_width = 0.0

    result = AnalysisResults(
        max_outlier=max_outlier,
        band_center=band_center,
        band_width=band_width,
    )

    # Explicit checks rather than ``assert``: assertions are removed when
    # Python runs with -O, and these conditions depend on the input data.
    # The ``not (x >= 0)`` form also rejects NaN.
    if not result.max_outlier >= 0:
        raise LogAnalysisError("max_outlier muss nicht-negativ sein")
    if not result.band_width >= 0:
        raise LogAnalysisError("band_width muss nicht-negativ sein")

    logger.debug("Analyse abgeschlossen: %s", result)
    return result