Add 1_write_events/src/write_events/core.py

This commit is contained in:
Mika 2026-01-16 11:18:58 +00:00
commit 73bbd606aa

View file

@ -0,0 +1,86 @@
import json
import os
import datetime
import argparse
from pathlib import Path
from typing import TypedDict, Any
class WriteEvent(TypedDict):
timestamp: str
field_tag: str
new_value_hash: str
corr_id: str
cpu: int
ktime_ns: int
class WriteEventError(Exception):
"""Custom exception for invalid write event parameters."""
pass
def _get_cpu() -> int:
try:
return os.sched_getcpu()
except AttributeError:
return -1
def _get_ktime_ns() -> int:
return int(datetime.datetime.now().timestamp() * 1e9)
def _get_event_log_path() -> Path:
base_dir = Path("output")
base_dir.mkdir(parents=True, exist_ok=True)
return base_dir / "write_events.json"
def _validate_inputs(field_tag: Any, new_value_hash: Any, corr_id: Any) -> None:
if not all(isinstance(v, str) and v.strip() for v in (field_tag, new_value_hash, corr_id)):
raise WriteEventError("All parameters must be non-empty strings.")
def _serialize_event(event: WriteEvent) -> dict:
return dict(event)
def record_write_event(field_tag: str, new_value_hash: str, corr_id: str) -> None:
"""Erfasst ein Write-Event und schreibt es als JSON in eine Log-Datei."""
_validate_inputs(field_tag, new_value_hash, corr_id)
event: WriteEvent = {
"timestamp": datetime.datetime.now().astimezone().isoformat(),
"field_tag": field_tag,
"new_value_hash": new_value_hash,
"corr_id": corr_id,
"cpu": _get_cpu(),
"ktime_ns": _get_ktime_ns(),
}
log_path = _get_event_log_path()
existing_data: list[dict] = []
if log_path.exists() and log_path.stat().st_size > 0:
try:
with log_path.open("r", encoding="utf-8") as f:
existing_data = json.load(f)
if not isinstance(existing_data, list):
existing_data = []
except (json.JSONDecodeError, OSError):
existing_data = []
existing_data.append(_serialize_event(event))
with log_path.open("w", encoding="utf-8") as f:
json.dump(existing_data, f, indent=2)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Record a write event.")
parser.add_argument("--field-tag", required=True, help="Field tag to log.")
parser.add_argument("--new-value-hash", required=True, help="Hash of the new value.")
parser.add_argument("--corr-id", required=True, help="Correlation ID.")
args = parser.parse_args()
record_write_event(args.field_tag, args.new_value_hash, args.corr_id)