import json
from typing import Dict, Any


class InputValidationError(Exception):
    """Raised when input data does not conform to expected structure."""


class SanityResults:
    """Simple result container for sanity check outcomes.

    Holds three non-negative counters:

    * ``missing_write_pairs`` -- entries where exactly one of
      ``write_pre`` / ``write_post`` is present.
    * ``broken_corr_ids`` -- entries whose ``corr_id`` was already seen
      earlier or cannot be used as an identifier.
    * ``empty_fields`` -- required fields that are ``None`` or ``""``.
    """

    def __init__(self, missing_write_pairs: int, broken_corr_ids: int, empty_fields: int) -> None:
        """Store the counters after verifying each one is non-negative.

        Raises:
            AssertionError: If any counter is not ``>= 0``.
        """
        # Raise explicitly instead of using ``assert`` statements: asserts are
        # stripped when Python runs with -O, which would silently disable the
        # validation. AssertionError is kept as the exception type so existing
        # callers that catch it keep working.
        for name, value in (
            ("missing_write_pairs", missing_write_pairs),
            ("broken_corr_ids", broken_corr_ids),
            ("empty_fields", empty_fields),
        ):
            if not value >= 0:
                raise AssertionError(f"{name} must be >= 0")

        self.missing_write_pairs = missing_write_pairs
        self.broken_corr_ids = broken_corr_ids
        self.empty_fields = empty_fields

    def to_dict(self) -> Dict[str, int]:
        """Return the three counters as a plain dictionary."""
        return {
            "missing_write_pairs": self.missing_write_pairs,
            "broken_corr_ids": self.broken_corr_ids,
            "empty_fields": self.empty_fields,
        }


# Fields every entry is expected to carry with a non-empty value.
_DEF_REQUIRED_FIELDS = {"write_pre", "write_post", "corr_id"}


def perform_sanity_check(run_summary: Dict[str, Any]) -> Dict[str, int]:
    """Check a run summary for consistency problems.

    Each dictionary in ``run_summary["entries"]`` is inspected; entries that
    are not dictionaries are skipped without being counted.

    Args:
        run_summary: Aggregated run data, typically loaded from a JSON file.
            Must be a dictionary holding a list under the key ``"entries"``.

    Returns:
        Dictionary with the keys ``"missing_write_pairs"``,
        ``"broken_corr_ids"`` and ``"empty_fields"``:

        * ``missing_write_pairs`` counts entries where exactly one of
          ``write_pre`` / ``write_post`` is not ``None``.
        * ``broken_corr_ids`` counts entries whose ``corr_id`` repeats one
          seen in an earlier entry, plus entries whose ``corr_id`` is
          unhashable (e.g. a list or dict) and therefore unusable as an id.
        * ``empty_fields`` counts required fields whose value is ``None``
          (including absent keys) or the empty string.

    Raises:
        InputValidationError: If ``run_summary`` is not a dictionary or has
            no list under ``"entries"``.
    """
    if not isinstance(run_summary, dict):
        raise InputValidationError("run_summary must be a dictionary.")
    entries = run_summary.get("entries")
    if not isinstance(entries, list):
        raise InputValidationError("run_summary must contain a list under key 'entries'.")

    missing_write_pairs = 0
    broken_corr_ids = 0
    empty_fields = 0

    seen_ids = set()

    for entry in entries:
        if not isinstance(entry, dict):
            # Deliberately tolerant: malformed entries are ignored.
            continue

        # A write is only complete when both halves are present (or both absent).
        has_pre = entry.get("write_pre") is not None
        has_post = entry.get("write_post") is not None
        if has_pre != has_post:
            missing_write_pairs += 1

        # A missing corr_id is reported via empty_fields only, not here.
        corr_id = entry.get("corr_id")
        if corr_id is not None:
            try:
                already_seen = corr_id in seen_ids
            except TypeError:
                # Unhashable value (JSON array/object): it cannot be tracked
                # as an identifier, so report it as broken instead of
                # crashing the whole check.
                broken_corr_ids += 1
            else:
                if already_seen:
                    broken_corr_ids += 1
                else:
                    seen_ids.add(corr_id)

        # Required fields that are absent, None, or the empty string.
        empty_fields += sum(
            1 for field in _DEF_REQUIRED_FIELDS if entry.get(field) in (None, "")
        )

    results = SanityResults(
        missing_write_pairs=missing_write_pairs,
        broken_corr_ids=broken_corr_ids,
        empty_fields=empty_fields,
    )

    return results.to_dict()