From 65be4c9edc8c0d8914715b189e3f34a1d348843e Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 8 Feb 2026 16:05:52 +0000 Subject: [PATCH] Add input_validation_script/src/input_validation_script/core.py --- .../src/input_validation_script/core.py | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 input_validation_script/src/input_validation_script/core.py diff --git a/input_validation_script/src/input_validation_script/core.py b/input_validation_script/src/input_validation_script/core.py new file mode 100644 index 0000000..48826ce --- /dev/null +++ b/input_validation_script/src/input_validation_script/core.py @@ -0,0 +1,74 @@ +import json +import os +from pathlib import Path +from typing import Any, Dict, Union +from jsonschema import validate, ValidationError + + +class ValidationException(Exception): + """Basis-Ausnahme für Validierungsfehler.""" + + +class ArtifactMissingError(ValidationException): + """Wenn eine erwartete Datei fehlt.""" + + +class ParseError(ValidationException): + """Wenn JSON nicht korrekt geparst werden kann.""" + + +class ContractViolationError(ValidationException): + """Wenn das JSON nicht dem Schema entspricht.""" + + +def _load_json(source: Union[str, Path, Dict[str, Any]]) -> Dict[str, Any]: + """Hilfsfunktion, um JSON-Daten aus Pfad oder Dict zu laden.""" + if isinstance(source, dict): + return source + if not isinstance(source, (str, Path)): + raise ParseError(f"Ungültiger JSON-Quelletyp: {type(source)}") + + path = Path(source) + if not path.exists(): + raise ArtifactMissingError(f"Datei nicht gefunden: {path}") + try: + with path.open('r', encoding='utf-8') as f: + return json.load(f) + except json.JSONDecodeError as e: + raise ParseError(f"Fehler beim Lesen des JSON: {e}") from e + + +def validate_json(input_json: Union[Dict[str, Any], str], schema: Dict[str, Any]) -> Dict[str, Any]: + """Validiert JSON gegen ein Schema. + + Args: + input_json: JSON-Daten oder Pfad zur JSON-Datei. + schema: Das JSON-Schema. + + Returns: + Dict mit Schlüsseln: valid (bool), errors (list[str]), optional unknown_reason (str) + """ + result: Dict[str, Any] = {"valid": False, "errors": []} + + try: + data = _load_json(input_json) + except ArtifactMissingError as e: + result["errors"].append(str(e)) + result["unknown_reason"] = "artefact_missing" + return result + except ParseError as e: + result["errors"].append(str(e)) + result["unknown_reason"] = "parse_error" + return result + + # Validationsprüfung + try: + validate(instance=data, schema=schema) + except ValidationError as e: + result["errors"].append(str(e)) + result["unknown_reason"] = "contract_violation" + return result + + # Erfolgreich validiert + result["valid"] = True + return result