Add unknowns_classifier/src/unknowns_classifier/core.py

This commit is contained in:
Mika 2026-02-03 17:11:46 +00:00
parent 960f585cf9
commit 0bf82cff06

View file

@@ -0,0 +1,68 @@
from __future__ import annotations
import logging
from collections import Counter
from dataclasses import dataclass
from typing import List, Dict, Any
__all__ = ["UnknownClassification", "classify_unknowns"]
@dataclass
class UnknownClassification:
    """Aggregated result for one category of unknown cases.

    Attributes:
        unknown_type: Category label of the unknown cases.
        count: Number of cases assigned to that category; never negative.

    Raises:
        TypeError: If ``unknown_type`` is not a ``str`` or ``count`` is not
            an ``int`` (``bool`` is rejected even though it subclasses
            ``int``).
        ValueError: If ``count`` is negative.
    """

    unknown_type: str
    count: int

    def __post_init__(self) -> None:
        """Validate field types and the count range after initialization."""
        if not isinstance(self.unknown_type, str):
            raise TypeError("unknown_type must be a string")
        # bool is a subclass of int, so a plain isinstance check would let
        # count=True slip through and silently behave like 1.
        if isinstance(self.count, bool) or not isinstance(self.count, int):
            raise TypeError("count must be an integer")
        if self.count < 0:
            raise ValueError("count must be non-negative")
def _infer_unknown_type(case: Dict[str, Any]) -> str:
    """Map an unknown-case dict to a category label via keyword matching.

    The whole dict (keys and values alike) is rendered with ``str()`` and
    lower-cased; the first rule with a keyword contained in that text
    determines the label. Rules are checked in order, so earlier categories
    win when keywords from several categories are present.

    Args:
        case: The unknown-case dictionary to inspect.

    Returns:
        ``"schema_mismatch"``, ``"artefakt_fehlt"``, ``"netzwerk_problem"``
        or ``"datenfehler"`` for the first matching rule, otherwise
        ``"unclassified"``.
    """
    # Ordered (label, keywords) rules; the order defines precedence.
    rules = (
        ("schema_mismatch", ("schema", "field")),
        ("artefakt_fehlt", ("missing", "not found", "fehlt")),
        ("netzwerk_problem", ("timeout", "connection")),
        ("datenfehler", ("valueerror", "parsing")),
    )
    text = str(case).lower()
    for label, keywords in rules:
        if any(keyword in text for keyword in keywords):
            return label
    return "unclassified"
def classify_unknowns(unknown_cases: List[Dict[str, Any]]) -> List[UnknownClassification]:
    """Classify unknown cases and summarize how often each type occurs.

    Args:
        unknown_cases: List of unknown-case dictionaries.

    Returns:
        One ``UnknownClassification`` per inferred type, ordered by first
        occurrence in ``unknown_cases``. An empty input yields an empty list.

    Raises:
        TypeError: If ``unknown_cases`` is not a list, or if any entry is
            not a dict.
    """
    # Log through a module-scoped logger rather than the logging.debug()/
    # logging.info() convenience functions: those target the root logger and
    # implicitly call basicConfig() when it has no handlers, which would
    # configure the host application's logging as a side effect.
    logger = logging.getLogger(__name__)

    if not isinstance(unknown_cases, list):
        raise TypeError("unknown_cases must be a list of dicts")

    logger.debug("Starting classification of %d unknown cases", len(unknown_cases))

    # Counter keeps insertion order, so results follow first occurrence.
    counts = Counter()
    for idx, case in enumerate(unknown_cases):
        if not isinstance(case, dict):
            raise TypeError(f"Entry at index {idx} is not a dict")
        inferred = _infer_unknown_type(case)
        logger.debug("Case %d inferred as: %s", idx, inferred)
        counts[inferred] += 1

    results = [
        UnknownClassification(unknown_type=unknown_type, count=count)
        for unknown_type, count in counts.items()
    ]
    logger.info("Classified %d unique unknown types", len(results))
    return results