Add input_validation_script/src/input_validation_script/core.py
This commit is contained in:
commit
65be4c9edc
1 changed files with 74 additions and 0 deletions
74
input_validation_script/src/input_validation_script/core.py
Normal file
74
input_validation_script/src/input_validation_script/core.py
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Union
|
||||
from jsonschema import validate, ValidationError
|
||||
|
||||
|
||||
class ValidationException(Exception):
|
||||
"""Basis-Ausnahme für Validierungsfehler."""
|
||||
|
||||
|
||||
class ArtifactMissingError(ValidationException):
|
||||
"""Wenn eine erwartete Datei fehlt."""
|
||||
|
||||
|
||||
class ParseError(ValidationException):
|
||||
"""Wenn JSON nicht korrekt geparst werden kann."""
|
||||
|
||||
|
||||
class ContractViolationError(ValidationException):
|
||||
"""Wenn das JSON nicht dem Schema entspricht."""
|
||||
|
||||
|
||||
def _load_json(source: Union[str, Path, Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""Hilfsfunktion, um JSON-Daten aus Pfad oder Dict zu laden."""
|
||||
if isinstance(source, dict):
|
||||
return source
|
||||
if not isinstance(source, (str, Path)):
|
||||
raise ParseError(f"Ungültiger JSON-Quelletyp: {type(source)}")
|
||||
|
||||
path = Path(source)
|
||||
if not path.exists():
|
||||
raise ArtifactMissingError(f"Datei nicht gefunden: {path}")
|
||||
try:
|
||||
with path.open('r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
except json.JSONDecodeError as e:
|
||||
raise ParseError(f"Fehler beim Lesen des JSON: {e}") from e
|
||||
|
||||
|
||||
def validate_json(input_json: Union[Dict[str, Any], str], schema: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Validiert JSON gegen ein Schema.
|
||||
|
||||
Args:
|
||||
input_json: JSON-Daten oder Pfad zur JSON-Datei.
|
||||
schema: Das JSON-Schema.
|
||||
|
||||
Returns:
|
||||
Dict mit Schlüsseln: valid (bool), errors (list[str]), optional unknown_reason (str)
|
||||
"""
|
||||
result: Dict[str, Any] = {"valid": False, "errors": []}
|
||||
|
||||
try:
|
||||
data = _load_json(input_json)
|
||||
except ArtifactMissingError as e:
|
||||
result["errors"].append(str(e))
|
||||
result["unknown_reason"] = "artefact_missing"
|
||||
return result
|
||||
except ParseError as e:
|
||||
result["errors"].append(str(e))
|
||||
result["unknown_reason"] = "parse_error"
|
||||
return result
|
||||
|
||||
# Validationsprüfung
|
||||
try:
|
||||
validate(instance=data, schema=schema)
|
||||
except ValidationError as e:
|
||||
result["errors"].append(str(e))
|
||||
result["unknown_reason"] = "contract_violation"
|
||||
return result
|
||||
|
||||
# Erfolgreich validiert
|
||||
result["valid"] = True
|
||||
return result
|
||||
Loading…
Reference in a new issue