Add data_collection/src/data_collection/core.py

This commit is contained in:
Mika 2026-03-30 16:33:36 +00:00
parent 0bae3cda3b
commit 6bff3c3f00

View file

@@ -0,0 +1,96 @@
import argparse
import json
import os
import statistics
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict
class InputValidationError(Exception):
    """Raised when a supplied parameter or a collected data field is invalid."""
class RunDataValidator:
    """Handles validation of collected run data fields."""

    # Required keys and their nominal types. Fields declared ``float`` also
    # accept ``int`` (an integral metric is still a valid measurement);
    # ``bool`` is rejected for every field even though it subclasses ``int``.
    _REQUIRED_FIELDS: Dict[str, type] = {
        'epoch_ms': int,
        'monotonic_ns': int,
        'tz_offset_minutes': int,
        'retry_tail_p99': float,
        'band_width': float,
    }

    @staticmethod
    def validate(data: Dict[str, Any]) -> None:
        """Validate a collected run-data mapping.

        Raises:
            InputValidationError: if a required field is missing, has the
                wrong type, or falls outside its allowed range.
        """
        for key, expected_type in RunDataValidator._REQUIRED_FIELDS.items():
            if key not in data:
                raise InputValidationError(f"Missing field '{key}' in collected data.")
            value = data[key]
            # bool subclasses int, so True/False would otherwise pass the
            # isinstance check for the int-typed fields below.
            if isinstance(value, bool):
                raise InputValidationError(
                    f"Invalid type for field '{key}': expected {expected_type.__name__}, got bool."
                )
            accepted = (int, float) if expected_type is float else expected_type
            if not isinstance(value, accepted):
                raise InputValidationError(
                    f"Invalid type for field '{key}': expected {expected_type.__name__}, got {type(value).__name__}."
                )
        # Real-world UTC offsets span UTC-12:00 (-720 min) to UTC+14:00 (+840 min).
        if not (-720 <= data['tz_offset_minutes'] <= 840):
            raise InputValidationError("tz_offset_minutes must be within -720 to 840 (UTC-12 to UTC+14).")
        if data['retry_tail_p99'] < 0 or data['band_width'] <= 0:
            raise InputValidationError("retry_tail_p99 must be non-negative and band_width must be positive.")
def collect_run_data(run_id: int, worker_count: int) -> Dict[str, Any]:
    """Collect performance and timing data for a given system run.

    Args:
        run_id: Non-negative identifier of the run.
        worker_count: Number of workers for this run; must be positive.

    Returns:
        Dict with timing fields (``epoch_ms``, ``monotonic_ns``,
        ``tz_offset_minutes``) and pseudo metrics (``retry_tail_p99``,
        ``band_width``), validated before being returned.

    Raises:
        InputValidationError: if an argument or a collected value is invalid.
    """
    # bool subclasses int, so exclude it explicitly from both guards.
    if not isinstance(run_id, int) or isinstance(run_id, bool) or run_id < 0:
        raise InputValidationError("run_id must be a non-negative integer.")
    if not isinstance(worker_count, int) or isinstance(worker_count, bool) or worker_count <= 0:
        raise InputValidationError("worker_count must be a positive integer.")
    now = datetime.now(timezone.utc)
    epoch_ms = int(now.timestamp() * 1000)
    # time.monotonic_ns() is the proper monotonic clock. The previous
    # os.times().elapsed is process-time accounting and reports 0.0 on some
    # platforms (e.g. Windows), which made this field meaningless there.
    monotonic_ns = time.monotonic_ns()
    tz_offset_minutes = int((now.utcoffset().total_seconds() if now.utcoffset() else 0) / 60)
    # Generate pseudo metrics for illustrative data collection.
    retry_tail_p99 = statistics.fmean([worker_count * 0.5, run_id * 0.1])
    band_width = max(0.1, 100.0 / worker_count)
    run_data = {
        'epoch_ms': epoch_ms,
        'monotonic_ns': monotonic_ns,
        'tz_offset_minutes': tz_offset_minutes,
        'retry_tail_p99': retry_tail_p99,
        'band_width': band_width,
    }
    RunDataValidator.validate(run_data)
    return run_data
def _write_output(data: Dict[str, Any], output_path: Path) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open('w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def main() -> None:
    """Command-line entry point: parse arguments, collect run data, write JSON."""
    parser = argparse.ArgumentParser(description='Collect run performance data.')
    argument_specs = [
        ('--run_id', int, 'ID des aktuellen Runs.'),
        ('--worker_count', int, 'Anzahl der Worker für diesen Run.'),
        ('--output', str, 'Pfad zur Ausgabe-JSON-Datei.'),
    ]
    for flag, arg_type, help_text in argument_specs:
        parser.add_argument(flag, type=arg_type, required=True, help=help_text)
    args = parser.parse_args()
    try:
        run_data = collect_run_data(args.run_id, args.worker_count)
        _write_output(run_data, Path(args.output))
        print(f"Data successfully written to {args.output}")
    except InputValidationError as exc:
        print(f"Input validation error: {exc}", file=sys.stderr)
        sys.exit(1)
    except Exception as exc:  # defensive top-level guard for the CLI
        print(f"Unexpected error: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()