From 5c1280998378c2068eb60d3527370add183c8d92 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 22 Feb 2026 12:32:34 +0000 Subject: [PATCH] Add artifact.whitelist_expiration/src/artifact_whitelist_expiration/core.py --- .../src/artifact_whitelist_expiration/core.py | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 artifact.whitelist_expiration/src/artifact_whitelist_expiration/core.py diff --git a/artifact.whitelist_expiration/src/artifact_whitelist_expiration/core.py b/artifact.whitelist_expiration/src/artifact_whitelist_expiration/core.py new file mode 100644 index 0000000..fe97dc6 --- /dev/null +++ b/artifact.whitelist_expiration/src/artifact_whitelist_expiration/core.py @@ -0,0 +1,107 @@ +import json +import os +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Dict + + +class WhitelistEntry: + """Datenstruktur für Whitelist-Einträge mit Ablaufprüfung und Verlängerung.""" + + def __init__(self, entry_id: str, expires_at: str, reason_code: str) -> None: + if not isinstance(entry_id, str) or not entry_id: + raise ValueError("entry_id muss ein nicht-leerer String sein") + if not isinstance(reason_code, str) or not reason_code: + raise ValueError("reason_code muss ein nicht-leerer String sein") + + try: + self.expires_at = datetime.fromisoformat(expires_at) + except Exception as e: + raise ValueError(f"Ungültiges Datumsformat für expires_at: {e}") from e + + self.entry_id = entry_id + self.reason_code = reason_code + + def is_expired(self) -> bool: + """Prüft, ob das Ablaufdatum überschritten ist.""" + return datetime.utcnow() > self.expires_at + + def renew(self, days: int) -> None: + """Verlängert das Ablaufdatum um die angegebene Anzahl Tage.""" + if not isinstance(days, int) or days <= 0: + raise ValueError("days muss eine positive Ganzzahl sein") + self.expires_at = datetime.utcnow() + timedelta(days=days) + + def to_dict(self) -> Dict[str, Any]: + return { + "entry_id": self.entry_id, + "expires_at": self.expires_at.isoformat(), + "reason_code": self.reason_code, + } + + +# Hilfsfunktionen +_DEF_INPUT_PATH = Path("data/unknown_whitelist.json") +_DEF_OUTPUT_PATH = Path("output/unknown_whitelist_updated.json") + + +def _load_whitelist(path: Path) -> Dict[str, WhitelistEntry]: + if not path.exists(): + raise FileNotFoundError(f"Whitelist-Datei nicht gefunden: {path}") + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + entries: Dict[str, WhitelistEntry] = {} + for entry in data: + e = WhitelistEntry( + entry_id=entry.get("entry_id"), + expires_at=entry.get("expires_at"), + reason_code=entry.get("reason_code"), + ) + entries[e.entry_id] = e + return entries + + +def _save_whitelist(entries: Dict[str, WhitelistEntry], path: Path) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + json.dump([e.to_dict() for e in entries.values()], f, indent=2, ensure_ascii=False) + + +def set_expiration(entry_id: str, days: int, input_path: Path | None = None, output_path: Path | None = None) -> None: + """Setzt das Ablaufdatum eines Eintrags neu.""" + if not isinstance(entry_id, str) or not entry_id: + raise ValueError("entry_id darf nicht leer sein") + if not isinstance(days, int) or days <= 0: + raise ValueError("days muss > 0 sein") + + input_path = input_path or _DEF_INPUT_PATH + output_path = output_path or _DEF_OUTPUT_PATH + + entries = _load_whitelist(input_path) + if entry_id not in entries: + raise KeyError(f"Eintrag {entry_id} nicht gefunden") + + entry = entries[entry_id] + entry.expires_at = datetime.utcnow() + timedelta(days=days) + _save_whitelist(entries, output_path) + + +def renew_entry(entry_id: str, input_path: Path | None = None, output_path: Path | None = None) -> bool: + """Verlängert den Ablauf eines vorhandenen Whitelist-Eintrags, wenn er vorhanden ist.""" + if not isinstance(entry_id, str) or not entry_id: + raise ValueError("entry_id darf nicht leer sein") + + input_path = input_path or _DEF_INPUT_PATH + output_path = output_path or _DEF_OUTPUT_PATH + + entries = _load_whitelist(input_path) + if entry_id not in entries: + return False + + entry = entries[entry_id] + if not entry.is_expired(): + entry.renew(7) # Standardverlängerung um 7 Tage + _save_whitelist(entries, output_path) + return True + + return False