# Module: 1.logging_analysis/src/logging_analysis/core.py
# Provenance: added by commit 9f092d44cc0242527e4739b0bfb455e665e5f166
# ("Add 1.logging_analysis/src/logging_analysis/core.py").

from __future__ import annotations

import json
from collections.abc import Mapping
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from statistics import mean
import logging


logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')

# Keys every raw log entry must provide.
_REQUIRED_FIELDS = frozenset(
    {"timestamp", "expected_artifact_path", "artifact_key", "status"}
)

# Status values accepted after lower-casing.
_VALID_STATUSES = frozenset({"missing", "present", "delayed", "unknown"})


class LogAnalysisError(Exception):
    """Base exception for log analysis errors."""


@dataclass
class LogEntry:
    """Represent a single log entry describing one artifact.

    Attributes:
        timestamp: Parsed ISO-8601 timestamp of the entry.
        expected_artifact_path: Path recorded for the artifact, as a string.
        artifact_key: Key recorded for the artifact, as a string.
        status: Lower-cased status; one of "missing", "present", "delayed"
            or "unknown".
    """

    timestamp: datetime
    expected_artifact_path: str
    artifact_key: str
    status: str

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> 'LogEntry':
        """Build a ``LogEntry`` from one decoded JSON object.

        Args:
            data: Mapping with the keys "timestamp", "expected_artifact_path",
                "artifact_key" and "status".

        Returns:
            The validated entry; ``status`` is normalised to lower case.

        Raises:
            LogAnalysisError: If ``data`` is not a mapping, a required key is
                absent, the timestamp is not ISO-8601, or the status is not
                one of the accepted values.
        """
        # A JSON array may hold strings, numbers or null. Reject those here
        # with the domain error so callers that skip invalid entries do not
        # crash with AttributeError/TypeError instead.
        if not isinstance(data, Mapping):
            raise LogAnalysisError(
                f"Log entry must be a JSON object, got {type(data).__name__}"
            )

        missing = _REQUIRED_FIELDS - data.keys()
        if missing:
            raise LogAnalysisError(f"Missing fields in log entry: {missing}")

        try:
            timestamp = datetime.fromisoformat(str(data["timestamp"]))
        except (TypeError, ValueError) as exc:
            raise LogAnalysisError(f"Invalid timestamp format: {data['timestamp']}") from exc

        status = str(data["status"]).lower()
        if status not in _VALID_STATUSES:
            raise LogAnalysisError(f"Invalid status value: {status}")

        return LogEntry(
            timestamp=timestamp,
            expected_artifact_path=str(data["expected_artifact_path"]),
            artifact_key=str(data["artifact_key"]),
            status=status,
        )


def _load_entries(path: Path) -> List[LogEntry]:
    """Read the JSON file at ``path`` and return its valid entries in file order."""
    try:
        with path.open("r", encoding="utf-8") as f:
            raw_data = json.load(f)
    except json.JSONDecodeError as exc:
        raise LogAnalysisError(f"Invalid JSON in log file: {exc}") from exc

    if not isinstance(raw_data, list):
        raise LogAnalysisError("Expected a list of log entries in JSON file.")

    entries: List[LogEntry] = []
    for i, item in enumerate(raw_data):
        try:
            entries.append(LogEntry.from_dict(item))
        except LogAnalysisError as exc:
            # Best effort: one bad record must not abort the whole analysis.
            logging.warning("Skipping invalid entry #%d: %s", i, exc)
    return entries


def _average_present_interval(entries: List[LogEntry]) -> Optional[float]:
    """Return the mean gap in seconds between consecutive "present" entries.

    Entries are taken in file order. Pairs whose second timestamp is earlier
    than the first are left out of the mean. Returns ``None`` when no pair
    remains.
    """
    timestamps = [e.timestamp.timestamp() for e in entries if e.status == "present"]
    intervals = [t2 - t1 for t1, t2 in zip(timestamps, timestamps[1:]) if t2 >= t1]
    return mean(intervals) if intervals else None


def analyze_log(log_file_path: str) -> Dict[str, Any]:
    """Analyze an artifact log file and summarise missing or delayed artifacts.

    The file must contain a JSON array of objects. Elements that fail
    validation (including elements that are not JSON objects) are skipped
    with a warning.

    Args:
        log_file_path: Path to the JSON log file.

    Returns:
        A dict with the keys "total_entries", "missing_count",
        "delayed_count", "present_count", "missing_rate", "delayed_rate"
        (rates rounded to 3 decimals), "avg_present_interval_sec" (rounded
        to 3 decimals, or ``None``) and "summary_text".

    Raises:
        FileNotFoundError: If ``log_file_path`` does not exist.
        LogAnalysisError: If the file is not valid JSON, its top level is not
            a list, or it contains no valid entries.
    """
    path = Path(log_file_path)
    if not path.exists():
        raise FileNotFoundError(f"Log file not found: {log_file_path}")

    entries = _load_entries(path)

    total = len(entries)
    if total == 0:
        raise LogAnalysisError("No valid log entries found in file.")

    missing_count = sum(1 for e in entries if e.status == "missing")
    delayed_count = sum(1 for e in entries if e.status == "delayed")
    present_count = sum(1 for e in entries if e.status == "present")

    missing_rate = missing_count / total
    delayed_rate = delayed_count / total

    avg_interval = _average_present_interval(entries)

    summary = {
        "total_entries": total,
        "missing_count": missing_count,
        "delayed_count": delayed_count,
        "present_count": present_count,
        "missing_rate": round(missing_rate, 3),
        "delayed_rate": round(delayed_rate, 3),
        "avg_present_interval_sec": round(avg_interval, 3) if avg_interval is not None else None,
        "summary_text": (
            f"Logs analyzed: {total}. Missing: {missing_count} ({missing_rate:.1%}), "
            f"Delayed: {delayed_count} ({delayed_rate:.1%}), Present: {present_count}."
        )
    }

    # Internal sanity checks only; counts can never exceed ``total``.
    assert 0.0 <= summary["missing_rate"] <= 1.0, "Missing rate out of range"
    assert 0.0 <= summary["delayed_rate"] <= 1.0, "Delayed rate out of range"

    logging.info(summary["summary_text"])

    return summary