Add mini_reporting_block/src/mini_reporting_block/core.py

This commit is contained in:
Mika 2026-03-01 17:26:33 +00:00
parent 62fc7753b4
commit 71495d1b72

View file

@ -0,0 +1,59 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Any
class ReportingBlockValidationError(ValueError):
"""Fehler bei der Validierung eines ReportingBlock-Objekts."""
pass
@dataclass
class ReportingBlock:
"""Datencontainer für Δt<0-Fälle mit Zusammenfassung und Detailinformationen."""
summary: str
details: List[Dict[str, Any]] = field(default_factory=list)
def to_json(self) -> Dict[str, Any]:
"""Gibt das Reporting-Objekt als JSON-kompatibles dict zurück."""
return asdict(self)
_DEF_FIELDS = {"corr_id", "expires_at_dist_hours", "visibility_lag"}
def _validate_analysis_results(analysis_results: List[Dict[str, Any]]) -> None:
if not isinstance(analysis_results, list):
raise ReportingBlockValidationError("analysis_results muss eine Liste sein.")
for idx, entry in enumerate(analysis_results):
if not isinstance(entry, dict):
raise ReportingBlockValidationError(f"Eintrag {idx} ist kein Dict.")
if not _DEF_FIELDS.intersection(entry.keys()):
raise ReportingBlockValidationError(
f"Eintrag {idx} enthält keine erwarteten Felder: {sorted(_DEF_FIELDS)}"
)
def generate_reporting_block(analysis_results: List[Dict[str, Any]]) -> str:
"""Erstellt einen formatierten Reporting-Block aus Analyseergebnissen der Δt<0-Fälle."""
_validate_analysis_results(analysis_results)
count = len(analysis_results)
if count == 0:
summary = "Keine Δt<0-Fälle gefunden."
else:
avg_expire = None
expire_vals = [r.get("expires_at_dist_hours") for r in analysis_results if isinstance(r.get("expires_at_dist_hours"), (int, float))]
if expire_vals:
avg_expire = sum(expire_vals) / len(expire_vals)
summary_parts = [
f"Anzahl Δt<0-Fälle: {count}",
]
if avg_expire is not None:
summary_parts.append(f"Ø expires_at_dist_hours: {avg_expire:.2f}")
summary = "; ".join(summary_parts)
block = ReportingBlock(summary=summary, details=analysis_results)
return json.dumps(block.to_json(), ensure_ascii=False, indent=2)