import json
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, Any


class ReportPath:
    """Value object wrapping the filesystem location of a generated report."""

    def __init__(self, file_location: str) -> None:
        if not isinstance(file_location, str):
            raise TypeError("file_location must be a string")
        self.file_location = file_location

    def __repr__(self) -> str:
        return f"ReportPath(file_location={self.file_location!r})"


def _validate_analysis_results(analysis_results: Dict[str, Any]) -> None:
    """Validate the analysis_results input structure.

    Raises:
        TypeError: If the container or an entry field has the wrong type.
        ValueError: If the 'results' list or a required field is missing.
    """
    if not isinstance(analysis_results, dict):
        raise TypeError("analysis_results must be a dictionary")

    required_fields = {"run_id", "metric_name", "value", "cluster_score"}
    if "results" not in analysis_results or not isinstance(analysis_results["results"], list):
        raise ValueError("analysis_results must contain a 'results' list")

    for entry in analysis_results["results"]:
        if not isinstance(entry, dict):
            raise ValueError("Each result entry must be a dictionary")
        missing = required_fields - entry.keys()
        if missing:
            raise ValueError(f"Missing required fields in result entry: {missing}")
        if not isinstance(entry["run_id"], int):
            raise TypeError("run_id must be int")
        if not isinstance(entry["metric_name"], str):
            raise TypeError("metric_name must be str")
        if not isinstance(entry["value"], (int, float)):
            raise TypeError("value must be numeric")
        if not isinstance(entry["cluster_score"], (int, float)):
            raise TypeError("cluster_score must be numeric")


def generate_report(analysis_results: Dict[str, Any]) -> str:
    """Generate a JSON summary report from aggregated replication-run results.

    Args:
        analysis_results: Aggregated results of the replication and cluster
            analysis. Must contain a "results" list of per-run dicts with
            keys ``run_id``, ``metric_name``, ``value``, ``cluster_score``.

    Returns:
        Absolute path to the generated JSON report file
        (``output/report_summary.json``).

    Raises:
        TypeError: If the input container or a field has the wrong type.
        ValueError: If the input structure is missing required parts.
        RuntimeError: If the report file could not be written.
    """
    _validate_analysis_results(analysis_results)

    results = analysis_results["results"]

    # Summary statistics; guard against division by zero for empty inputs.
    total_runs = len(results)
    avg_cluster = sum(r["cluster_score"] for r in results) / total_runs if total_runs else 0.0
    avg_metric_value = sum(r["value"] for r in results) / total_runs if total_runs else 0.0

    # Sorted for deterministic report output (a bare set has arbitrary order).
    metric_names = sorted({r["metric_name"] for r in results})
    run_ids = [r["run_id"] for r in results]

    report_data = {
        # Timezone-aware UTC timestamp: datetime.utcnow() is deprecated
        # (Python 3.12+). Keep the original trailing-"Z" wire format.
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "total_runs": total_runs,
        "metric_names": metric_names,
        "average_cluster_score": avg_cluster,
        "average_metric_value": avg_metric_value,
        "run_ids": run_ids,
    }

    output_dir = Path("output")
    output_dir.mkdir(parents=True, exist_ok=True)

    output_path = output_dir / "report_summary.json"

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(report_data, f, indent=2)

    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not output_path.is_file():
        raise RuntimeError("Report file was not created.")

    report_path = ReportPath(file_location=str(output_path.resolve()))
    return report_path.file_location