Add unknown_case_counter/src/unknown_case_counter/core.py

This commit is contained in:
Mika 2026-02-11 12:51:44 +00:00
commit b373f2616e

View file

@ -0,0 +1,65 @@
import csv
import json
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Dict
@dataclass
class DeltaCase:
case_id: str
previous_status: str
new_status: str
unknown_reason: str | None = None
@dataclass
class UnknownReasonsSummary:
    """A single reason paired with the number of times it was counted."""

    # The reason text.
    reason: str

    # How many occurrences were tallied for ``reason``.
    count: int
def count_unknown_reasons(file_path: str) -> Dict[str, int]:
    """Count the cases whose new status is "unknown", grouped by reason.

    Reads a delta-cases CSV file with a header row and tallies every row
    whose ``new_status`` equals ``"unknown"`` (case-insensitive, surrounding
    whitespace ignored) under its ``unknown_reason``. Rows whose reason is
    empty or absent are counted under ``"Unspecified"``.

    As a side effect the summary is printed to stdout as a JSON object.

    Args:
        file_path: Path to the CSV file containing the delta cases.

    Returns:
        Dictionary mapping each reason to the number of unknown cases
        recorded with it.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing regular file.
        ValueError: If the CSV header lacks any of the columns ``case_id``,
            ``previous_status``, ``new_status`` or ``unknown_reason``.
    """
    path = Path(file_path)
    # is_file() is already False for a non-existent path.
    if not path.is_file():
        raise FileNotFoundError(f"Eingabedatei nicht gefunden: {file_path}")

    required_fields = {
        "case_id",
        "previous_status",
        "new_status",
        "unknown_reason",
    }
    counts: Counter = Counter()

    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise be glued onto the first header name and make the
    # required-field check fail for spreadsheet exports.
    with path.open(newline="", encoding="utf-8-sig") as csvfile:
        reader = csv.DictReader(csvfile)

        missing = required_fields - set(reader.fieldnames or [])
        if missing:
            raise ValueError(
                f"CSV-Datei fehlt notwendige Felder: {', '.join(sorted(missing))}"
            )

        for row in reader:
            # DictReader fills the columns a short row lacks with None, so
            # the key exists and a default passed to .get() never applies;
            # "or ''" guards against calling .strip() on None.
            new_status = (row.get("new_status") or "").strip()
            if new_status.lower() != "unknown":
                continue

            reason = (row.get("unknown_reason") or "").strip() or "Unspecified"
            counts[reason] += 1

    # Hand back a plain dict and echo it to stdout for reference.
    summary = dict(counts)
    print(json.dumps(summary, ensure_ascii=False))
    return summary