Add drift_data_collection/src/drift_data_collection/core.py
This commit is contained in:
parent
e06b7cba46
commit
585d109adc
1 changed files with 105 additions and 0 deletions
105
drift_data_collection/src/drift_data_collection/core.py
Normal file
105
drift_data_collection/src/drift_data_collection/core.py
Normal file
|
|
@ -0,0 +1,105 @@
|
|||
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
||||
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
# NOTE(review): setting a level on a module logger overrides the consuming
# application's logging configuration; libraries usually leave the level
# unset and let the root config decide — confirm this override is intended.
logger.setLevel(logging.INFO)
|
||||
|
||||
|
||||
class DataCollectionError(Exception):
    """Custom exception for errors during data collection.

    Lets callers distinguish collection-specific failures from generic
    exceptions raised elsewhere.
    """
|
||||
|
||||
|
||||
@dataclass
class FrozenRun:
    """Represents frozen CI run data for drift analysis.

    Using ``@dataclass`` keeps the constructor signature identical to the
    previous hand-written ``__init__`` (same field names and order) while
    adding ``__repr__`` and value-based ``__eq__`` for free.
    """

    run_id: str
    status: str
    timestamp: datetime
    pinned_state: bool
    metrics: str  # JSON-encoded metrics payload (see json.dumps at the call site)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dictionary view of this run.

        The timestamp is rendered as an ISO-8601 string so the value
        round-trips via ``datetime.fromisoformat``.
        """
        return {
            "run_id": self.run_id,
            "status": self.status,
            "timestamp": self.timestamp.isoformat(),
            "pinned_state": self.pinned_state,
            "metrics": self.metrics,
        }
|
||||
|
||||
|
||||
def _validate_run_ids(run_ids: List[str]) -> None:
    """Validate that *run_ids* is a list of non-blank strings.

    Raises:
        ValueError: when *run_ids* is not a list, or when any element is
            not a string or is empty/whitespace-only.
    """
    if not isinstance(run_ids, list):
        raise ValueError("run_ids must be a list of strings.")
    for candidate in run_ids:
        is_valid = isinstance(candidate, str) and bool(candidate.strip())
        if not is_valid:
            raise ValueError(f"Invalid run_id: {candidate!r}")
|
||||
|
||||
|
||||
def _fetch_run_data(run_id: str) -> Optional[FrozenRun]:
    """Fetch run data; fallback to simulated local info if retrieval fails.

    Args:
        run_id: Identifier of the CI run to fetch.

    Returns:
        A ``FrozenRun`` built from the API payload, or ``None`` on any
        failure (non-200 status, network error, or malformed payload) so
        the caller can substitute fallback data.
    """
    try:
        # Placeholder endpoint, could be replaced with local file lookup or API
        response = requests.get(f"https://ci.example.com/api/runs/{run_id}", timeout=5)
        if response.status_code != 200:
            logger.warning("Run %s returned HTTP %s", run_id, response.status_code)
            return None
        data = response.json()
        ts_str = data.get("timestamp")
        try:
            # Fall back to current UTC time when the timestamp is absent or
            # unparsable. datetime.utcnow() is deprecated (and naive); use an
            # aware datetime instead. TypeError is included because
            # fromisoformat raises it for non-string payload values, which
            # should hit this fallback rather than the broad handler below.
            timestamp = datetime.fromisoformat(ts_str) if ts_str else datetime.now(timezone.utc)
        except (ValueError, TypeError):
            timestamp = datetime.now(timezone.utc)
        return FrozenRun(
            run_id=data.get("run_id", run_id),
            status=data.get("status", "UNKNOWN"),
            timestamp=timestamp,
            pinned_state=bool(data.get("pinned_state", False)),
            metrics=json.dumps(data.get("metrics", {})),
        )
    except requests.RequestException as e:
        logger.error("Network error collecting run %s: %s", run_id, e)
        return None
    except Exception as e:
        # Broad catch is deliberate: one bad run must not abort the batch.
        logger.exception("Unexpected error fetching run %s: %s", run_id, e)
        return None
|
||||
|
||||
|
||||
def collect_frozen_runs(run_ids: List[str]) -> List[Dict[str, Any]]:
    """Collect and aggregate frozen run data for given run IDs.

    Args:
        run_ids: List of run IDs to collect.

    Returns:
        List of dictionaries containing frozen run data, one entry per
        requested run ID. Runs that cannot be fetched are represented by a
        fallback entry with status ``"MISSING"``.

    Raises:
        ValueError: if *run_ids* is not a list of non-blank strings.
        DataCollectionError: if an aggregated entry is missing its
            ``run_id`` (internal integrity failure).
    """
    _validate_run_ids(run_ids)

    collected: List[Dict[str, Any]] = []
    for rid in run_ids:
        run = _fetch_run_data(rid)
        if run:
            collected.append(run.to_dict())
        else:
            # Build default entry if missing so output aligns 1:1 with input.
            logger.info("Using fallback data for run %s", rid)
            collected.append(
                FrozenRun(
                    run_id=rid,
                    status="MISSING",
                    # Aware UTC timestamp; datetime.utcnow() is deprecated.
                    timestamp=datetime.now(timezone.utc),
                    pinned_state=False,
                    metrics="{}",
                ).to_dict()
            )

    # Explicit integrity check: `assert` statements are stripped under
    # `python -O`, so raise a real exception instead of asserting.
    if not all("run_id" in r for r in collected):
        raise DataCollectionError("Integrity check failed: missing run_id")
    return collected
|
||||
Loading…
Reference in a new issue