"""Heuristic classification of "unknown" cases into coarse categories."""

from __future__ import annotations

import logging
from collections import Counter
from dataclasses import dataclass
from typing import Any, Dict, List

__all__ = ["UnknownClassification", "classify_unknowns"]

# Module-scoped logger: the module-level ``logging.debug()``/``logging.info()``
# helpers log on the root logger and implicitly call ``logging.basicConfig()``
# when it has no handlers, which library code should not do to its host.
logger = logging.getLogger(__name__)

# Ordered (keywords, label) rules. The first rule with any keyword contained
# in the lower-cased text wins, so the order below is significant.
_KEYWORD_RULES = (
    (("schema", "field"), "schema_mismatch"),
    (("missing", "not found", "fehlt"), "artefakt_fehlt"),
    (("timeout", "connection"), "netzwerk_problem"),
    (("valueerror", "parsing"), "datenfehler"),
)

# Label returned when no rule matches.
_FALLBACK_TYPE = "unclassified"


@dataclass
class UnknownClassification:
    """Represents a classified Unknown type and its occurrence count.

    Attributes:
        unknown_type: Label of the category.
        count: Number of cases assigned to that category; must be >= 0.

    Raises:
        TypeError: If ``unknown_type`` is not a string, or ``count`` is not
            an integer (booleans are rejected as well).
        ValueError: If ``count`` is negative.
    """

    unknown_type: str
    count: int

    def __post_init__(self) -> None:
        """Validate the field types and the value range after construction."""
        if not isinstance(self.unknown_type, str):
            raise TypeError("unknown_type must be a string")
        # bool is a subclass of int, so it has to be excluded explicitly;
        # otherwise True/False would be accepted as a count of 1/0.
        if isinstance(self.count, bool) or not isinstance(self.count, int):
            raise TypeError("count must be an integer")
        if self.count < 0:
            raise ValueError("count must be non-negative")


def _infer_unknown_type(case: Dict[str, Any]) -> str:
    """Infer the unknown_type from an unknown case dict using simple heuristics.

    The whole dict is rendered with ``str()`` and lower-cased, then searched
    for keyword substrings. Because the rendering contains the dict's keys as
    well as its values, a keyword occurring in a key name matches too.

    Args:
        case: A single unknown-case dictionary.

    Returns:
        The label of the first matching rule, or ``"unclassified"`` when no
        keyword is found.
    """
    text = str(case).lower()
    for keywords, label in _KEYWORD_RULES:
        if any(keyword in text for keyword in keywords):
            return label
    return _FALLBACK_TYPE


def classify_unknowns(
    unknown_cases: List[Dict[str, Any]],
) -> List[UnknownClassification]:
    """Classify a list of unknown cases into categorized UnknownClassification results.

    Args:
        unknown_cases: List of Unknown-Case dictionaries.

    Returns:
        List of UnknownClassification instances summarizing counts per type,
        ordered by the first occurrence of each type in ``unknown_cases``.
        An empty input yields an empty list.

    Raises:
        TypeError: If ``unknown_cases`` is not a list, or if any entry of it
            is not a dict.
    """
    if not isinstance(unknown_cases, list):
        raise TypeError("unknown_cases must be a list of dicts")

    logger.debug("Starting classification of %d unknown cases", len(unknown_cases))

    # Counter keeps insertion order, so results follow first-seen order.
    counts = Counter()
    for idx, case in enumerate(unknown_cases):
        if not isinstance(case, dict):
            raise TypeError(f"Entry at index {idx} is not a dict")
        inferred = _infer_unknown_type(case)
        logger.debug("Case %d inferred as: %s", idx, inferred)
        counts[inferred] += 1

    results = [
        UnknownClassification(unknown_type=label, count=total)
        for label, total in counts.items()
    ]

    logger.info("Classified %d unique unknown types", len(results))
    return results