From 31873ea2ff28b76c18fc6ee9ca5afd189f25e1db Mon Sep 17 00:00:00 2001 From: Mika Date: Wed, 4 Feb 2026 14:46:44 +0000 Subject: [PATCH] Add audit_analysis/src/audit_analysis/core.py --- audit_analysis/src/audit_analysis/core.py | 94 +++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 audit_analysis/src/audit_analysis/core.py diff --git a/audit_analysis/src/audit_analysis/core.py b/audit_analysis/src/audit_analysis/core.py new file mode 100644 index 0000000..02f7513 --- /dev/null +++ b/audit_analysis/src/audit_analysis/core.py @@ -0,0 +1,94 @@ +from __future__ import annotations +import logging +from pathlib import Path +from dataclasses import dataclass +from typing import Dict, List +import pandas as pd + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s - %(message)s') + + +@dataclass +class ClassificationReportEntry: + """Repräsentiert eine Klassifikationszusammenfassung für Unknown-Audit-Einträge.""" + class_: str # 'class' ist reserviertes Wort in Python + percentage: float + action: str + + def to_dict(self) -> Dict[str, object]: + return {"class": self.class_, "percentage": self.percentage, "action": self.action} + + +class AuditFileError(Exception): + """Custom Exception für Datei-Lese- oder Datenvalidierungsfehler.""" + pass + + +def _validate_input(file_path: str) -> Path: + path = Path(file_path) + if not path.exists() or not path.is_file(): + raise AuditFileError(f"Audit-Datei nicht gefunden: {file_path}") + if path.suffix.lower() != '.csv': + raise AuditFileError(f"Ungültiges Format: {file_path} ist keine CSV-Datei") + return path + + +def _classify_unknowns(df: pd.DataFrame) -> List[ClassificationReportEntry]: + unknowns = df[df['status'].str.lower() == 'unknown'] + if unknowns.empty: + logger.warning("Keine Unknown-Einträge gefunden.") + return [] + + cause_col = 'cause' if 'cause' in unknowns.columns else 'error' + grouped = unknowns.groupby(cause_col).size().reset_index(name='count') + total = grouped['count'].sum() + + mapping = { + 'artefact_missing': 'WARN', + 'contract_error': 'FAIL', + 'io_failure': 'RETRY', + 'timeout': 'RETRY' + } + + report_entries = [] + for _, row in grouped.iterrows(): + cls_name = str(row[cause_col]) + pct = float(row['count']) / total * 100 if total > 0 else 0.0 + action = mapping.get(cls_name, 'INVESTIGATE') + report_entries.append(ClassificationReportEntry(class_=cls_name, percentage=round(pct, 2), action=action)) + + return report_entries + + +def analyze_audit(file_path: str) -> Dict[str, object]: + """Analysiert eine Audit-CSV-Datei und liefert eine strukturierte Klassifikationsauswertung. + + Args: + file_path: Pfad zur Audit-CSV-Datei mit CI-Rundaten. + + Returns: + dict: JSON-ähnliche Struktur mit Klassifizierungsübersicht und empfohlenen Aktionen. + """ + assert isinstance(file_path, str), "file_path muss ein String sein" + + path = _validate_input(file_path) + logger.info(f"Lese Audit-Datei: {path}") + try: + df = pd.read_csv(path) + except Exception as e: + raise AuditFileError(f"Fehler beim Lesen der CSV-Datei: {e}") from e + + required_columns = {'status'} + if not required_columns.issubset(df.columns): + raise AuditFileError(f"Fehlende Pflichtspalten: {required_columns - set(df.columns)}") + + report_entries = _classify_unknowns(df) + + classification_report = { + "summary": [entry.to_dict() for entry in report_entries], + "total_unknowns": int(len(df[df['status'].str.lower() == 'unknown'])) + } + + logger.info(f"Klassifikationsbericht erstellt mit {len(report_entries)} Klassen.") + return classification_report \ No newline at end of file