Add report_generation/src/report_generation/main.py
This commit is contained in:
parent
f3282c1f79
commit
6c3be5ec4c
1 changed files with 86 additions and 0 deletions
86
report_generation/src/report_generation/main.py
Normal file
86
report_generation/src/report_generation/main.py
Normal file
|
|
@ -0,0 +1,86 @@
|
|||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Dict, Any
|
||||
|
||||
|
||||
class ReportPath:
    """Lightweight value object holding the filesystem location of a report.

    The constructor enforces that the location is supplied as a plain
    string, matching the strict input validation used elsewhere in this
    module.
    """

    def __init__(self, file_location: str) -> None:
        if isinstance(file_location, str):
            self.file_location = file_location
        else:
            raise TypeError("file_location must be a string")

    def __repr__(self) -> str:
        cls_name = type(self).__name__
        return f"{cls_name}(file_location={self.file_location!r})"
|
||||
|
||||
|
||||
def _validate_analysis_results(analysis_results: Dict[str, Any]) -> None:
|
||||
"""Validate the analysis_results input structure."""
|
||||
if not isinstance(analysis_results, dict):
|
||||
raise TypeError("analysis_results must be a dictionary")
|
||||
|
||||
required_fields = {"run_id", "metric_name", "value", "cluster_score"}
|
||||
if "results" not in analysis_results or not isinstance(analysis_results["results"], list):
|
||||
raise ValueError("analysis_results must contain a 'results' list")
|
||||
|
||||
for entry in analysis_results["results"]:
|
||||
if not isinstance(entry, dict):
|
||||
raise ValueError("Each result entry must be a dictionary")
|
||||
missing = required_fields - entry.keys()
|
||||
if missing:
|
||||
raise ValueError(f"Missing required fields in result entry: {missing}")
|
||||
if not isinstance(entry["run_id"], int):
|
||||
raise TypeError("run_id must be int")
|
||||
if not isinstance(entry["metric_name"], str):
|
||||
raise TypeError("metric_name must be str")
|
||||
if not isinstance(entry["value"], (int, float)):
|
||||
raise TypeError("value must be numeric")
|
||||
if not isinstance(entry["cluster_score"], (int, float)):
|
||||
raise TypeError("cluster_score must be numeric")
|
||||
|
||||
|
||||
def generate_report(analysis_results: Dict[str, Any]) -> str:
    """Generate a JSON summary report from aggregated replication results.

    Args:
        analysis_results: Aggregated results of the replication and cluster
            analysis; must contain a ``"results"`` list as checked by
            ``_validate_analysis_results``.

    Returns:
        str: Absolute path to the generated JSON report file
        (``output/report_summary.json`` relative to the working directory).

    Raises:
        TypeError: If ``analysis_results`` or a field has the wrong type.
        ValueError: If required keys are missing from ``analysis_results``.
        RuntimeError: If the report file was not created on disk.
    """
    # Function-scope import keeps the module-level import lines untouched.
    from datetime import timezone

    _validate_analysis_results(analysis_results)

    results = analysis_results["results"]

    # Summary statistics; the ternaries guard against division by zero
    # when the result list is empty.
    total_runs = len(results)
    avg_cluster = sum(r["cluster_score"] for r in results) / total_runs if total_runs else 0.0
    avg_metric_value = sum(r["value"] for r in results) / total_runs if total_runs else 0.0

    metric_names = list({r["metric_name"] for r in results})
    run_ids = [r["run_id"] for r in results]

    # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
    # timestamp, then drop the tzinfo so the original "...Z" suffix format
    # of the report is preserved byte-for-byte in structure.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    report_data = {
        "timestamp": now_utc.isoformat() + "Z",
        "total_runs": total_runs,
        "metric_names": metric_names,
        "average_cluster_score": avg_cluster,
        "average_metric_value": avg_metric_value,
        "run_ids": run_ids,
    }

    output_dir = Path("output")
    output_dir.mkdir(parents=True, exist_ok=True)

    output_path = output_dir / "report_summary.json"

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(report_data, f, indent=2)

    # An assert would be stripped under ``python -O``; raise explicitly so
    # a missing file is always reported.
    if not output_path.is_file():
        raise RuntimeError("Report file was not created.")

    report_path = ReportPath(file_location=str(output_path.resolve()))
    return report_path.file_location
|
||||
Loading…
Reference in a new issue