Add trace_export_script/src/trace_export_script/core.py
This commit is contained in:
parent
f17e6de7c0
commit
d90f215d65
1 changed files with 55 additions and 0 deletions
55
trace_export_script/src/trace_export_script/core.py
Normal file
55
trace_export_script/src/trace_export_script/core.py
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
import json
|
||||
import csv
|
||||
import os
|
||||
from typing import List, Dict, Any
|
||||
|
||||
|
||||
def _load_json(file_path: str) -> List[Dict[str, Any]]:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
if not isinstance(data, list):
|
||||
raise ValueError('Unexpected data format: expected list of TraceSample objects')
|
||||
for item in data:
|
||||
if not isinstance(item, dict):
|
||||
raise ValueError('Each trace entry must be a dict')
|
||||
for key in ('timestamp', 'value', 'source'):
|
||||
if key not in item:
|
||||
raise ValueError(f'Missing field {key} in trace data')
|
||||
return data
|
||||
|
||||
|
||||
def _export_to_json(data: List[Dict[str, Any]], output_path: str) -> None:
|
||||
with open(output_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
|
||||
def _export_to_csv(data: List[Dict[str, Any]], output_path: str) -> None:
|
||||
if not data:
|
||||
raise ValueError('No data to export')
|
||||
with open(output_path, 'w', newline='', encoding='utf-8') as f:
|
||||
writer = csv.DictWriter(f, fieldnames=['timestamp', 'value', 'source'])
|
||||
writer.writeheader()
|
||||
writer.writerows(data)
|
||||
|
||||
|
||||
def export_trace_data(file_path: str, format: str) -> None:
|
||||
"""Exportiert Trace-Daten aus einer Eingabedatei in das gewünschte Format (JSON oder CSV)."""
|
||||
if not os.path.exists(file_path):
|
||||
raise FileNotFoundError(f'Input file not found: {file_path}')
|
||||
|
||||
format = format.lower()
|
||||
if format not in ('json', 'csv'):
|
||||
raise ValueError("Format must be 'json' or 'csv'")
|
||||
|
||||
data = _load_json(file_path)
|
||||
base, _ = os.path.splitext(os.path.basename(file_path))
|
||||
output_dir = os.path.join('output')
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
output_path = os.path.join(output_dir, f'{base}_exported.{format}')
|
||||
|
||||
if format == 'json':
|
||||
_export_to_json(data, output_path)
|
||||
else:
|
||||
_export_to_csv(data, output_path)
|
||||
|
||||
# Kein Rückgabewert; Datei wird als Seiteneffekt erzeugt.
|
||||
Loading…
Reference in a new issue