Add results_analysis/src/results_analysis/core.py
This commit is contained in:
parent
54c7606e6d
commit
c057dcb25f
1 changed files with 76 additions and 0 deletions
76
results_analysis/src/results_analysis/core.py
Normal file
76
results_analysis/src/results_analysis/core.py
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
from __future__ import annotations
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import List, Dict
|
||||
import pandas as pd
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
|
||||
class ResultsAnalysisError(Exception):
|
||||
"""Custom exception for results analysis errors."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConfusionMatrix:
|
||||
"""Repräsentation einer Confusion-Matrix zum Vergleich zweier Policy-Versionen."""
|
||||
TP: int
|
||||
TN: int
|
||||
FP: int
|
||||
FN: int
|
||||
|
||||
def to_json(self) -> Dict[str, int]:
|
||||
"""Serialisiert die Matrix in ein JSON-kompatibles Format."""
|
||||
return asdict(self)
|
||||
|
||||
|
||||
def _validate_results(results: List[Dict]) -> None:
|
||||
if not isinstance(results, list):
|
||||
raise ResultsAnalysisError("Results must be a list of dicts.")
|
||||
required_keys = {"run_id", "decision"}
|
||||
for item in results:
|
||||
if not isinstance(item, dict):
|
||||
raise ResultsAnalysisError("Each result entry must be a dictionary.")
|
||||
if not required_keys.issubset(item.keys()):
|
||||
raise ResultsAnalysisError(f"Missing required keys in result: {item}")
|
||||
|
||||
|
||||
def create_confusion_matrix(previous_results: List[Dict], new_results: List[Dict]) -> Dict[str, int]:
|
||||
"""Erzeugt eine Confusion-Matrix durch Vergleich von früheren und neuen Policy-Ergebnissen."""
|
||||
_validate_results(previous_results)
|
||||
_validate_results(new_results)
|
||||
|
||||
prev_df = pd.DataFrame(previous_results)
|
||||
new_df = pd.DataFrame(new_results)
|
||||
|
||||
if prev_df.empty or new_df.empty:
|
||||
raise ResultsAnalysisError("Input results must not be empty.")
|
||||
|
||||
merged = pd.merge(prev_df, new_df, on="run_id", suffixes=("_prev", "_new"))
|
||||
|
||||
TP = int(((merged.decision_prev == "PASS") & (merged.decision_new == "PASS")).sum())
|
||||
TN = int(((merged.decision_prev != "PASS") & (merged.decision_new != "PASS")).sum())
|
||||
FP = int(((merged.decision_prev != "PASS") & (merged.decision_new == "PASS")).sum())
|
||||
FN = int(((merged.decision_prev == "PASS") & (merged.decision_new != "PASS")).sum())
|
||||
|
||||
cm = ConfusionMatrix(TP=TP, TN=TN, FP=FP, FN=FN)
|
||||
logger.info("Confusion matrix created: %s", cm)
|
||||
return cm.to_json()
|
||||
|
||||
|
||||
def get_deltas(previous_results: List[Dict], new_results: List[Dict]) -> List[Dict]:
|
||||
"""Ermittelt alle Fälle, in denen sich das Ergebnisverhalten zwischen Versionen geändert hat."""
|
||||
_validate_results(previous_results)
|
||||
_validate_results(new_results)
|
||||
|
||||
prev_df = pd.DataFrame(previous_results)
|
||||
new_df = pd.DataFrame(new_results)
|
||||
|
||||
merged = pd.merge(prev_df, new_df, on="run_id", suffixes=("_prev", "_new"))
|
||||
changed = merged[merged.decision_prev != merged.decision_new]
|
||||
|
||||
deltas = changed[["run_id", "decision_prev", "decision_new"]].to_dict(orient="records")
|
||||
logger.info("Detected %d changed results.", len(deltas))
|
||||
return deltas
|
||||
Loading…
Reference in a new issue