diff --git a/artifact.data_logger/src/artifact_data_logger/core.py b/artifact.data_logger/src/artifact_data_logger/core.py new file mode 100644 index 0000000..4171d21 --- /dev/null +++ b/artifact.data_logger/src/artifact_data_logger/core.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, Dict +from rich.console import Console + + +class MetricsValidationError(Exception): + """Raised when the provided metrics data is invalid.""" + + +@dataclass +class Metrics: + setup_fingerprint: str + policy_hash: str + epoch_ms: int + near_expiry_unpinned: float + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> Metrics: + required_fields = { + 'setup_fingerprint': str, + 'policy_hash': str, + 'epoch_ms': int, + 'near_expiry_unpinned': (float, int), + } + for field_name, expected_type in required_fields.items(): + if field_name not in data: + raise MetricsValidationError(f"Missing required field: {field_name}") + if not isinstance(data[field_name], expected_type): + raise MetricsValidationError( + f"Invalid type for {field_name}: expected {expected_type}, got {type(data[field_name])}" + ) + return cls( + setup_fingerprint=str(data['setup_fingerprint']), + policy_hash=str(data['policy_hash']), + epoch_ms=int(data['epoch_ms']), + near_expiry_unpinned=float(data['near_expiry_unpinned']), + ) + + def to_log_record(self, run_id: str) -> Dict[str, Any]: + ts = datetime.utcfromtimestamp(self.epoch_ms / 1000.0).isoformat() + 'Z' + return { + 'run_id': run_id, + 'timestamp': ts, + 'setup_fingerprint': self.setup_fingerprint, + 'policy_hash': self.policy_hash, + 'near_expiry_unpinned': self.near_expiry_unpinned, + } + +def log_metrics(run_id: str, metrics: Dict[str, Any]) -> None: + """Logs experiment metrics to stdout in structured format. + + Args: + run_id: Unique identifier of the experiment run. + metrics: Dictionary containing setup_fingerprint, policy_hash, epoch_ms, and near_expiry_unpinned. + """ + assert isinstance(run_id, str) and run_id.strip(), "run_id must be a non-empty string" + assert isinstance(metrics, dict), "metrics must be a dict" + + console = Console() + try: + metrics_obj = Metrics.from_dict(metrics) + except MetricsValidationError as err: + console.print(f"[bold red]Metrics validation failed:[/bold red] {err}") + raise + + record = metrics_obj.to_log_record(run_id) + timestamp = record['timestamp'] + message = ( + f"[{timestamp}] Run {run_id}: " + f"setup_fingerprint={record['setup_fingerprint']} | " + f"policy_hash={record['policy_hash']} | " + f"near_expiry_unpinned={record['near_expiry_unpinned']:.4f}" + ) + console.print(f"[green]Log:[/green] {message}") + + # Optional: write to a local log file for CI artifact consistency + log_path = Path(f"metrics_log_{run_id}.txt") + with log_path.open('a', encoding='utf-8') as f: + f.write(message + '\n') + + # ensure log content integrity in CI environment + assert log_path.exists(), "Log file should be created successfully."