diff --git a/donau_listener/src/donau_listener/core.py b/donau_listener/src/donau_listener/core.py new file mode 100644 index 0000000..e7894e7 --- /dev/null +++ b/donau_listener/src/donau_listener/core.py @@ -0,0 +1,122 @@ +import argparse +import json +import os +from pathlib import Path +from datetime import datetime +import logging +from dataclasses import dataclass, asdict +from typing import Any + +import numpy as np +import sounddevice as sd + + +# Konfiguration des Loggers +logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s') +logger = logging.getLogger(__name__) + + +OUTPUT_PATH = Path('output/recording_log.json') + + +class DataValidationError(Exception): + """Custom Exception für ungültige Eingangsdaten.""" + pass + + +@dataclass +class RecordingData: + timestamp: str + water_level: float + ground_vibration: float + ai_label: str + + def validate(self) -> None: + if not isinstance(self.timestamp, str): + raise DataValidationError('timestamp muss vom Typ str sein.') + if not isinstance(self.water_level, float): + raise DataValidationError('water_level muss vom Typ float sein.') + if not isinstance(self.ground_vibration, float): + raise DataValidationError('ground_vibration muss vom Typ float sein.') + if not isinstance(self.ai_label, str): + raise DataValidationError('ai_label muss vom Typ str sein.') + + +def start_recording(gain: float, threshold: float) -> str: + """Startet die akustische Aufnahme mit den spezifizierten Parametern.""" + try: + if not (0.0 <= gain <= 1.0): + raise ValueError('gain muss zwischen 0 und 1 liegen.') + if threshold < 0: + raise ValueError('threshold muss positiv sein.') + + logger.info('Starte Audioaufnahme mit gain=%.2f, threshold=%.3f', gain, threshold) + duration = 2.0 # Sekunden, Demo + sample_rate = 44100 + + audio_data = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=1, dtype='float32') + sd.wait() + amplified = np.clip(audio_data * gain, -1.0, 1.0) + rms = float(np.sqrt(np.mean(np.square(amplified)))) + + if rms < threshold: + logger.info('Kein signifikanter Ton erkannt (RMS=%.4f).', rms) + return 'Keine Aufnahme über Schwelle erkannt.' + + logger.info('Aufnahme erfolgreich mit RMS=%.4f.', rms) + return f'Audioaufnahme abgeschlossen (RMS={rms:.4f}).' + + except Exception as e: + logger.error('Fehler während der Aufnahme: %s', e) + return f'Fehler: {e}' + + +def log_data(timestamp: str, water_level: float, ground_vibration: float, ai_label: str) -> bool: + """Speichert und strukturiert Messdaten inklusive KI-Label im JSON-Log.""" + try: + record = RecordingData(timestamp=timestamp, water_level=water_level, ground_vibration=ground_vibration, ai_label=ai_label) + record.validate() + + OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) + + if OUTPUT_PATH.exists(): + with open(OUTPUT_PATH, 'r', encoding='utf-8') as f: + existing = json.load(f) + if not isinstance(existing, list): + existing = [] + else: + existing = [] + + existing.append(asdict(record)) + with open(OUTPUT_PATH, 'w', encoding='utf-8') as f: + json.dump(existing, f, indent=2, ensure_ascii=False) + + logger.info('Messdaten erfolgreich gespeichert (%s).', OUTPUT_PATH) + return True + + except (OSError, DataValidationError, json.JSONDecodeError, TypeError) as e: + logger.error('Fehler beim Speichern der Daten: %s', e) + return False + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Donau Data Listener - Audioaufnahmemodul') + parser.add_argument('--gain', required=True, type=float, help='Verstärkungsfaktor des Mikrofonsignals (float)') + parser.add_argument('--threshold', required=True, type=float, help='Signalpegelgrenze zur Aufnahme (float)') + args = parser.parse_args() + + status = start_recording(gain=args.gain, threshold=args.threshold) + logger.info(status) + + # Beispiel-Datensatz protokollieren (Demo-Zweck) + demo_data = { + 'timestamp': datetime.utcnow().isoformat(), + 'water_level': np.random.uniform(0.2, 0.8), + 'ground_vibration': np.random.uniform(0.01, 0.05), + 'ai_label': 'ambient_noise' + } + success = log_data(**demo_data) + if success: + logger.info('Protokolleintrag erfolgreich.') + else: + logger.error('Protokollierung fehlgeschlagen.') \ No newline at end of file