diff --git a/3.snapshot_comparator/src/snapshot_comparator/core.py b/3.snapshot_comparator/src/snapshot_comparator/core.py new file mode 100644 index 0000000..4a79e33 --- /dev/null +++ b/3.snapshot_comparator/src/snapshot_comparator/core.py @@ -0,0 +1,83 @@ +from dataclasses import dataclass +from typing import List, Dict, Any +import logging + +__all__ = ["Snapshot", "compare_snapshots"] + + +@dataclass +class Snapshot: + artifact_key: str + status_before: str + status_after: str + + +class SnapshotValidationError(Exception): + """Custom exception raised when snapshot validation fails.""" + + +logger = logging.getLogger(__name__) +if not logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) +logger.setLevel(logging.INFO) + + +def _validate_snapshot(snapshot: Any) -> Snapshot: + """Validates and converts a raw snapshot dict or object into a Snapshot instance.""" + if isinstance(snapshot, Snapshot): + return snapshot + if not isinstance(snapshot, dict): + raise SnapshotValidationError(f"Snapshot must be dict or Snapshot, got {type(snapshot)}") + + required_fields = ["artifact_key", "status_before", "status_after"] + for field in required_fields: + if field not in snapshot: + raise SnapshotValidationError(f"Missing required field '{field}' in snapshot") + if not isinstance(snapshot[field], str): + raise SnapshotValidationError(f"Field '{field}' must be of type str") + return Snapshot( + artifact_key=snapshot["artifact_key"], + status_before=snapshot["status_before"], + status_after=snapshot["status_after"] + ) + + +def compare_snapshots(snapshot1: List[Any], snapshot2: List[Any]) -> Dict[str, Dict[str, str]]: + """Vergleicht zwei Snapshot-Listen und liefert eine Differenz der Artefaktzustände. + + Args: + snapshot1: Liste von Snapshot-Objekten oder -Dicts vor Δt. + snapshot2: Liste von Snapshot-Objekten oder -Dicts nach Δt. + + Returns: + Dict[str, Dict[str, str]]: Differenzen nach artifact_key, + z.B. {"artifactA": {"before": "missing", "after": "present"}}. + """ + assert isinstance(snapshot1, list), "snapshot1 must be a list" + assert isinstance(snapshot2, list), "snapshot2 must be a list" + + logger.info("Validating and indexing snapshots ...") + snap1_valid = [_validate_snapshot(s) for s in snapshot1] + snap2_valid = [_validate_snapshot(s) for s in snapshot2] + + index1 = {s.artifact_key: s for s in snap1_valid} + index2 = {s.artifact_key: s for s in snap2_valid} + + all_keys = set(index1.keys()) | set(index2.keys()) + diff: Dict[str, Dict[str, str]] = {} + + for key in all_keys: + before = index1.get(key) + after = index2.get(key) + + status_before = before.status_after if before else "missing" + status_after = after.status_after if after else "missing" + + if status_before != status_after: + diff[key] = {"before": status_before, "after": status_after} + + logger.info(f"Found {len(diff)} differences between snapshots.") + return diff