From d9f5da576220fa21b8dc37542267cac0b95e009f Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 8 Mar 2026 03:07:04 +0000 Subject: [PATCH] Add data_logging/src/data_logging/core.py --- data_logging/src/data_logging/core.py | 75 +++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 data_logging/src/data_logging/core.py diff --git a/data_logging/src/data_logging/core.py b/data_logging/src/data_logging/core.py new file mode 100644 index 0000000..0594538 --- /dev/null +++ b/data_logging/src/data_logging/core.py @@ -0,0 +1,75 @@ +from __future__ import annotations +import csv +from dataclasses import dataclass, asdict +from datetime import datetime +from pathlib import Path +from typing import Any, Dict +import os +import logging + + +# Setup basic logging for internal operations +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') + + +class LogFileError(Exception): + """Custom exception raised when logfile writing fails.""" + pass + + +@dataclass +class LogEntry: + """Represents a single sensor data point.""" + time: str + lux: float + dB: float + temperature: float + inference_score: float + + def __post_init__(self) -> None: + if not isinstance(self.time, str): + raise TypeError("time must be a string") + for field_name, value in [("lux", self.lux), ("dB", self.dB), ("temperature", self.temperature), ("inference_score", self.inference_score)]: + if not isinstance(value, (float, int)): + raise TypeError(f"{field_name} must be a number") + # Convert numeric values to float explicitly + self.lux = float(self.lux) + self.dB = float(self.dB) + self.temperature = float(self.temperature) + self.inference_score = float(self.inference_score) + + def to_dict(self) -> Dict[str, Any]: + """Convert instance data to a dictionary for CSV/JSON writing.""" + return asdict(self) + + +def log_data(entry: LogEntry, output_path: str = 'output/sensor_data_log.csv') -> None: + """Appends a LogEntry to the configured log file.""" + if not isinstance(entry, LogEntry): + raise TypeError("entry must be an instance of LogEntry") + if not isinstance(output_path, str): + raise TypeError("output_path must be a string") + + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + + row = entry.to_dict() + file_exists = output_file.exists() + try: + with open(output_file, mode='a', newline='', encoding='utf-8') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=row.keys()) + if not file_exists: + writer.writeheader() + writer.writerow(row) + logging.info(f"Logged data to {output_file}: {row}") + except (OSError, csv.Error) as e: + logging.error(f"Failed to write to log file {output_file}: {e}") + raise LogFileError(f"Error writing to log file {output_file}: {e}") + + +# Self-test safeguard for CI integrity +if __name__ == '__main__': + now = datetime.now().strftime('%H:%M:%S') + test_entry = LogEntry(time=now, lux=120.5, dB=45.2, temperature=23.7, inference_score=0.87) + log_data(test_entry) + logging.info("Sample log entry written successfully.")