Add 1.logging_analysis/src/logging_analysis/core.py
This commit is contained in:
commit
9f092d44cc
1 changed files with 112 additions and 0 deletions
112
1.logging_analysis/src/logging_analysis/core.py
Normal file
112
1.logging_analysis/src/logging_analysis/core.py
Normal file
|
|
@ -0,0 +1,112 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Dict, Any
|
||||
from statistics import mean
|
||||
import logging
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
|
||||
|
||||
|
||||
class LogAnalysisError(Exception):
|
||||
"""Base exception for log analysis errors."""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class LogEntry:
|
||||
"""Repräsentiert einen einzelnen Logeintrag eines Artefakts."""
|
||||
timestamp: datetime
|
||||
expected_artifact_path: str
|
||||
artifact_key: str
|
||||
status: str
|
||||
|
||||
@staticmethod
|
||||
def from_dict(data: Dict[str, Any]) -> 'LogEntry':
|
||||
required_fields = {"timestamp", "expected_artifact_path", "artifact_key", "status"}
|
||||
missing = required_fields - data.keys()
|
||||
if missing:
|
||||
raise LogAnalysisError(f"Missing fields in log entry: {missing}")
|
||||
|
||||
try:
|
||||
timestamp = datetime.fromisoformat(str(data["timestamp"]))
|
||||
except Exception as exc:
|
||||
raise LogAnalysisError(f"Invalid timestamp format: {data['timestamp']}") from exc
|
||||
|
||||
status = str(data["status"]).lower()
|
||||
if status not in {"missing", "present", "delayed", "unknown"}:
|
||||
raise LogAnalysisError(f"Invalid status value: {status}")
|
||||
|
||||
return LogEntry(
|
||||
timestamp=timestamp,
|
||||
expected_artifact_path=str(data["expected_artifact_path"]),
|
||||
artifact_key=str(data["artifact_key"]),
|
||||
status=status,
|
||||
)
|
||||
|
||||
|
||||
def analyze_log(log_file_path: str) -> Dict[str, Any]:
|
||||
"""Analysiert eine Logdatei mit Artefakt-Daten und erstellt eine Zusammenfassung über fehlende oder verspätete Artefakte."""
|
||||
path = Path(log_file_path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Log file not found: {log_file_path}")
|
||||
|
||||
try:
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
raw_data = json.load(f)
|
||||
except json.JSONDecodeError as exc:
|
||||
raise LogAnalysisError(f"Invalid JSON in log file: {exc}") from exc
|
||||
|
||||
if not isinstance(raw_data, list):
|
||||
raise LogAnalysisError("Expected a list of log entries in JSON file.")
|
||||
|
||||
entries: List[LogEntry] = []
|
||||
for i, item in enumerate(raw_data):
|
||||
try:
|
||||
entry = LogEntry.from_dict(item)
|
||||
entries.append(entry)
|
||||
except LogAnalysisError as exc:
|
||||
logging.warning(f"Skipping invalid entry #{i}: {exc}")
|
||||
|
||||
total = len(entries)
|
||||
if total == 0:
|
||||
raise LogAnalysisError("No valid log entries found in file.")
|
||||
|
||||
missing_count = sum(1 for e in entries if e.status == "missing")
|
||||
delayed_count = sum(1 for e in entries if e.status == "delayed")
|
||||
present_count = sum(1 for e in entries if e.status == "present")
|
||||
|
||||
missing_rate = missing_count / total
|
||||
delayed_rate = delayed_count / total
|
||||
|
||||
timestamps = [e.timestamp.timestamp() for e in entries if e.status == "present"]
|
||||
avg_interval = None
|
||||
if len(timestamps) > 1:
|
||||
intervals = [t2 - t1 for t1, t2 in zip(timestamps, timestamps[1:]) if t2 >= t1]
|
||||
if intervals:
|
||||
avg_interval = mean(intervals)
|
||||
|
||||
summary = {
|
||||
"total_entries": total,
|
||||
"missing_count": missing_count,
|
||||
"delayed_count": delayed_count,
|
||||
"present_count": present_count,
|
||||
"missing_rate": round(missing_rate, 3),
|
||||
"delayed_rate": round(delayed_rate, 3),
|
||||
"avg_present_interval_sec": round(avg_interval, 3) if avg_interval is not None else None,
|
||||
"summary_text": (
|
||||
f"Logs analyzed: {total}. Missing: {missing_count} ({missing_rate:.1%}), "
|
||||
f"Delayed: {delayed_count} ({delayed_rate:.1%}), Present: {present_count}."
|
||||
)
|
||||
}
|
||||
|
||||
assert 0.0 <= summary["missing_rate"] <= 1.0, "Missing rate out of range"
|
||||
assert 0.0 <= summary["delayed_rate"] <= 1.0, "Delayed rate out of range"
|
||||
|
||||
logging.info(summary["summary_text"])
|
||||
|
||||
return summary
|
||||
Loading…
Reference in a new issue