From c7b4795085150eeab83b92c5764d859d32c451f1 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 5 Jul 2026 02:07:33 +0000 Subject: [PATCH] Add logger_setup/src/logger_setup/core.py --- logger_setup/src/logger_setup/core.py | 97 +++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 logger_setup/src/logger_setup/core.py diff --git a/logger_setup/src/logger_setup/core.py b/logger_setup/src/logger_setup/core.py new file mode 100644 index 0000000..f43b7e8 --- /dev/null +++ b/logger_setup/src/logger_setup/core.py @@ -0,0 +1,97 @@ +import csv +import time +import os +from datetime import datetime +from pathlib import Path +from dataclasses import dataclass, asdict +from typing import List, Dict, Any, Optional + +try: + import adafruit_dht + from adafruit_bmp280 import Adafruit_BMP280_I2C + import adafruit_tsl2591 + import board + import busio +except ImportError: + adafruit_dht = None + Adafruit_BMP280_I2C = None + adafruit_tsl2591 = None + board = None + busio = None + + +def _get_sensors(): + if board is None or adafruit_dht is None: + return None, None, None + + i2c = busio.I2C(board.SCL, board.SDA) + dht_sensor = adafruit_dht.DHT22(board.D4) + bmp_sensor = Adafruit_BMP280_I2C(i2c) + tsl_sensor = adafruit_tsl2591.TSL2591(i2c) + return dht_sensor, bmp_sensor, tsl_sensor + + +@dataclass +class SensorReading: + timestamp: datetime + temperature: float + humidity: float + pressure: float + lux: float + + def to_dict(self) -> Dict[str, Any]: + return { + 'timestamp': self.timestamp.isoformat(), + 'temperature_c': self.temperature, + 'humidity_percent': self.humidity, + 'pressure_hpa': self.pressure, + 'lux': self.lux + } + + +_readings: List[SensorReading] = [] + + +def _read_sensors() -> Optional[SensorReading]: + try: + dht_sensor, bmp_sensor, tsl_sensor = _get_sensors() + if not all([dht_sensor, bmp_sensor, tsl_sensor]): + return None + temp_c = float(dht_sensor.temperature) + humidity = float(dht_sensor.humidity) + pressure = float(bmp_sensor.pressure) + lux = float(tsl_sensor.lux) + return SensorReading( + timestamp=datetime.now(), + temperature=temp_c, + humidity=humidity, + pressure=pressure, + lux=lux + ) + except Exception: + return None + + +def start_logging(interval: float) -> None: + assert isinstance(interval, (int, float)) and interval > 0, 'Interval must be positive number.' + print(f"Starting logging with interval {interval}s. Press Ctrl+C to stop.") + while True: + reading = _read_sensors() + if reading: + _readings.append(reading) + print(f"Logged at {reading.timestamp.isoformat()} - T={reading.temperature:.2f}C H={reading.humidity:.2f}% P={reading.pressure:.2f}hPa L={reading.lux:.2f}lx") + time.sleep(interval) + + +def save_to_csv(filename: str) -> None: + assert isinstance(filename, str) and len(filename.strip()) > 0, 'Filename must be non-empty string.' + path = Path(filename) + os.makedirs(path.parent, exist_ok=True) + + with path.open('w', newline='') as csvfile: + fieldnames = ['timestamp', 'temperature_c', 'humidity_percent', 'pressure_hpa', 'lux'] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + for r in _readings: + writer.writerow(r.to_dict()) + print(f"Saved {_readings.__len__()} readings to {filename}") \ No newline at end of file