Add donau_listener/src/donau_listener/core.py

This commit is contained in:
Mika 2026-05-03 02:07:37 +00:00
parent 98dbb33f11
commit b667ac4cc9

View file

@ -0,0 +1,122 @@
import argparse
import json
import os
from pathlib import Path
from datetime import datetime
import logging
from dataclasses import dataclass, asdict
from typing import Any
import numpy as np
import sounddevice as sd
# Konfiguration des Loggers
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
OUTPUT_PATH = Path('output/recording_log.json')
class DataValidationError(Exception):
"""Custom Exception für ungültige Eingangsdaten."""
pass
@dataclass
class RecordingData:
timestamp: str
water_level: float
ground_vibration: float
ai_label: str
def validate(self) -> None:
if not isinstance(self.timestamp, str):
raise DataValidationError('timestamp muss vom Typ str sein.')
if not isinstance(self.water_level, float):
raise DataValidationError('water_level muss vom Typ float sein.')
if not isinstance(self.ground_vibration, float):
raise DataValidationError('ground_vibration muss vom Typ float sein.')
if not isinstance(self.ai_label, str):
raise DataValidationError('ai_label muss vom Typ str sein.')
def start_recording(gain: float, threshold: float) -> str:
"""Startet die akustische Aufnahme mit den spezifizierten Parametern."""
try:
if not (0.0 <= gain <= 1.0):
raise ValueError('gain muss zwischen 0 und 1 liegen.')
if threshold < 0:
raise ValueError('threshold muss positiv sein.')
logger.info('Starte Audioaufnahme mit gain=%.2f, threshold=%.3f', gain, threshold)
duration = 2.0 # Sekunden, Demo
sample_rate = 44100
audio_data = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=1, dtype='float32')
sd.wait()
amplified = np.clip(audio_data * gain, -1.0, 1.0)
rms = float(np.sqrt(np.mean(np.square(amplified))))
if rms < threshold:
logger.info('Kein signifikanter Ton erkannt (RMS=%.4f).', rms)
return 'Keine Aufnahme über Schwelle erkannt.'
logger.info('Aufnahme erfolgreich mit RMS=%.4f.', rms)
return f'Audioaufnahme abgeschlossen (RMS={rms:.4f}).'
except Exception as e:
logger.error('Fehler während der Aufnahme: %s', e)
return f'Fehler: {e}'
def log_data(timestamp: str, water_level: float, ground_vibration: float, ai_label: str) -> bool:
"""Speichert und strukturiert Messdaten inklusive KI-Label im JSON-Log."""
try:
record = RecordingData(timestamp=timestamp, water_level=water_level, ground_vibration=ground_vibration, ai_label=ai_label)
record.validate()
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
if OUTPUT_PATH.exists():
with open(OUTPUT_PATH, 'r', encoding='utf-8') as f:
existing = json.load(f)
if not isinstance(existing, list):
existing = []
else:
existing = []
existing.append(asdict(record))
with open(OUTPUT_PATH, 'w', encoding='utf-8') as f:
json.dump(existing, f, indent=2, ensure_ascii=False)
logger.info('Messdaten erfolgreich gespeichert (%s).', OUTPUT_PATH)
return True
except (OSError, DataValidationError, json.JSONDecodeError, TypeError) as e:
logger.error('Fehler beim Speichern der Daten: %s', e)
return False
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Donau Data Listener - Audioaufnahmemodul')
parser.add_argument('--gain', required=True, type=float, help='Verstärkungsfaktor des Mikrofonsignals (float)')
parser.add_argument('--threshold', required=True, type=float, help='Signalpegelgrenze zur Aufnahme (float)')
args = parser.parse_args()
status = start_recording(gain=args.gain, threshold=args.threshold)
logger.info(status)
# Beispiel-Datensatz protokollieren (Demo-Zweck)
demo_data = {
'timestamp': datetime.utcnow().isoformat(),
'water_level': np.random.uniform(0.2, 0.8),
'ground_vibration': np.random.uniform(0.01, 0.05),
'ai_label': 'ambient_noise'
}
success = log_data(**demo_data)
if success:
logger.info('Protokolleintrag erfolgreich.')
else:
logger.error('Protokollierung fehlgeschlagen.')