Add inn_listen/src/inn_listen/core.py

This commit is contained in:
Mika 2026-06-14 02:06:37 +00:00
commit 46000a3036

View file

@ -0,0 +1,112 @@
import argparse
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
import sounddevice as sd
from scipy.signal import iirnotch, lfilter
@dataclass
class AudioData:
"""Repräsentiert ein einzelnes Audioanalyseergebnis."""
timestamp: datetime
frequency: float
amplitude: float
pattern: str
class AudioProcessingError(Exception):
"""Custom-Exception für Audioverarbeitungsfehler."""
pass
def notch_filter(signal: np.ndarray, freq: float, Q: float) -> np.ndarray:
"""Wendet einen digitalen Notch-Filter an, um Störfrequenzen zu entfernen."""
if not isinstance(signal, np.ndarray):
raise TypeError("signal muss ein NumPy-Array sein")
if not isinstance(freq, (float, int)) or freq <= 0:
raise ValueError("freq muss eine positive Zahl sein")
if not isinstance(Q, (float, int)) or Q <= 0:
raise ValueError("Q muss eine positive Zahl sein")
fs = 44100.0 # Beispiel-Samplingrate
b, a = iirnotch(w0=freq / (fs / 2), Q=Q)
filtered = lfilter(b, a, signal)
return np.asarray(filtered)
def start_stream(mic_type: str, gain: float, fft_size: int) -> str:
"""Initialisiert und startet den Audio-Stream mit gegebenen Parametern."""
if mic_type not in {"contact", "directional"}:
raise ValueError("mic_type muss 'contact' oder 'directional' sein")
if not isinstance(gain, (float, int)):
raise TypeError("gain muss eine Zahl sein")
if not isinstance(fft_size, int) or fft_size <= 0:
raise ValueError("fft_size muss eine positive ganze Zahl sein")
logger = logging.getLogger("inn_listen")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
logger.info(f"Starte Audio-Stream mit Mikrofontyp={mic_type}, Gain={gain}, FFT-Größe={fft_size}")
try:
duration = 2.0 # Sekunden
sample_rate = 44100
logger.info("Erfassung der Audiodaten läuft...")
data = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=1, dtype='float64')
sd.wait()
data = np.squeeze(data)
window = np.hanning(fft_size)
fft_result = np.abs(np.fft.rfft(data[:fft_size] * window))
freqs = np.fft.rfftfreq(fft_size, 1 / sample_rate)
peak_freq = freqs[np.argmax(fft_result)]
amp = float(np.max(fft_result))
audio_info = AudioData(
timestamp=datetime.utcnow(),
frequency=float(peak_freq),
amplitude=amp,
pattern="ambient-resonance",
)
output_path = Path("output/analysis.json")
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8") as f:
json.dump({
"timestamp": audio_info.timestamp.isoformat(),
"frequency": audio_info.frequency,
"amplitude": audio_info.amplitude,
"pattern": audio_info.pattern,
}, f, indent=2)
logger.info(f"Analyse gespeichert unter {output_path}")
return f"Stream abgeschlossen: Peak {peak_freq:.2f} Hz bei Amplitude {amp:.2f}"
except Exception as e:
logger.error(f"Fehler im Audiostream: {e}")
raise AudioProcessingError(str(e))
def _cli_entrypoint() -> None:
parser = argparse.ArgumentParser(description="Inn Sound Listening Experiment")
parser.add_argument("--mic", required=True, choices=["contact", "directional"], help="Typ des Mikrofons")
parser.add_argument("--gain", required=True, type=float, help="Eingangsverstärkung in dB")
parser.add_argument("--fft", required=False, type=int, default=4096, help="FFT-Fenstergröße")
args = parser.parse_args()
result = start_stream(mic_type=args.mic, gain=args.gain, fft_size=args.fft)
print(result)
if __name__ == "__main__":
_cli_entrypoint()