Add audit_analysis/src/audit_analysis/core.py

This commit is contained in:
Mika 2026-02-04 14:46:44 +00:00
commit 31873ea2ff

View file

@ -0,0 +1,94 @@
from __future__ import annotations
import logging
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List
import pandas as pd
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s - %(message)s')
@dataclass
class ClassificationReportEntry:
"""Repräsentiert eine Klassifikationszusammenfassung für Unknown-Audit-Einträge."""
class_: str # 'class' ist reserviertes Wort in Python
percentage: float
action: str
def to_dict(self) -> Dict[str, object]:
return {"class": self.class_, "percentage": self.percentage, "action": self.action}
class AuditFileError(Exception):
"""Custom Exception für Datei-Lese- oder Datenvalidierungsfehler."""
pass
def _validate_input(file_path: str) -> Path:
path = Path(file_path)
if not path.exists() or not path.is_file():
raise AuditFileError(f"Audit-Datei nicht gefunden: {file_path}")
if path.suffix.lower() != '.csv':
raise AuditFileError(f"Ungültiges Format: {file_path} ist keine CSV-Datei")
return path
def _classify_unknowns(df: pd.DataFrame) -> List[ClassificationReportEntry]:
unknowns = df[df['status'].str.lower() == 'unknown']
if unknowns.empty:
logger.warning("Keine Unknown-Einträge gefunden.")
return []
cause_col = 'cause' if 'cause' in unknowns.columns else 'error'
grouped = unknowns.groupby(cause_col).size().reset_index(name='count')
total = grouped['count'].sum()
mapping = {
'artefact_missing': 'WARN',
'contract_error': 'FAIL',
'io_failure': 'RETRY',
'timeout': 'RETRY'
}
report_entries = []
for _, row in grouped.iterrows():
cls_name = str(row[cause_col])
pct = float(row['count']) / total * 100 if total > 0 else 0.0
action = mapping.get(cls_name, 'INVESTIGATE')
report_entries.append(ClassificationReportEntry(class_=cls_name, percentage=round(pct, 2), action=action))
return report_entries
def analyze_audit(file_path: str) -> Dict[str, object]:
"""Analysiert eine Audit-CSV-Datei und liefert eine strukturierte Klassifikationsauswertung.
Args:
file_path: Pfad zur Audit-CSV-Datei mit CI-Rundaten.
Returns:
dict: JSON-ähnliche Struktur mit Klassifizierungsübersicht und empfohlenen Aktionen.
"""
assert isinstance(file_path, str), "file_path muss ein String sein"
path = _validate_input(file_path)
logger.info(f"Lese Audit-Datei: {path}")
try:
df = pd.read_csv(path)
except Exception as e:
raise AuditFileError(f"Fehler beim Lesen der CSV-Datei: {e}") from e
required_columns = {'status'}
if not required_columns.issubset(df.columns):
raise AuditFileError(f"Fehlende Pflichtspalten: {required_columns - set(df.columns)}")
report_entries = _classify_unknowns(df)
classification_report = {
"summary": [entry.to_dict() for entry in report_entries],
"total_unknowns": int(len(df[df['status'].str.lower() == 'unknown']))
}
logger.info(f"Klassifikationsbericht erstellt mit {len(report_entries)} Klassen.")
return classification_report