Add correlation_id_generation/src/correlation_id_generation/core.py

This commit is contained in:
Mika 2025-12-25 16:37:21 +00:00
commit 47914c65e4

View file

@ -0,0 +1,86 @@
import argparse
import hashlib
import json
import os
import time
from pathlib import Path
def generate_correlation_id(tgid: int, pid: int, run_seq: int) -> str:
"""Erzeugt eine eindeutige hashbasierte Correlation-ID."""
if not isinstance(tgid, int) or not isinstance(pid, int) or not isinstance(run_seq, int):
raise ValueError("tgid, pid und run_seq müssen ganzzahlige Werte sein")
base = f"{tgid}-{pid}-{run_seq}-{time.time()}"
return hashlib.sha256(base.encode()).hexdigest()
def log_task_state(task: str, state_before: str, state_after: str) -> None:
"""Protokolliert Task-Zustandswechsel mit Correlation-ID."""
if not all(isinstance(x, str) for x in [task, state_before, state_after]):
raise ValueError("task, state_before, state_after müssen Strings sein")
timestamp = time.time()
entry = {
"task": task,
"state_before": state_before,
"state_after": state_after,
"timestamp": timestamp,
}
log_path = Path('output') / 'correlation_log.json'
log_path.parent.mkdir(parents=True, exist_ok=True)
if log_path.exists():
try:
with open(log_path, 'r', encoding='utf-8') as f:
existing = json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
existing = []
else:
existing = []
existing.append(entry)
with open(log_path, 'w', encoding='utf-8') as f:
json.dump(existing, f, indent=2)
def _process_input(input_path: str, output_path: str) -> None:
"""Liest Tasks ein, erzeugt Correlation-ID und schreibt Log-Ausgabe."""
input_file = Path(input_path)
if not input_file.exists():
raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_path}")
with open(input_file, 'r', encoding='utf-8') as f:
tasks_data = json.load(f)
results = []
for idx, t in enumerate(tasks_data):
try:
tgid = int(t.get('tgid', 0))
pid = int(t.get('pid', 0))
except (TypeError, ValueError):
continue
corr_id = generate_correlation_id(tgid, pid, idx)
entry = {
"correlation_id": corr_id,
"tgid": tgid,
"pid": pid,
"timestamp": time.time(),
}
results.append(entry)
out_file = Path(output_path)
out_file.parent.mkdir(parents=True, exist_ok=True)
with open(out_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2)
def main():
parser = argparse.ArgumentParser(description="Generate and log correlation IDs for task states.")
parser.add_argument('--input', required=True, help='Pfad zur Eingabedatei (JSON)')
parser.add_argument('--output', required=True, help='Pfad zur Ausgabedatei (JSON)')
args = parser.parse_args()
_process_input(args.input, args.output)
if __name__ == '__main__':
main()