Add unknown_analysis/src/unknown_analysis/core.py

This commit is contained in:
Mika 2026-02-12 11:16:20 +00:00
commit 5dab9594dd

View file

@ -0,0 +1,84 @@
from __future__ import annotations
import pandas as pd
from typing import List, Dict
class InputValidationError(ValueError):
    """Signal that the ``log_data`` argument handed to this module is malformed.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    keep working.
    """
def _validate_log_data(log_data: list[dict]) -> None:
if not isinstance(log_data, list):
raise InputValidationError("log_data must be a list of dictionaries")
required_fields = {"artifact_key", "status", "cause", "path", "error"}
for i, entry in enumerate(log_data):
if not isinstance(entry, dict):
raise InputValidationError(f"Entry {i} is not a dictionary")
missing = required_fields - set(entry.keys())
if missing:
raise InputValidationError(f"Entry {i} missing required keys: {missing}")
def calculate_unknown_rates(log_data: List[Dict]) -> Dict[str, float]:
    """Compute the unknown-artifact-missing and unknown-schema rates from log data.

    Each rate is the number of entries whose status is UNKNOWN
    (case-insensitive) and whose ``cause`` contains the keyword ``"missing"``
    or ``"schema"`` respectively (case-insensitive), divided by the total
    number of entries and rounded to four decimal places.

    Args:
        log_data: List of dictionaries with artifact information.

    Returns:
        Dictionary with the keys ``unknown_artifact_missing_rate`` and
        ``unknown_schema_rate``. Both are ``0.0`` for empty input.

    Raises:
        InputValidationError: If ``log_data`` fails validation.
    """
    _validate_log_data(log_data)
    if not log_data:
        return {"unknown_artifact_missing_rate": 0.0, "unknown_schema_rate": 0.0}

    df = pd.DataFrame(log_data)
    total_count = len(df)  # never zero: empty input returned above

    # Cast to str before using the .str accessor: pandas raises AttributeError
    # when the column holds only non-string values (e.g. numeric codes).
    is_unknown = df["status"].astype(str).str.upper() == "UNKNOWN"
    unknown_causes = df.loc[is_unknown, "cause"].astype(str)

    # regex=False: the keywords are literal substrings, not patterns.
    missing_hits = unknown_causes.str.contains(
        "missing", case=False, na=False, regex=False
    ).sum()
    schema_hits = unknown_causes.str.contains(
        "schema", case=False, na=False, regex=False
    ).sum()

    # int(...) turns the numpy counts into plain ints so the result is a
    # built-in float, matching the annotated return type.
    return {
        "unknown_artifact_missing_rate": round(int(missing_hits) / total_count, 4),
        "unknown_schema_rate": round(int(schema_hits) / total_count, 4),
    }
def get_top_pass_unknown_switches(log_data: List[Dict], top_n: int = 10) -> List[Dict]:
    """Return the most frequent PASS->UNKNOWN switches.

    An entry counts as a switch when its status is UNKNOWN (case-insensitive)
    and at least one entry in ``log_data`` with the same ``artifact_key`` has
    status PASS. The order of the entries is not taken into account, so this
    is a simplification of "was PASS before".

    Args:
        log_data: List of log entries with status changes and errors.
        top_n: Maximum number of (cause, path, error) combinations to return.
            Defaults to 10; values of zero or below yield an empty list.

    Returns:
        Up to ``top_n`` dictionaries with the keys ``cause``, ``path`` and
        ``error``, ordered by descending frequency. Combinations with equal
        frequency keep the sorted order of their group keys. Missing values
        are reported as ``None``.

    Raises:
        InputValidationError: If ``log_data`` fails validation.
    """
    _validate_log_data(log_data)
    if not log_data or top_n <= 0:
        return []

    df = pd.DataFrame(log_data)

    # Cast to str before using the .str accessor: pandas raises AttributeError
    # when the column holds only non-string values.
    status = df["status"].astype(str).str.upper()

    # Artifacts that have at least one PASS entry anywhere in the log.
    known_pass_artifacts = set(df.loc[status == "PASS", "artifact_key"])
    switches = df[(status == "UNKNOWN") & df["artifact_key"].isin(known_pass_artifacts)]
    if switches.empty:
        return []

    ranked = (
        # dropna=False: by default groupby discards every row whose key holds
        # None/NaN, which would silently lose switches without e.g. an error.
        switches.groupby(["cause", "path", "error"], dropna=False)
        .size()
        .reset_index(name="count")
        # mergesort is stable, so ties keep the sorted group-key order.
        .sort_values(by="count", ascending=False, kind="mergesort")
    )

    top = ranked.head(top_n)[["cause", "path", "error"]]
    # Missing group keys come back from pandas as NaN; hand callers None.
    top = top.astype(object).where(top.notna(), None)
    return top.to_dict(orient="records")