From 09eb81c0554413d6699cd37812923af036315269 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 15 Feb 2026 11:41:28 +0000 Subject: [PATCH] Add generate_report/src/generate_report/core.py --- generate_report/src/generate_report/core.py | 78 +++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 generate_report/src/generate_report/core.py diff --git a/generate_report/src/generate_report/core.py b/generate_report/src/generate_report/core.py new file mode 100644 index 0000000..a6b6a7f --- /dev/null +++ b/generate_report/src/generate_report/core.py @@ -0,0 +1,78 @@ +import json +from dataclasses import dataclass, asdict +from datetime import datetime +from typing import Any, Dict, List + + +@dataclass +class LatencyResult: + batch_id: str + metric: str + value_ms: float + timestamp: datetime + + +@dataclass +class AnalysisSummary: + batch_count: int + avg_latency: float + max_latency: float + drift_detected: bool + + +@dataclass +class ReportDocument: + title: str + created_at: datetime + summary_text: str + metrics: List[Dict[str, Any]] + + +def _validate_latency_results(latency_results: List[Dict[str, Any]]) -> None: + if not isinstance(latency_results, list): + raise ValueError("latency_results must be a list of dict objects") + required_fields = {"batch_id", "metric", "value_ms", "timestamp"} + for entry in latency_results: + if not isinstance(entry, dict): + raise ValueError("Each latency result must be a dict") + missing = required_fields - set(entry.keys()) + if missing: + raise ValueError(f"LatencyResult missing fields: {missing}") + + +def _validate_analysis_summary(summary: Dict[str, Any]) -> None: + if not isinstance(summary, dict): + raise ValueError("analysis_summary must be a dict") + required_fields = {"batch_count", "avg_latency", "max_latency", "drift_detected"} + missing = required_fields - set(summary.keys()) + if missing: + raise ValueError(f"AnalysisSummary missing fields: {missing}") + + +def create_report(latency_results: List[Dict[str, Any]], analysis_summary: Dict[str, Any]) -> Dict[str, Any]: + """Erstellt einen Bericht basierend auf den gegebenen Latenzresultaten und der Analysezusammenfassung.""" + _validate_latency_results(latency_results) + _validate_analysis_summary(analysis_summary) + + drift_text = "Drift wurde festgestellt." if analysis_summary.get("drift_detected") else "Keine Drift festgestellt." + + summary_text = ( + f"Batch Count: {analysis_summary['batch_count']}, " + f"Average Latency: {analysis_summary['avg_latency']} ms, " + f"Max Latency: {analysis_summary['max_latency']} ms, " + f"{drift_text}" + ) + + report = ReportDocument( + title="Batch Latency Measurement Report", + created_at=datetime.utcnow(), + summary_text=summary_text, + metrics=latency_results, + ) + + return { + "title": report.title, + "created_at": report.created_at.isoformat(), + "summary_text": report.summary_text, + "metrics": report.metrics, + }