"""Core analysis routines for delta-t record data.

Identifies unpinned records with negative ``delta_t`` and aggregates
counts per context plus a warn rate.
"""

import logging
from typing import Any, Dict, List

import pandas as pd

# NOTE(review): configuring the root logger at import time is kept from the
# original module, but libraries normally leave logging configuration to the
# application — consider removing this call.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Fields every input record must carry.
_REQUIRED_FIELDS = frozenset({"context", "pinned_status", "delta_t", "warn"})


class DataValidationError(Exception):
    """Raised when input data validation fails."""


def _validate_input_data(data: List[Dict[str, Any]]) -> None:
    """Check that *data* is a list of dicts carrying all required fields.

    Raises
    ------
    DataValidationError
        If ``data`` is not a list, an entry is not a dict, or an entry is
        missing any of ``context``, ``pinned_status``, ``delta_t``, ``warn``.
    """
    if not isinstance(data, list):
        raise DataValidationError("Input data must be a list of dicts.")
    for i, record in enumerate(data):
        if not isinstance(record, dict):
            raise DataValidationError(f"Each entry must be a dict (index {i}).")
        # Compute the missing set once; non-empty means the record is invalid.
        missing = _REQUIRED_FIELDS - record.keys()
        if missing:
            raise DataValidationError(f"Missing fields {missing} in record at index {i}.")


def analyze_data(data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyze records and aggregate the unpinned negative-``delta_t`` cases.

    Parameters
    ----------
    data : list[dict]
        Records with fields: ``context``, ``pinned_status``, ``delta_t``,
        ``warn``.

    Returns
    -------
    dict
        ``near_expiry_count``, ``fresh_count`` and ``warn_rate`` (rounded to
        4 decimal places) computed over the records whose ``pinned_status``
        is ``"unpinned"`` and whose ``delta_t`` is negative.

    Raises
    ------
    DataValidationError
        If the input fails structural validation.
    """
    # Validate before touching the data (including len()) so malformed input
    # always surfaces as DataValidationError, never as a TypeError.
    _validate_input_data(data)
    logger.debug("Starting analyze_data with %d records", len(data))

    # An empty input list yields a DataFrame with no columns at all, which the
    # old column re-check turned into a misleading "missing columns" error.
    # Short-circuit with an all-zero result instead.
    if not data:
        result = {"near_expiry_count": 0, "fresh_count": 0, "warn_rate": 0.0}
        logger.info("Analysis complete: %s", result)
        return result

    df = pd.DataFrame(data)

    # _validate_input_data already guarantees the required columns exist for
    # non-empty input, so no second column check is needed here.
    # Keep only unpinned records whose delta_t is negative.
    mask = (df["pinned_status"] == "unpinned") & (df["delta_t"] < 0)
    unpinned_df = df[mask]
    logger.debug("Filtered unpinned negative delta_t count: %d", len(unpinned_df))

    near_expiry_count = int((unpinned_df["context"] == "near_expiry").sum())
    fresh_count = int((unpinned_df["context"] == "fresh").sum())

    warn_total = int(unpinned_df["warn"].sum())
    # Guard against division by zero when the filter matched nothing.
    warn_rate = warn_total / len(unpinned_df) if len(unpinned_df) else 0.0

    result = {
        "near_expiry_count": near_expiry_count,
        "fresh_count": fresh_count,
        "warn_rate": round(float(warn_rate), 4),
    }
    logger.info("Analysis complete: %s", result)
    return result