Add logger_setup/src/logger_setup/core.py

This commit is contained in:
Mika 2026-07-05 02:07:33 +00:00
parent da919f69c4
commit c7b4795085

View file

@ -0,0 +1,97 @@
import csv
import time
import os
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Dict, Any, Optional
try:
import adafruit_dht
from adafruit_bmp280 import Adafruit_BMP280_I2C
import adafruit_tsl2591
import board
import busio
except ImportError:
adafruit_dht = None
Adafruit_BMP280_I2C = None
adafruit_tsl2591 = None
board = None
busio = None
def _get_sensors():
if board is None or adafruit_dht is None:
return None, None, None
i2c = busio.I2C(board.SCL, board.SDA)
dht_sensor = adafruit_dht.DHT22(board.D4)
bmp_sensor = Adafruit_BMP280_I2C(i2c)
tsl_sensor = adafruit_tsl2591.TSL2591(i2c)
return dht_sensor, bmp_sensor, tsl_sensor
@dataclass
class SensorReading:
timestamp: datetime
temperature: float
humidity: float
pressure: float
lux: float
def to_dict(self) -> Dict[str, Any]:
return {
'timestamp': self.timestamp.isoformat(),
'temperature_c': self.temperature,
'humidity_percent': self.humidity,
'pressure_hpa': self.pressure,
'lux': self.lux
}
_readings: List[SensorReading] = []
def _read_sensors() -> Optional[SensorReading]:
try:
dht_sensor, bmp_sensor, tsl_sensor = _get_sensors()
if not all([dht_sensor, bmp_sensor, tsl_sensor]):
return None
temp_c = float(dht_sensor.temperature)
humidity = float(dht_sensor.humidity)
pressure = float(bmp_sensor.pressure)
lux = float(tsl_sensor.lux)
return SensorReading(
timestamp=datetime.now(),
temperature=temp_c,
humidity=humidity,
pressure=pressure,
lux=lux
)
except Exception:
return None
def start_logging(interval: float) -> None:
assert isinstance(interval, (int, float)) and interval > 0, 'Interval must be positive number.'
print(f"Starting logging with interval {interval}s. Press Ctrl+C to stop.")
while True:
reading = _read_sensors()
if reading:
_readings.append(reading)
print(f"Logged at {reading.timestamp.isoformat()} - T={reading.temperature:.2f}C H={reading.humidity:.2f}% P={reading.pressure:.2f}hPa L={reading.lux:.2f}lx")
time.sleep(interval)
def save_to_csv(filename: str) -> None:
assert isinstance(filename, str) and len(filename.strip()) > 0, 'Filename must be non-empty string.'
path = Path(filename)
os.makedirs(path.parent, exist_ok=True)
with path.open('w', newline='') as csvfile:
fieldnames = ['timestamp', 'temperature_c', 'humidity_percent', 'pressure_hpa', 'lux']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for r in _readings:
writer.writerow(r.to_dict())
print(f"Saved {_readings.__len__()} readings to {filename}")