diff --git a/results_analysis/src/results_analysis/core.py b/results_analysis/src/results_analysis/core.py
new file mode 100644
index 0000000..42e439b
--- /dev/null
+++ b/results_analysis/src/results_analysis/core.py
@@ -0,0 +1,76 @@
+from __future__ import annotations
+from dataclasses import dataclass, asdict
+from typing import List, Dict
+import pandas as pd
+import logging
+
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+
+class ResultsAnalysisError(Exception):
+    """Custom exception for results analysis errors."""
+
+
+@dataclass
+class ConfusionMatrix:
+    """Confusion matrix comparing the decisions of two policy versions."""
+    TP: int
+    TN: int
+    FP: int
+    FN: int
+
+    def to_json(self) -> Dict[str, int]:
+        """Serialize the matrix into a JSON-compatible format."""
+        return asdict(self)
+
+
+def _validate_results(results: List[Dict]) -> None:
+    if not isinstance(results, list):
+        raise ResultsAnalysisError("Results must be a list of dicts.")
+    required_keys = {"run_id", "decision"}
+    for item in results:
+        if not isinstance(item, dict):
+            raise ResultsAnalysisError("Each result entry must be a dictionary.")
+        if not required_keys.issubset(item.keys()):
+            raise ResultsAnalysisError(f"Missing required keys in result: {item}")
+
+
+def create_confusion_matrix(previous_results: List[Dict], new_results: List[Dict]) -> Dict[str, int]:
+    """Build a confusion matrix by comparing previous and new policy results, treating the previous decision as the reference."""
+    _validate_results(previous_results)
+    _validate_results(new_results)
+
+    prev_df = pd.DataFrame(previous_results)
+    new_df = pd.DataFrame(new_results)
+
+    if prev_df.empty or new_df.empty:
+        raise ResultsAnalysisError("Input results must not be empty.")
+
+    merged = pd.merge(prev_df, new_df, on="run_id", suffixes=("_prev", "_new"))
+
+    TP = int(((merged.decision_prev == "PASS") & (merged.decision_new == "PASS")).sum())
+    TN = int(((merged.decision_prev != "PASS") & (merged.decision_new != "PASS")).sum())
+    FP = int(((merged.decision_prev != "PASS") & (merged.decision_new == "PASS")).sum())
+    FN = int(((merged.decision_prev == "PASS") & (merged.decision_new != "PASS")).sum())
+
+    cm = ConfusionMatrix(TP=TP, TN=TN, FP=FP, FN=FN)
+    logger.info("Confusion matrix created: %s", cm)
+    return cm.to_json()
+
+
+def get_deltas(previous_results: List[Dict], new_results: List[Dict]) -> List[Dict]:
+    """Return all runs whose decision changed between the previous and new policy versions."""
+    _validate_results(previous_results)
+    _validate_results(new_results)
+
+    prev_df = pd.DataFrame(previous_results)
+    new_df = pd.DataFrame(new_results)
+
+    merged = pd.merge(prev_df, new_df, on="run_id", suffixes=("_prev", "_new"))
+    changed = merged[merged.decision_prev != merged.decision_new]
+
+    deltas = changed[["run_id", "decision_prev", "decision_new"]].to_dict(orient="records")
+    logger.info("Detected %d changed results.", len(deltas))
+    return deltas
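For reference, a minimal usage sketch (not part of the diff above): the sample run IDs and decision values are hypothetical, and the import path assumes the package under results_analysis/src is installed.

from results_analysis.core import create_confusion_matrix, get_deltas

# Hypothetical results from two policy versions; decisions are compared against "PASS",
# with the previous version's decision taken as the reference.
previous = [
    {"run_id": 1, "decision": "PASS"},
    {"run_id": 2, "decision": "FAIL"},
    {"run_id": 3, "decision": "PASS"},
]
new = [
    {"run_id": 1, "decision": "PASS"},
    {"run_id": 2, "decision": "PASS"},
    {"run_id": 3, "decision": "FAIL"},
]

print(create_confusion_matrix(previous, new))
# {'TP': 1, 'TN': 0, 'FP': 1, 'FN': 1}

print(get_deltas(previous, new))
# [{'run_id': 2, 'decision_prev': 'FAIL', 'decision_new': 'PASS'},
#  {'run_id': 3, 'decision_prev': 'PASS', 'decision_new': 'FAIL'}]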