"""Correction routines for raw fluorescence measurements.

Provides smoothing plus baseline removal (``correct_flourescence``),
point-wise reference subtraction (``substract_reference``) and a small
command-line entry point (``main``) that chains both steps on JSON files.
"""

import argparse
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


class DataValidationError(Exception):
    """Raised when input data does not have the expected structure."""


def _validate_data_points(
    data: List[Dict[str, Any]],
    required_fields=("intensity", "background_noise"),
) -> None:
    """Check that *data* is a list of dicts carrying numeric required fields.

    Args:
        data: Candidate list of measurement dictionaries.
        required_fields: Keys that every item must contain with a numeric
            (``int`` or ``float``) value.

    Raises:
        DataValidationError: If *data* is not a list, an item is not a dict,
            a required field is missing, or a field value is not numeric.
    """
    if not isinstance(data, list):
        raise DataValidationError("Input data must be a list of dictionaries.")
    for i, item in enumerate(data):
        if not isinstance(item, dict):
            raise DataValidationError(f"Data item at index {i} is not a dictionary.")
        for field in required_fields:
            if field not in item:
                raise DataValidationError(
                    f"Missing field '{field}' in data item at index {i}."
                )
            value = item[field]
            # bool is a subclass of int, so it must be rejected explicitly;
            # otherwise JSON true/false would pass as a measurement value.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise DataValidationError(
                    f"Field '{field}' in item {i} must be numeric."
                )


def correct_flourescence(raw_data: List[Dict[str, float]]) -> List[Dict[str, float]]:
    """Correct raw fluorescence data by smoothing and baseline subtraction.

    The intensity is smoothed with a centered moving average (window of 3,
    shorter at the edges), then the median of ``background_noise`` is
    subtracted and negative results are clamped to 0.

    Args:
        raw_data: List of dicts with numeric ``intensity`` and
            ``background_noise`` entries.

    Returns:
        A new list of dicts with the corrected value under ``intensity`` and
        the unchanged ``background_noise``; other keys are not carried over.
        An empty input yields an empty list.

    Raises:
        DataValidationError: If *raw_data* fails validation.
    """
    _validate_data_points(raw_data)
    if not raw_data:
        # An empty DataFrame has no columns; column access would raise KeyError.
        return []

    df = pd.DataFrame(raw_data)
    # Smoothing (moving average filter).
    smoothed = df['intensity'].rolling(window=3, min_periods=1, center=True).mean()
    # Baseline subtraction based on the median background noise.
    baseline = df['background_noise'].median()
    corrected = np.maximum(smoothed - baseline, 0)

    return [
        {'intensity': float(intensity), 'background_noise': float(noise)}
        for intensity, noise in zip(corrected.tolist(), df['background_noise'].tolist())
    ]


def substract_reference(
    data: List[Dict[str, float]],
    reference: List[Dict[str, float]],
) -> List[Dict[str, float]]:
    """Subtract reference intensities (e.g. stray light) from the data.

    Items are paired by position. If the inputs differ in length, only the
    first ``min(len(data), len(reference))`` items are processed. Negative
    differences are clamped to 0.

    Args:
        data: Measurement dicts with numeric ``intensity`` and
            ``background_noise``.
        reference: Reference dicts of the same shape; only ``intensity`` is
            used for the subtraction.

    Returns:
        A list of records based on *data* with ``intensity`` replaced by the
        clamped difference. Empty if either input is empty.

    Raises:
        DataValidationError: If *data* or *reference* fails validation.
    """
    _validate_data_points(data)
    _validate_data_points(reference)

    # If the lengths differ, work with the smaller size.
    min_len = min(len(data), len(reference))
    if min_len == 0:
        # A DataFrame built from an empty list has no 'intensity' column.
        return []

    df_data = pd.DataFrame(data).iloc[:min_len].reset_index(drop=True)
    df_ref = pd.DataFrame(reference).iloc[:min_len].reset_index(drop=True)
    df_result = df_data.copy()
    df_result['intensity'] = np.maximum(df_data['intensity'] - df_ref['intensity'], 0)
    return df_result.to_dict(orient='records')


def _load_json(path: Path) -> List[Dict[str, Any]]:
    """Read and return the JSON content of *path* (UTF-8)."""
    with path.open('r', encoding='utf-8') as f:
        return json.load(f)


def _save_json(data: List[Dict[str, Any]], path: Path) -> None:
    """Write *data* as indented JSON to *path*, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open('w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Run the correction pipeline from the command line.

    Loads the raw and reference JSON files, applies ``correct_flourescence``
    followed by ``substract_reference`` and writes the result as JSON.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]`` when None.
    """
    parser = argparse.ArgumentParser(description="Fluoreszenzdaten-Korrektur")
    parser.add_argument('--input', required=True, help='Pfad zur Eingabedatei mit Rohdaten (JSON).')
    parser.add_argument('--reference', required=True, help='Pfad zur Referenzdatei (JSON).')
    parser.add_argument('--output', required=True, help='Pfad zur Ausgabedatei (JSON).')
    args = parser.parse_args(argv)

    logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')

    in_path = Path(args.input)
    ref_path = Path(args.reference)
    out_path = Path(args.output)

    logger.info("Lade Rohdaten aus %s...", in_path)
    raw_data = _load_json(in_path)
    logger.info("Lade Referenzdaten aus %s...", ref_path)
    reference_data = _load_json(ref_path)

    logger.info("Korrigiere Fluoreszenzdaten...")
    corrected = correct_flourescence(raw_data)
    logger.info("Subtrahiere Referenzsignal...")
    result = substract_reference(corrected, reference_data)

    _save_json(result, out_path)
    logger.info("Korrigierte Daten wurden nach %s geschrieben.", out_path)


if __name__ == '__main__':
    main()