"""Collect performance/timing data for a system run and write it as JSON.

CLI usage:
    python core.py --run_id N --worker_count M --output path/to/out.json
"""

import argparse
import json
import statistics
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict


class InputValidationError(Exception):
    """Custom exception for invalid input parameters."""


class RunDataValidator:
    """Handles validation of collected run data fields."""

    @staticmethod
    def validate(data: Dict[str, Any]) -> None:
        """Check that *data* has all required fields with valid types and ranges.

        Raises:
            InputValidationError: if a field is missing, has the wrong type,
                or holds an out-of-range value.
        """
        required_fields = {
            'epoch_ms': int,
            'monotonic_ns': int,
            'tz_offset_minutes': int,
            'retry_tail_p99': float,
            'band_width': float,
        }
        for key, expected_type in required_fields.items():
            if key not in data:
                raise InputValidationError(f"Missing field '{key}' in collected data.")
            # bool is a subclass of int, so a bare isinstance check would let
            # True/False slip through as integer fields — reject it explicitly.
            if isinstance(data[key], bool) or not isinstance(data[key], expected_type):
                raise InputValidationError(
                    f"Invalid type for field '{key}': expected {expected_type.__name__}, got {type(data[key]).__name__}."
                )
        # Real-world UTC offsets span UTC-12:00 .. UTC+14:00.
        if not (-720 <= data['tz_offset_minutes'] <= 840):
            raise InputValidationError("tz_offset_minutes must be within -720 to 840 (UTC-12 to UTC+14).")
        if data['retry_tail_p99'] < 0 or data['band_width'] <= 0:
            raise InputValidationError("retry_tail_p99 must be non-negative and band_width must be positive.")


def collect_run_data(run_id: int, worker_count: int) -> Dict[str, Any]:
    """Collect performance and timing data for a given system run.

    Args:
        run_id: Non-negative identifier of the run.
        worker_count: Positive number of workers used in the run.

    Returns:
        A dict with keys 'epoch_ms', 'monotonic_ns', 'tz_offset_minutes',
        'retry_tail_p99', and 'band_width', already validated.

    Raises:
        InputValidationError: if an argument is invalid or the collected
            data fails validation.
    """
    # Reject bool explicitly: isinstance(True, int) is True in Python.
    if isinstance(run_id, bool) or not isinstance(run_id, int) or run_id < 0:
        raise InputValidationError("run_id must be a non-negative integer.")
    if isinstance(worker_count, bool) or not isinstance(worker_count, int) or worker_count <= 0:
        raise InputValidationError("worker_count must be a positive integer.")

    now = datetime.now(timezone.utc)
    epoch_ms = int(now.timestamp() * 1000)
    # time.monotonic_ns() is an exact integer monotonic clock; the previous
    # os.times().elapsed * 1e9 round-trip went through a float and lost precision.
    monotonic_ns = time.monotonic_ns()
    # Always 0 here since the timestamp is captured in UTC; kept as a field
    # so downstream consumers have a uniform schema.
    tz_offset_minutes = int((now.utcoffset().total_seconds() if now.utcoffset() else 0) / 60)

    # Generate pseudo metrics for illustrative data collection.
    retry_tail_p99 = statistics.fmean([worker_count * 0.5, run_id * 0.1])
    band_width = max(0.1, 100.0 / worker_count)

    run_data = {
        'epoch_ms': epoch_ms,
        'monotonic_ns': monotonic_ns,
        'tz_offset_minutes': tz_offset_minutes,
        'retry_tail_p99': retry_tail_p99,
        'band_width': band_width,
    }

    RunDataValidator.validate(run_data)

    return run_data


def _write_output(data: Dict[str, Any], output_path: Path) -> None:
    """Write *data* as pretty-printed UTF-8 JSON, creating parent dirs as needed."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open('w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


def main() -> None:
    """CLI entry point: parse arguments, collect run data, write the JSON file."""
    parser = argparse.ArgumentParser(description='Collect run performance data.')
    parser.add_argument('--run_id', type=int, required=True, help='ID des aktuellen Runs.')
    parser.add_argument('--worker_count', type=int, required=True, help='Anzahl der Worker für diesen Run.')
    parser.add_argument('--output', type=str, required=True, help='Pfad zur Ausgabe-JSON-Datei.')

    args = parser.parse_args()

    try:
        data = collect_run_data(args.run_id, args.worker_count)
        _write_output(data, Path(args.output))
        print(f"Data successfully written to {args.output}")
    except InputValidationError as e:
        print(f"Input validation error: {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report anything unexpected and exit non-zero.
        print(f"Unexpected error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()