commit ab46cf833692b001f68d3667751cce93b212fed7 Author: Mika Date: Sun Feb 22 12:32:32 2026 +0000 Add artifact.logging_module/src/artifact_logging_module/core.py diff --git a/artifact.logging_module/src/artifact_logging_module/core.py b/artifact.logging_module/src/artifact_logging_module/core.py new file mode 100644 index 0000000..c64c64e --- /dev/null +++ b/artifact.logging_module/src/artifact_logging_module/core.py @@ -0,0 +1,112 @@ +import json +import os +from datetime import datetime +from pathlib import Path +from typing import Optional, List, Dict, Any + +__all__ = ["LogEntry", "log_decision", "generate_report"] + + +class LogEntry: + """Repräsentiert einen einzelnen Logeintrag einer CI-Entscheidung.""" + + def __init__( + self, + timestamp: datetime, + decision_type: str, + reason_code: str, + additional_info: Optional[str] = None, + ) -> None: + self.timestamp = timestamp + self.decision_type = decision_type + self.reason_code = reason_code + self.additional_info = additional_info or "" + + def to_dict(self) -> Dict[str, Any]: + return { + "timestamp": self.timestamp.isoformat(), + "decision_type": self.decision_type, + "reason_code": self.reason_code, + "additional_info": self.additional_info, + } + + +_LOG_PATH = Path("data/ci_decisions.json") + + +class _LogError(Exception): + pass + + +def _validate_decision(decision_type: str, reason_code: str) -> None: + valid_types = {"PASS", "WARN", "UNKNOWN"} + assert isinstance(decision_type, str) and decision_type in valid_types, ( + f"Ungültiger decision_type: {decision_type}. Erwartet: {valid_types}." + ) + assert isinstance(reason_code, str) and reason_code.strip(), ( + "reason_code muss ein nicht-leerer String sein." + ) + + +def _load_existing_logs(path: Path) -> List[Dict[str, Any]]: + if not path.exists(): + return [] + try: + with path.open("r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError) as e: + raise _LogError(f"Fehler beim Laden der Logdatei: {e}") from e + + +def _write_logs(path: Path, logs: List[Dict[str, Any]]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + try: + with path.open("w", encoding="utf-8") as f: + json.dump(logs, f, ensure_ascii=False, indent=2) + except OSError as e: + raise _LogError(f"Fehler beim Schreiben der Logdatei: {e}") from e + + +def log_decision(decision_type: str, reason_code: str) -> None: + """Protokolliert eine CI-Entscheidung in data/ci_decisions.json.""" + + _validate_decision(decision_type, reason_code) + + entry = LogEntry( + timestamp=datetime.utcnow(), + decision_type=decision_type, + reason_code=reason_code, + ) + + logs = _load_existing_logs(_LOG_PATH) + logs.append(entry.to_dict()) + _write_logs(_LOG_PATH, logs) + + +def generate_report() -> str: + """Erzeugt eine textuelle Zusammenfassung der Entscheidungsstatistik.""" + + logs = _load_existing_logs(_LOG_PATH) + if not logs: + return "Keine Logeinträge gefunden." + + stats = {"PASS": 0, "WARN": 0, "UNKNOWN": 0} + for e in logs: + dt = e.get("decision_type") + if dt in stats: + stats[dt] += 1 + else: + stats["UNKNOWN"] += 1 + + total = sum(stats.values()) or 1 + warn_rate = (stats["WARN"] / total) * 100 + + report_lines = [ + "CI Decisions Summary Report", + "============================", + f"Total Entries: {total}", + f"PASS: {stats['PASS']}", + f"WARN: {stats['WARN']} ({warn_rate:.2f}%)", + f"UNKNOWN:{stats['UNKNOWN']}", + ] + return "\n".join(report_lines)