Add logging_tool/src/logging_tool/core.py
This commit is contained in:
commit
891322e7a0
1 changed files with 85 additions and 0 deletions
85
logging_tool/src/logging_tool/core.py
Normal file
85
logging_tool/src/logging_tool/core.py
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
|
||||
|
||||
class LogValidationError(Exception):
|
||||
"""Custom exception for invalid log entries."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class LogEntry:
|
||||
"""Repräsentiert einen einzelnen Preflight-Versuch."""
|
||||
|
||||
timestamp: datetime
|
||||
measured_p: float
|
||||
freeze_ok: bool
|
||||
setup_fingerprint: str
|
||||
policy_hash: str
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Konvertiert den Eintrag in ein JSON-kompatibles Dictionary."""
|
||||
d = asdict(self)
|
||||
d["timestamp"] = self.timestamp.isoformat()
|
||||
return d
|
||||
|
||||
|
||||
def _validate_log_entry(entry: LogEntry) -> None:
|
||||
"""Validiert die Felder des LogEntry streng nach Typen."""
|
||||
assert isinstance(entry.timestamp, datetime), "timestamp must be datetime"
|
||||
assert isinstance(entry.measured_p, (float, int)), "measured_p must be a number"
|
||||
assert isinstance(entry.freeze_ok, bool), "freeze_ok must be bool"
|
||||
assert isinstance(entry.setup_fingerprint, str) and entry.setup_fingerprint, (
|
||||
"setup_fingerprint must be non-empty string"
|
||||
)
|
||||
assert isinstance(entry.policy_hash, str) and entry.policy_hash, (
|
||||
"policy_hash must be non-empty string"
|
||||
)
|
||||
|
||||
|
||||
def log_preflight_attempt(
|
||||
timestamp: datetime,
|
||||
measured_p: float,
|
||||
freeze_ok: bool,
|
||||
setup_fingerprint: str,
|
||||
policy_hash: str,
|
||||
) -> None:
|
||||
"""Protokolliert einen Preflight-Versuch als JSON-Eintrag."""
|
||||
|
||||
entry = LogEntry(
|
||||
timestamp=timestamp,
|
||||
measured_p=float(measured_p),
|
||||
freeze_ok=bool(freeze_ok),
|
||||
setup_fingerprint=str(setup_fingerprint),
|
||||
policy_hash=str(policy_hash),
|
||||
)
|
||||
|
||||
_validate_log_entry(entry)
|
||||
|
||||
log_dir = Path("output")
|
||||
log_dir.mkdir(parents=True, exist_ok=True)
|
||||
log_file = log_dir / "preflight_log.json"
|
||||
|
||||
try:
|
||||
if log_file.exists():
|
||||
with log_file.open("r", encoding="utf-8") as f:
|
||||
try:
|
||||
existing_logs = json.load(f)
|
||||
if not isinstance(existing_logs, list):
|
||||
existing_logs = []
|
||||
except json.JSONDecodeError:
|
||||
existing_logs = []
|
||||
else:
|
||||
existing_logs = []
|
||||
|
||||
existing_logs.append(entry.to_dict())
|
||||
|
||||
with log_file.open("w", encoding="utf-8") as f:
|
||||
json.dump(existing_logs, f, indent=2)
|
||||
|
||||
except (OSError, json.JSONDecodeError) as e:
|
||||
raise LogValidationError(f"Failed to log preflight attempt: {e}") from e
|
||||
Loading…
Reference in a new issue