aitrader/src/aitrader/exchange/market_data.py

57 lines
1.8 KiB
Python

"""Collects market data (OHLCV for multiple timeframes + order book + ticker)."""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Any
import pandas as pd
from ..logging_setup import get_logger
from .kraken import KrakenClient
# Module-level logger obtained from the project's logging setup.
log = get_logger(__name__)
# Seconds to sleep between retry attempts; one entry per retry.
# (The original note mentions HTTP 503 responses as the trigger.)
_RETRY_DELAYS = (5, 15)
@dataclass
class MarketSnapshot:
    """Container bundling the market data gathered for one symbol.

    ``symbol`` and ``ticker`` are required; ``ohlcv`` and ``orderbook`` each
    default to a fresh empty dict per instance.
    """

    # Symbol the data belongs to.
    symbol: str
    # Ticker payload for the symbol.
    ticker: dict[str, Any]
    # Candle data keyed by timeframe (timeframe -> DataFrame).
    ohlcv: dict[str, pd.DataFrame] = field(default_factory=dict)
    # Order book payload; stays empty when none was stored.
    orderbook: dict[str, Any] = field(default_factory=dict)
def _with_retry(fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)``, retrying on ccxt network errors.

    The call is attempted once and then retried once per entry in
    ``_RETRY_DELAYS``. Before each retry the function logs a warning and
    sleeps for the corresponding number of seconds. There is no sleep after
    the final failed attempt; the error is re-raised immediately.

    Args:
        fn: Callable to invoke.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns on its first successful call.

    Raises:
        ccxt.NetworkError, ccxt.RequestTimeout: The error from the last
            attempt, once all retries are exhausted. Any other exception
            raised by ``fn`` propagates immediately without a retry.
    """
    import ccxt  # kept as a function-local import, as before

    # One wait per retry, plus a trailing None that marks the final attempt
    # (nothing left to wait for, so the failure must propagate).
    waits = (*_RETRY_DELAYS, None)
    for retry_no, wait in enumerate(waits, start=1):
        try:
            return fn(*args, **kwargs)
        except (ccxt.NetworkError, ccxt.RequestTimeout) as exc:
            if wait is None:
                # Out of retries: re-raise with the original traceback
                # instead of sleeping pointlessly first.
                raise
            log.warning("market_data.retry", attempt=retry_no, delay=wait, error=str(exc))
            # Back off *before* the next attempt, including the first retry.
            time.sleep(wait)
def collect_snapshot(
    client: KrakenClient,
    symbol: str,
    timeframes: list[str],
    ohlcv_limit: int,
    orderbook_depth: int = 10,
) -> MarketSnapshot:
    """Gather ticker, OHLCV and order book data for ``symbol`` into a snapshot.

    Every client call goes through ``_with_retry``. The ticker is fetched
    first, then OHLCV data for each entry of ``timeframes`` in order, and
    finally the order book. Ticker and OHLCV failures propagate to the
    caller; an order book failure is only logged.

    Args:
        client: Exchange client providing ``fetch_ticker``, ``fetch_ohlcv``
            and ``fetch_orderbook``.
        symbol: Symbol passed to every fetch call.
        timeframes: Timeframes to fetch OHLCV data for; each becomes a key
            of ``MarketSnapshot.ohlcv``.
        ohlcv_limit: Forwarded to ``fetch_ohlcv`` as ``limit``.
        orderbook_depth: Forwarded to ``fetch_orderbook`` as ``depth``.

    Returns:
        The populated ``MarketSnapshot``. Its ``orderbook`` keeps the
        default empty dict when the order book could not be fetched.
    """
    ticker = _with_retry(client.fetch_ticker, symbol)
    candles = {
        timeframe: _with_retry(client.fetch_ohlcv, symbol, timeframe, limit=ohlcv_limit)
        for timeframe in timeframes
    }
    snapshot = MarketSnapshot(symbol=symbol, ticker=ticker, ohlcv=candles)

    # The order book is best-effort: any failure is logged and the snapshot
    # is still returned without it.
    try:
        book = _with_retry(client.fetch_orderbook, symbol, depth=orderbook_depth)
    except Exception as err:
        log.warning("orderbook.unavailable", symbol=symbol, error=str(err))
    else:
        snapshot.orderbook = book
    return snapshot