From 05488822ee736d386ce71503ccc895306d7e8f30 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 8 Feb 2026 03:06:31 +0000 Subject: [PATCH] Add audio_visualizer/src/audio_visualizer/io_utils.py --- .../src/audio_visualizer/io_utils.py | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 audio_visualizer/src/audio_visualizer/io_utils.py diff --git a/audio_visualizer/src/audio_visualizer/io_utils.py b/audio_visualizer/src/audio_visualizer/io_utils.py new file mode 100644 index 0000000..ef7df16 --- /dev/null +++ b/audio_visualizer/src/audio_visualizer/io_utils.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +import csv +import logging +from pathlib import Path +from typing import NamedTuple + +import numpy as np + + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s') + + +class SensorDataRow(NamedTuple): + """Repräsentiert eine einzelne Zeile von Sensordaten.""" + + timestamp: float + frequency: float + amplitude: float + + +class AudioDataLoadError(Exception): + """Wird ausgelöst, wenn beim Laden der Audio- oder Sensordaten ein Fehler auftritt.""" + + + +def load_audio_data(path: str) -> np.ndarray: + """Lädt und parst Sensordaten oder Audiodateien zur Weiterverarbeitung. + + Unterstützt aktuell CSV-Dateien mit Spalten: timestamp, frequency, amplitude. + + Args: + path (str): Pfad zur Eingabedatei (CSV oder WAV). + + Returns: + np.ndarray: Ein Array mit den geladenen Daten (timestamp, frequency, amplitude). + + Raises: + AudioDataLoadError: Wenn die Datei nicht existiert oder unlesbar ist. + ValueError: Wenn Daten ungültig oder unvollständig sind. + """ + + file_path = Path(path) + if not file_path.exists(): + logger.error(f"Datei nicht gefunden: {path}") + raise AudioDataLoadError(f"Datei nicht gefunden: {path}") + + if file_path.suffix.lower() != '.csv': + logger.error(f"Nur CSV-Dateien werden derzeit unterstützt: {path}") + raise AudioDataLoadError(f"Nur CSV-Dateien werden derzeit unterstützt: {path}") + + data_rows: list[SensorDataRow] = [] + + with file_path.open('r', encoding='utf-8') as csvfile: + reader = csv.DictReader(csvfile) + required_fields = {'timestamp', 'frequency', 'amplitude'} + + if not required_fields.issubset(reader.fieldnames or {}): + logger.error(f"CSV-Datei enthält nicht alle erforderlichen Felder: {reader.fieldnames}") + raise ValueError("Ungültiges CSV-Format: timestamp, frequency, amplitude erforderlich.") + + for row in reader: + try: + timestamp = float(row['timestamp']) + frequency = float(row['frequency']) + amplitude = float(row['amplitude']) + data_rows.append(SensorDataRow(timestamp, frequency, amplitude)) + except (KeyError, ValueError) as e: + logger.warning(f"Überspringe fehlerhafte Zeile ({row}): {e}") + + if not data_rows: + logger.error("Keine gültigen Datenzeilen gefunden.") + raise ValueError("Keine gültigen Sensordaten enthalten.") + + data_array = np.array(data_rows, dtype=float) + + logger.info(f"Geladene Datensätze: {len(data_array)} aus {path}") + + # Zusätzliche Validierung + assert data_array.shape[1] == 3, "Jede Zeile muss drei Werte enthalten: timestamp, frequency, amplitude" + + return data_array