From 73bbd606aa3560721d4449c9a9b139d82281911b Mon Sep 17 00:00:00 2001 From: Mika Date: Fri, 16 Jan 2026 11:18:58 +0000 Subject: [PATCH] Add 1_write_events/src/write_events/core.py --- 1_write_events/src/write_events/core.py | 86 +++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 1_write_events/src/write_events/core.py diff --git a/1_write_events/src/write_events/core.py b/1_write_events/src/write_events/core.py new file mode 100644 index 0000000..383feda --- /dev/null +++ b/1_write_events/src/write_events/core.py @@ -0,0 +1,86 @@ +import json +import os +import datetime +import argparse +from pathlib import Path +from typing import TypedDict, Any + + +class WriteEvent(TypedDict): + timestamp: str + field_tag: str + new_value_hash: str + corr_id: str + cpu: int + ktime_ns: int + + +class WriteEventError(Exception): + """Custom exception for invalid write event parameters.""" + pass + + +def _get_cpu() -> int: + try: + return os.sched_getcpu() + except AttributeError: + return -1 + + +def _get_ktime_ns() -> int: + return int(datetime.datetime.now().timestamp() * 1e9) + + +def _get_event_log_path() -> Path: + base_dir = Path("output") + base_dir.mkdir(parents=True, exist_ok=True) + return base_dir / "write_events.json" + + +def _validate_inputs(field_tag: Any, new_value_hash: Any, corr_id: Any) -> None: + if not all(isinstance(v, str) and v.strip() for v in (field_tag, new_value_hash, corr_id)): + raise WriteEventError("All parameters must be non-empty strings.") + + +def _serialize_event(event: WriteEvent) -> dict: + return dict(event) + + +def record_write_event(field_tag: str, new_value_hash: str, corr_id: str) -> None: + """Erfasst ein Write-Event und schreibt es als JSON in eine Log-Datei.""" + _validate_inputs(field_tag, new_value_hash, corr_id) + + event: WriteEvent = { + "timestamp": datetime.datetime.now().astimezone().isoformat(), + "field_tag": field_tag, + "new_value_hash": new_value_hash, + "corr_id": corr_id, + "cpu": _get_cpu(), + "ktime_ns": _get_ktime_ns(), + } + + log_path = _get_event_log_path() + + existing_data: list[dict] = [] + if log_path.exists() and log_path.stat().st_size > 0: + try: + with log_path.open("r", encoding="utf-8") as f: + existing_data = json.load(f) + if not isinstance(existing_data, list): + existing_data = [] + except (json.JSONDecodeError, OSError): + existing_data = [] + + existing_data.append(_serialize_event(event)) + + with log_path.open("w", encoding="utf-8") as f: + json.dump(existing_data, f, indent=2) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Record a write event.") + parser.add_argument("--field-tag", required=True, help="Field tag to log.") + parser.add_argument("--new-value-hash", required=True, help="Hash of the new value.") + parser.add_argument("--corr-id", required=True, help="Correlation ID.") + args = parser.parse_args() + record_write_event(args.field_tag, args.new_value_hash, args.corr_id) \ No newline at end of file