Add run_analysis/src/run_analysis/core.py

This commit is contained in:
Mika 2026-03-01 17:26:32 +00:00
commit b62cff80a1

View file

@ -0,0 +1,78 @@
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from typing import List, Dict, Any
import statistics
logger = logging.getLogger(__name__)
@dataclass
class RunData:
"""Repräsentiert ein einzelnes Laufdaten-Objekt."""
corr_id: str
expires_at_dist_hours: float
visibility_lag: float
@staticmethod
def from_dict(data: Dict[str, Any]) -> 'RunData':
"""Validiert und erstellt eine RunData-Instanz aus einem Dictionary."""
required_fields = ("corr_id", "expires_at_dist_hours", "visibility_lag")
for field in required_fields:
if field not in data:
raise ValueError(f"Missing required field '{field}' in RunData input.")
if not isinstance(data["corr_id"], str):
raise TypeError("'corr_id' must be a string.")
try:
exp_val = float(data["expires_at_dist_hours"])
vis_val = float(data["visibility_lag"])
except (TypeError, ValueError) as e:
raise ValueError("Numeric conversion failed for RunData fields.") from e
return RunData(
corr_id=data["corr_id"],
expires_at_dist_hours=exp_val,
visibility_lag=vis_val
)
def analyze_run_data(run_data: List[RunData]) -> Dict[str, Any]:
"""Analysiert Run-Daten und extrahiert Fälle mit Δt (visibility_lag) < 0.
Args:
run_data: Liste von RunData-Objekten.
Returns:
Dict mit Übersicht der negativen Δt-Fälle, inklusive Statistiken.
"""
assert isinstance(run_data, list), "run_data must be a list of RunData instances"
if not run_data:
return {"total_records": 0, "negative_dt_count": 0, "neg_corr_ids": [], "summary": {}}
negative_cases = [r for r in run_data if r.visibility_lag < 0]
neg_corr_ids = [r.corr_id for r in negative_cases]
result: Dict[str, Any] = {
"total_records": len(run_data),
"negative_dt_count": len(negative_cases),
"neg_corr_ids": neg_corr_ids,
"summary": {}
}
if negative_cases:
exp_vals = [r.expires_at_dist_hours for r in negative_cases]
vis_vals = [r.visibility_lag for r in negative_cases]
try:
result["summary"] = {
"mean_expires_at_dist_hours": statistics.mean(exp_vals),
"mean_visibility_lag": statistics.mean(vis_vals),
"min_visibility_lag": min(vis_vals),
"max_visibility_lag": max(vis_vals)
}
except statistics.StatisticsError:
result["summary"] = {}
logger.debug("Analysis complete: %s", result)
return result