Add artifact.logging_module/src/artifact_logging_module/core.py

This commit is contained in:
Mika 2026-02-22 12:32:32 +00:00
commit ab46cf8336

View file

@ -0,0 +1,112 @@
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any
__all__ = ["LogEntry", "log_decision", "generate_report"]
class LogEntry:
"""Repräsentiert einen einzelnen Logeintrag einer CI-Entscheidung."""
def __init__(
self,
timestamp: datetime,
decision_type: str,
reason_code: str,
additional_info: Optional[str] = None,
) -> None:
self.timestamp = timestamp
self.decision_type = decision_type
self.reason_code = reason_code
self.additional_info = additional_info or ""
def to_dict(self) -> Dict[str, Any]:
return {
"timestamp": self.timestamp.isoformat(),
"decision_type": self.decision_type,
"reason_code": self.reason_code,
"additional_info": self.additional_info,
}
_LOG_PATH = Path("data/ci_decisions.json")
class _LogError(Exception):
pass
def _validate_decision(decision_type: str, reason_code: str) -> None:
valid_types = {"PASS", "WARN", "UNKNOWN"}
assert isinstance(decision_type, str) and decision_type in valid_types, (
f"Ungültiger decision_type: {decision_type}. Erwartet: {valid_types}."
)
assert isinstance(reason_code, str) and reason_code.strip(), (
"reason_code muss ein nicht-leerer String sein."
)
def _load_existing_logs(path: Path) -> List[Dict[str, Any]]:
if not path.exists():
return []
try:
with path.open("r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError) as e:
raise _LogError(f"Fehler beim Laden der Logdatei: {e}") from e
def _write_logs(path: Path, logs: List[Dict[str, Any]]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
try:
with path.open("w", encoding="utf-8") as f:
json.dump(logs, f, ensure_ascii=False, indent=2)
except OSError as e:
raise _LogError(f"Fehler beim Schreiben der Logdatei: {e}") from e
def log_decision(decision_type: str, reason_code: str) -> None:
"""Protokolliert eine CI-Entscheidung in data/ci_decisions.json."""
_validate_decision(decision_type, reason_code)
entry = LogEntry(
timestamp=datetime.utcnow(),
decision_type=decision_type,
reason_code=reason_code,
)
logs = _load_existing_logs(_LOG_PATH)
logs.append(entry.to_dict())
_write_logs(_LOG_PATH, logs)
def generate_report() -> str:
"""Erzeugt eine textuelle Zusammenfassung der Entscheidungsstatistik."""
logs = _load_existing_logs(_LOG_PATH)
if not logs:
return "Keine Logeinträge gefunden."
stats = {"PASS": 0, "WARN": 0, "UNKNOWN": 0}
for e in logs:
dt = e.get("decision_type")
if dt in stats:
stats[dt] += 1
else:
stats["UNKNOWN"] += 1
total = sum(stats.values()) or 1
warn_rate = (stats["WARN"] / total) * 100
report_lines = [
"CI Decisions Summary Report",
"============================",
f"Total Entries: {total}",
f"PASS: {stats['PASS']}",
f"WARN: {stats['WARN']} ({warn_rate:.2f}%)",
f"UNKNOWN:{stats['UNKNOWN']}",
]
return "\n".join(report_lines)