"""Collection and aggregation of frozen CI run data for drift analysis."""

import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests


logger = logging.getLogger(__name__)
# NOTE(review): setting a level on a module logger overrides application-level
# logging configuration; kept as-is for compatibility with existing behavior.
logger.setLevel(logging.INFO)


class DataCollectionError(Exception):
    """Custom exception for errors during data collection."""


class FrozenRun:
    """Represents frozen CI run data for drift analysis.

    ``metrics`` is stored as a JSON-encoded string so that ``to_dict()``
    output is directly JSON-serializable.
    """

    def __init__(
        self,
        run_id: str,
        status: str,
        timestamp: datetime,
        pinned_state: bool,
        metrics: str,
    ) -> None:
        self.run_id = run_id              # CI run identifier
        self.status = status              # e.g. "SUCCESS", "UNKNOWN", "MISSING"
        self.timestamp = timestamp        # run timestamp (UTC)
        self.pinned_state = pinned_state  # whether the run used pinned dependencies
        self.metrics = metrics            # JSON-encoded metrics payload

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dictionary view of this run."""
        return {
            "run_id": self.run_id,
            "status": self.status,
            "timestamp": self.timestamp.isoformat(),
            "pinned_state": self.pinned_state,
            "metrics": self.metrics,
        }


def _validate_run_ids(run_ids: List[str]) -> None:
    """Raise ``ValueError`` unless *run_ids* is a list of non-empty strings."""
    if not isinstance(run_ids, list):
        raise ValueError("run_ids must be a list of strings.")
    for rid in run_ids:
        if not isinstance(rid, str) or not rid.strip():
            raise ValueError(f"Invalid run_id: {rid!r}")


def _fetch_run_data(run_id: str) -> Optional[FrozenRun]:
    """Fetch run data; fallback to simulated local info if retrieval fails.

    Any network error, non-200 response, or malformed payload is logged and
    reported as ``None`` so the caller can substitute fallback data.

    Args:
        run_id: Identifier of the CI run to fetch.

    Returns:
        A ``FrozenRun`` on success, or ``None`` on any failure.
    """
    try:
        # Placeholder endpoint, could be replaced with local file lookup or API
        response = requests.get(f"https://ci.example.com/api/runs/{run_id}", timeout=5)
        if response.status_code != 200:
            logger.warning("Run %s returned HTTP %s", run_id, response.status_code)
            return None
        data = response.json()
        ts_str = data.get("timestamp")
        try:
            # Fall back to "now" in UTC when the timestamp is absent or unparseable.
            # datetime.now(timezone.utc) replaces the deprecated datetime.utcnow()
            # and yields a timezone-aware value.
            timestamp = (
                datetime.fromisoformat(ts_str) if ts_str else datetime.now(timezone.utc)
            )
        except ValueError:
            timestamp = datetime.now(timezone.utc)
        return FrozenRun(
            run_id=data.get("run_id", run_id),
            status=data.get("status", "UNKNOWN"),
            timestamp=timestamp,
            pinned_state=bool(data.get("pinned_state", False)),
            metrics=json.dumps(data.get("metrics", {})),
        )
    except requests.RequestException as e:
        logger.error("Network error collecting run %s: %s", run_id, e)
        return None
    except Exception as e:
        # Boundary catch-all: a single bad run must not abort the whole collection.
        logger.exception("Unexpected error fetching run %s: %s", run_id, e)
        return None


def collect_frozen_runs(run_ids: List[str]) -> List[Dict[str, Any]]:
    """Collect and aggregate frozen run data for given run IDs.

    Args:
        run_ids: List of run IDs to collect.

    Returns:
        List of dictionaries containing frozen run data. Runs that cannot be
        fetched are represented by a fallback entry with status "MISSING".

    Raises:
        ValueError: If *run_ids* is not a list of non-empty strings.
        DataCollectionError: If the aggregated result fails the integrity check.
    """
    _validate_run_ids(run_ids)

    collected: List[Dict[str, Any]] = []
    for rid in run_ids:
        run = _fetch_run_data(rid)
        if run is not None:
            collected.append(run.to_dict())
        else:
            # Build default entry if missing
            logger.info("Using fallback data for run %s", rid)
            collected.append(
                FrozenRun(
                    run_id=rid,
                    status="MISSING",
                    timestamp=datetime.now(timezone.utc),
                    pinned_state=False,
                    metrics="{}",
                ).to_dict()
            )

    # Explicit check instead of `assert`, which is silently stripped under
    # `python -O`; failing the invariant now raises the module's own error type.
    if not all("run_id" in r for r in collected):
        raise DataCollectionError("Integrity check failed: missing run_id")
    return collected