Add artifact.data_logger/src/artifact_data_logger/core.py

This commit is contained in:
Mika 2026-04-03 10:57:05 +00:00
parent 4d8747f443
commit ac5ede1aef

View file

@ -0,0 +1,87 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict
from rich.console import Console
class MetricsValidationError(Exception):
"""Raised when the provided metrics data is invalid."""
@dataclass
class Metrics:
setup_fingerprint: str
policy_hash: str
epoch_ms: int
near_expiry_unpinned: float
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Metrics:
required_fields = {
'setup_fingerprint': str,
'policy_hash': str,
'epoch_ms': int,
'near_expiry_unpinned': (float, int),
}
for field_name, expected_type in required_fields.items():
if field_name not in data:
raise MetricsValidationError(f"Missing required field: {field_name}")
if not isinstance(data[field_name], expected_type):
raise MetricsValidationError(
f"Invalid type for {field_name}: expected {expected_type}, got {type(data[field_name])}"
)
return cls(
setup_fingerprint=str(data['setup_fingerprint']),
policy_hash=str(data['policy_hash']),
epoch_ms=int(data['epoch_ms']),
near_expiry_unpinned=float(data['near_expiry_unpinned']),
)
def to_log_record(self, run_id: str) -> Dict[str, Any]:
ts = datetime.utcfromtimestamp(self.epoch_ms / 1000.0).isoformat() + 'Z'
return {
'run_id': run_id,
'timestamp': ts,
'setup_fingerprint': self.setup_fingerprint,
'policy_hash': self.policy_hash,
'near_expiry_unpinned': self.near_expiry_unpinned,
}
def log_metrics(run_id: str, metrics: Dict[str, Any]) -> None:
"""Logs experiment metrics to stdout in structured format.
Args:
run_id: Unique identifier of the experiment run.
metrics: Dictionary containing setup_fingerprint, policy_hash, epoch_ms, and near_expiry_unpinned.
"""
assert isinstance(run_id, str) and run_id.strip(), "run_id must be a non-empty string"
assert isinstance(metrics, dict), "metrics must be a dict"
console = Console()
try:
metrics_obj = Metrics.from_dict(metrics)
except MetricsValidationError as err:
console.print(f"[bold red]Metrics validation failed:[/bold red] {err}")
raise
record = metrics_obj.to_log_record(run_id)
timestamp = record['timestamp']
message = (
f"[{timestamp}] Run {run_id}: "
f"setup_fingerprint={record['setup_fingerprint']} | "
f"policy_hash={record['policy_hash']} | "
f"near_expiry_unpinned={record['near_expiry_unpinned']:.4f}"
)
console.print(f"[green]Log:[/green] {message}")
# Optional: write to a local log file for CI artifact consistency
log_path = Path(f"metrics_log_{run_id}.txt")
with log_path.open('a', encoding='utf-8') as f:
f.write(message + '\n')
# ensure log content integrity in CI environment
assert log_path.exists(), "Log file should be created successfully."