Add artifact.whitelist_expiration/src/artifact_whitelist_expiration/core.py

This commit is contained in:
Mika 2026-02-22 12:32:34 +00:00
parent c1de678cc2
commit 5c12809983

View file

@ -0,0 +1,107 @@
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict
class WhitelistEntry:
"""Datenstruktur für Whitelist-Einträge mit Ablaufprüfung und Verlängerung."""
def __init__(self, entry_id: str, expires_at: str, reason_code: str) -> None:
if not isinstance(entry_id, str) or not entry_id:
raise ValueError("entry_id muss ein nicht-leerer String sein")
if not isinstance(reason_code, str) or not reason_code:
raise ValueError("reason_code muss ein nicht-leerer String sein")
try:
self.expires_at = datetime.fromisoformat(expires_at)
except Exception as e:
raise ValueError(f"Ungültiges Datumsformat für expires_at: {e}") from e
self.entry_id = entry_id
self.reason_code = reason_code
def is_expired(self) -> bool:
"""Prüft, ob das Ablaufdatum überschritten ist."""
return datetime.utcnow() > self.expires_at
def renew(self, days: int) -> None:
"""Verlängert das Ablaufdatum um die angegebene Anzahl Tage."""
if not isinstance(days, int) or days <= 0:
raise ValueError("days muss eine positive Ganzzahl sein")
self.expires_at = datetime.utcnow() + timedelta(days=days)
def to_dict(self) -> Dict[str, Any]:
return {
"entry_id": self.entry_id,
"expires_at": self.expires_at.isoformat(),
"reason_code": self.reason_code,
}
# Hilfsfunktionen
_DEF_INPUT_PATH = Path("data/unknown_whitelist.json")
_DEF_OUTPUT_PATH = Path("output/unknown_whitelist_updated.json")
def _load_whitelist(path: Path) -> Dict[str, WhitelistEntry]:
if not path.exists():
raise FileNotFoundError(f"Whitelist-Datei nicht gefunden: {path}")
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
entries: Dict[str, WhitelistEntry] = {}
for entry in data:
e = WhitelistEntry(
entry_id=entry.get("entry_id"),
expires_at=entry.get("expires_at"),
reason_code=entry.get("reason_code"),
)
entries[e.entry_id] = e
return entries
def _save_whitelist(entries: Dict[str, WhitelistEntry], path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump([e.to_dict() for e in entries.values()], f, indent=2, ensure_ascii=False)
def set_expiration(entry_id: str, days: int, input_path: Path | None = None, output_path: Path | None = None) -> None:
"""Setzt das Ablaufdatum eines Eintrags neu."""
if not isinstance(entry_id, str) or not entry_id:
raise ValueError("entry_id darf nicht leer sein")
if not isinstance(days, int) or days <= 0:
raise ValueError("days muss > 0 sein")
input_path = input_path or _DEF_INPUT_PATH
output_path = output_path or _DEF_OUTPUT_PATH
entries = _load_whitelist(input_path)
if entry_id not in entries:
raise KeyError(f"Eintrag {entry_id} nicht gefunden")
entry = entries[entry_id]
entry.expires_at = datetime.utcnow() + timedelta(days=days)
_save_whitelist(entries, output_path)
def renew_entry(entry_id: str, input_path: Path | None = None, output_path: Path | None = None) -> bool:
"""Verlängert den Ablauf eines vorhandenen Whitelist-Eintrags, wenn er vorhanden ist."""
if not isinstance(entry_id, str) or not entry_id:
raise ValueError("entry_id darf nicht leer sein")
input_path = input_path or _DEF_INPUT_PATH
output_path = output_path or _DEF_OUTPUT_PATH
entries = _load_whitelist(input_path)
if entry_id not in entries:
return False
entry = entries[entry_id]
if not entry.is_expired():
entry.renew(7) # Standardverlängerung um 7 Tage
_save_whitelist(entries, output_path)
return True
return False