Add data_analysis/src/data_analysis/core.py

This commit is contained in:
Mika 2026-03-05 15:48:01 +00:00
parent b2c3d2719d
commit d1e357fa3c

View file

@ -0,0 +1,71 @@
from __future__ import annotations
import logging
from typing import List, Dict, Any
import pandas as pd
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataValidationError(Exception):
"""Raised when input data validation fails."""
pass
def _validate_input_data(data: List[Dict[str, Any]]) -> None:
"""Validiert, ob die Eingabedaten die erforderlichen Felder enthalten."""
required_fields = {"context", "pinned_status", "delta_t", "warn"}
if not isinstance(data, list):
raise DataValidationError("Input data must be a list of dicts.")
for i, record in enumerate(data):
if not isinstance(record, dict):
raise DataValidationError(f"Each entry must be a dict (index {i}).")
if not required_fields.issubset(record.keys()):
missing = required_fields - record.keys()
raise DataValidationError(f"Missing fields {missing} in record at index {i}.")
def analyze_data(data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analysiert Datensätze und identifiziert Δt<0-Fälle sowie berechnet Aggregationen.
Parameters
----------
data : list[dict]
Liste von Datensätzen mit Feldern: context, pinned_status, delta_t, warn.
Returns
-------
dict
Analyseergebnis: near_expiry_count, fresh_count, warn_rate.
"""
logger.debug("Starting analyze_data with %d records", len(data))
_validate_input_data(data)
df = pd.DataFrame(data)
required_columns = ["context", "pinned_status", "delta_t", "warn"]
if not all(col in df.columns for col in required_columns):
raise DataValidationError("Input data missing required columns.")
# Filter nur auf unpinned und negative delta_t-Fälle
unpinned_df = df[(df["pinned_status"] == "unpinned") & (df["delta_t"] < 0)]
logger.debug("Filtered unpinned negative delta_t count: %d", len(unpinned_df))
near_expiry_count = int((unpinned_df["context"] == "near_expiry").sum())
fresh_count = int((unpinned_df["context"] == "fresh").sum())
warn_total = int(unpinned_df["warn"].sum())
warn_rate = float(warn_total / len(unpinned_df)) if len(unpinned_df) > 0 else 0.0
result = {
"near_expiry_count": near_expiry_count,
"fresh_count": fresh_count,
"warn_rate": round(warn_rate, 4),
}
logger.info("Analysis complete: %s", result)
assert all(key in result for key in ["near_expiry_count", "fresh_count", "warn_rate"]), (
"Result missing expected keys!"
)
return result