Add artifact.logging_module/src/artifact_logging_module/cli.py

This commit is contained in:
Mika 2026-02-22 12:32:33 +00:00
parent ab46cf8336
commit 3f4ebb66d5

View file

@ -0,0 +1,72 @@
import argparse
import json
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, List
def load_json_file(path: Path) -> List[Dict[str, Any]]:
if not path.exists() or not path.is_file():
raise FileNotFoundError(f"Eingabedatei nicht gefunden: {path}")
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError("JSON-Daten müssen eine Liste von Log-Einträgen enthalten.")
return data
def validate_log_entry(entry: Dict[str, Any]) -> bool:
required_fields = {"timestamp", "decision_type", "reason_code", "additional_info"}
if not required_fields.issubset(entry.keys()):
return False
try:
datetime.fromisoformat(entry["timestamp"])
except Exception:
return False
if entry["decision_type"] not in {"PASS", "WARN", "UNKNOWN"}:
return False
return True
def generate_summary(entries: List[Dict[str, Any]]) -> str:
total = len(entries)
if total == 0:
return "Keine Logeinträge vorhanden."
counts = {"PASS": 0, "WARN": 0, "UNKNOWN": 0}
for e in entries:
dtype = e.get("decision_type")
if dtype in counts:
counts[dtype] += 1
warn_rate = (counts["WARN"] / total) * 100 if total else 0
summary = (
f"Gesamt: {total}\n"
f"PASS: {counts['PASS']}\n"
f"WARN: {counts['WARN']} ({warn_rate:.2f}% Warnrate)\n"
f"UNKNOWN: {counts['UNKNOWN']}\n"
)
return summary
def main() -> None:
parser = argparse.ArgumentParser(description="Generiert einen CI-Zusammenfassungsbericht aus JSON-Logs.")
parser.add_argument("--input", required=True, help="Pfad zu einer JSON-Datei mit Entscheidungs-Logs.")
parser.add_argument("--output", required=False, help="Zielpfad für den generierten Bericht.")
args = parser.parse_args()
input_path = Path(args.input)
output_path = Path(args.output) if args.output else Path("output/report.txt")
entries = load_json_file(input_path)
valid_entries = [e for e in entries if validate_log_entry(e)]
report = generate_summary(valid_entries)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(report)
print(f"Bericht gespeichert unter: {output_path}")
if __name__ == "__main__":
main()