WebSocket Market Data Aggregator Development
A WebSocket aggregator is a service that maintains persistent connections to exchanges and delivers normalized market data to downstream consumers such as trading bots, dashboards, and risk management systems. Its key quality metrics are exchange-to-consumer latency and resilience to dropped connections.
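To make "normalized" concrete, here is a minimal sketch that maps one exchange-specific message, a Binance spot trade event, onto a common schema. The `Trade` dataclass, its field names and `normalize_binance_trade` are hypothetical. The Binance field letters (`e`, `s`, `t`, `p`, `q`, `T`, `m`) follow Binance's public trade-stream format. Prices and quantities stay as the decimal strings the exchange sends, which avoids float rounding.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Trade:
    """Hypothetical exchange-independent trade record."""
    exchange: str
    symbol: str
    trade_id: str
    price: str           # decimal string exactly as sent by the exchange
    qty: str
    side: str            # taker (aggressor) side: "buy" or "sell"
    exchange_ts_ms: int  # trade time reported by the exchange
    received_ts_ms: int  # local receive time, used for latency tracking


def normalize_binance_trade(msg: dict, received_ts_ms: Optional[int] = None) -> Trade:
    """Map a Binance spot trade event onto the common Trade schema."""
    if msg.get("e") != "trade":
        raise ValueError(f"not a trade event: {msg.get('e')!r}")
    if received_ts_ms is None:
        received_ts_ms = time.time_ns() // 1_000_000
    return Trade(
        exchange="binance",
        symbol=msg["s"],
        trade_id=str(msg["t"]),
        price=msg["p"],
        qty=msg["q"],
        # "m" is true when the buyer is the maker, so the taker was the seller
        side="sell" if msg["m"] else "buy",
        exchange_ts_ms=msg["T"],
        received_ts_ms=received_ts_ms,
    )
```

Each exchange gets its own `normalize_*` function; everything downstream only sees `Trade`.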
Connection Topology
Each exchange has its own WebSocket limitations:
| Exchange | Max streams / conn | Ping interval | Max connections |
|---|---|---|---|
| Binance | 1024 | 3 min | Unlimited |
| Bybit | 10 topics / conn | 20 sec | Unlimited |
| OKX | 240 channels / conn | 30 sec | Unlimited |
| Kraken | Not documented | Adaptive | Unlimited |
The aggregator opens as many connections as needed to cover all subscriptions, distributing symbols across them so that no connection exceeds the exchange's per-connection limit.
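The distribution step can be planned up front as simple chunking. This sketch takes the per-connection limits from the table above and keeps a safety margin below each one. The 10% headroom, the `STREAM_LIMITS` dict and `plan_connections` are illustrative assumptions, not exchange requirements.

```python
import math

# Per-connection stream limits from the table above (Kraken omitted: not documented)
STREAM_LIMITS = {"binance": 1024, "bybit": 10, "okx": 240}


def plan_connections(exchange: str, channels: list[str], headroom: float = 0.9) -> list[list[str]]:
    """Split channels into groups; each group is served by one WebSocket connection."""
    if not 0 < headroom <= 1:
        raise ValueError("headroom must be in (0, 1]")
    # Stay somewhat below the hard limit (assumed margin, tune per exchange)
    per_conn = max(1, math.floor(STREAM_LIMITS[exchange] * headroom))
    unique = list(dict.fromkeys(channels))  # drop duplicates, keep order
    return [unique[i:i + per_conn] for i in range(0, len(unique), per_conn)]
```

For example, 500 OKX channels become three connections carrying 216, 216 and 68 channels.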
Connection Manager
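```python
import asyncio

# WSConnection is the per-socket wrapper (defined elsewhere); it is assumed to expose
# subscription_count, subscribe(), reconnect(), resubscribe() and a `backoff` helper.

class ConnectionManager:
    def __init__(self, max_per_conn: int = 900):
        self.connections: list["WSConnection"] = []
        # Stay below the exchange limit (Binance allows 1024 streams per connection)
        self.max_per_conn = max_per_conn
        self.subscriptions: dict[str, "WSConnection"] = {}  # channel -> owning connection

    async def subscribe(self, channels: list[str]) -> None:
        for channel in channels:
            if channel in self.subscriptions:
                continue  # already subscribed; avoid duplicate streams
            conn = self._find_or_create_connection()
            await conn.subscribe(channel)
            self.subscriptions[channel] = conn

    def _find_or_create_connection(self) -> "WSConnection":
        # Fill existing connections first; open a new one only when all are full
        for conn in self.connections:
            if conn.subscription_count < self.max_per_conn:
                return conn
        new_conn = WSConnection(self.on_message, self.on_disconnect)
        self.connections.append(new_conn)
        return new_conn

    async def on_message(self, conn: "WSConnection", raw: str) -> None:
        ...  # placeholder: parse, normalize and publish

    async def on_disconnect(self, conn: "WSConnection") -> None:
        # Retry with exponential backoff until the socket is back,
        # then restore every channel that was assigned to this connection
        while True:
            await asyncio.sleep(conn.backoff.next())
            try:
                await conn.reconnect()
                break
            except OSError:  # assumes connection failures surface as OSError
                continue
        await conn.resubscribe()
```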
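The `ConnectionManager` delegates the retry delay to `conn.backoff.next()` without defining it. One possible backoff helper is sketched below. The class name, defaults (1 s base, doubling, 60 s cap) and the use of "full jitter" (a uniform random delay up to the current cap) are assumptions, chosen so that many connections dropped at once do not all reconnect in lockstep.

```python
import random


class ExponentialBackoff:
    """Hypothetical backoff helper: base * factor**attempt, capped, with optional jitter."""

    def __init__(self, base: float = 1.0, factor: float = 2.0,
                 max_delay: float = 60.0, jitter: bool = True):
        self.base = base
        self.factor = factor
        self.max_delay = max_delay
        self.jitter = jitter
        self.attempt = 0

    def next(self) -> float:
        delay = min(self.max_delay, self.base * self.factor ** self.attempt)
        # Stop growing once the cap is reached, so the exponent never overflows
        if delay < self.max_delay:
            self.attempt += 1
        if self.jitter:
            # Full jitter: spread reconnects uniformly over [0, delay]
            return random.uniform(0, delay)
        return delay

    def reset(self) -> None:
        """Call after a successful reconnect so the next outage starts small again."""
        self.attempt = 0
```

Without jitter the delays run 1, 2, 4, 8, 16, 32 seconds and then stay at 60.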
Heartbeat and Stale Connection Detection
An exchange can go silent without an explicit disconnect: the TCP connection stays alive, but no data arrives. A watchdog timer on each connection detects this:
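```python
import asyncio
import logging
import time

logger = logging.getLogger(__name__)


class HeartbeatMonitor:
    STALE_THRESHOLD_SEC = 30  # silence longer than this means the feed is dead
    CHECK_INTERVAL_SEC = 5

    async def watch(self, conn: "WSConnection") -> None:
        while True:
            await asyncio.sleep(self.CHECK_INTERVAL_SEC)
            # Assumes conn.last_message_time is set with time.monotonic() on every frame
            age = time.monotonic() - conn.last_message_time
            if age > self.STALE_THRESHOLD_SEC:
                logger.warning("Stale connection (%.1f s without data), forcing reconnect", age)
                await conn.force_reconnect()
```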
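Stale detection covers the receive side. Some exchanges also expect the client to send application-level pings within the interval listed in the table (Bybit and OKX), while Binance's server sends ping frames that most WebSocket client libraries answer automatically. The payloads below (`{"op": "ping"}` for Bybit, the plain string `ping` for OKX) follow the exchanges' public docs but should be checked against their current versions. The 25 s OKX interval is an assumed margin under its 30 s limit. `send` is any async callable, for example a `websockets` connection's `send` method.

```python
import asyncio
import json
from typing import Awaitable, Callable

# Application-level ping payloads (verify against each exchange's current docs)
PING_PAYLOADS: dict[str, str] = {
    "bybit": json.dumps({"op": "ping"}),
    "okx": "ping",
}
# Seconds between pings; OKX uses an assumed margin below its 30 s idle cutoff
PING_INTERVAL_SEC: dict[str, float] = {"bybit": 20.0, "okx": 25.0}


async def keepalive(send: Callable[[str], Awaitable[None]], payload: str, interval: float) -> None:
    """Send `payload` every `interval` seconds until the task is cancelled."""
    while True:
        await asyncio.sleep(interval)
        await send(payload)
```

Run it as a background task next to the receive loop (`asyncio.create_task(keepalive(ws.send, PING_PAYLOADS["okx"], PING_INTERVAL_SEC["okx"]))`) and cancel it when the connection closes.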
Data Publishing to Consumers
The aggregator publishes normalized data through one or more transports, depending on what consumers need:
- Redis Pub/Sub: real-time fan-out with minimal latency. Delivery is at-most-once, so a subscriber that is disconnected misses messages; suitable when data does not need to be persisted.
- Redis Streams: reliable delivery backed by a persistent log; consumer groups let clients read messages they missed.
- Kafka: high-load systems that need durable, replayable delivery; partitioning by symbol keeps each symbol's updates in order.
- gRPC streaming: direct, low-latency connections between clients and the aggregator.
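A minimal sketch of the Redis Pub/Sub path, assuming updates are already normalized dicts with `exchange` and `symbol` keys and a hypothetical channel naming scheme `<kind>:<exchange>:<symbol>`. The client only needs an async `publish(channel, message)` method, which redis-py's asyncio client (`redis.asyncio.Redis`) provides; a fake client is enough for testing.

```python
import json
from typing import Any, Protocol


class PubSubClient(Protocol):
    async def publish(self, channel: str, message: str) -> Any: ...


def channel_name(kind: str, exchange: str, symbol: str) -> str:
    # Hypothetical naming scheme: one channel per data type, exchange and symbol
    return f"{kind}:{exchange}:{symbol}"


async def publish_update(client: PubSubClient, kind: str, update: dict) -> None:
    """Serialize a normalized update and publish it on its per-symbol channel."""
    channel = channel_name(kind, update["exchange"], update["symbol"])
    # Compact separators keep payloads small; sort_keys makes them deterministic
    payload = json.dumps(update, separators=(",", ":"), sort_keys=True)
    await client.publish(channel, payload)
```

Per-symbol channels let a consumer subscribe narrowly, or use Redis `PSUBSCRIBE trades:binance:*` to receive every Binance trade. For Redis Streams the same payload would be appended with `XADD` instead, trading a little latency for the ability to replay.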
Performance Metrics
The aggregator should export Prometheus metrics for monitoring:
- `ws_messages_received_total{exchange, channel}`: counter of incoming messages
- `ws_message_latency_ms{exchange}`: delay from the exchange timestamp to local receipt
- `ws_reconnects_total{exchange}`: number of reconnections
- `ws_active_connections{exchange}`: current number of open connections
- `ws_subscription_count{exchange}`: number of active subscriptions
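The metrics listed above map directly onto the official Python client, `prometheus_client`. This sketch declares them on a dedicated registry and records one incoming message. The histogram bucket boundaries and the `record_message` helper are assumptions. Prometheus naming conventions prefer seconds as the base unit; the names here simply follow the list above.

```python
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram

# Dedicated registry (instead of the global default) keeps the sketch self-contained
METRICS_REGISTRY = CollectorRegistry()

MESSAGES_RECEIVED = Counter(
    "ws_messages_received_total", "Incoming WebSocket messages",
    ["exchange", "channel"], registry=METRICS_REGISTRY,
)
MESSAGE_LATENCY_MS = Histogram(
    "ws_message_latency_ms", "Delay from exchange timestamp to local receipt (ms)",
    ["exchange"], registry=METRICS_REGISTRY,
    # Assumed bucket boundaries in milliseconds; tune to the observed distribution
    buckets=(1, 2, 5, 10, 25, 50, 100, 250, 500, 1000, 5000),
)
RECONNECTS = Counter("ws_reconnects_total", "WebSocket reconnections",
                     ["exchange"], registry=METRICS_REGISTRY)
ACTIVE_CONNECTIONS = Gauge("ws_active_connections", "Open WebSocket connections",
                           ["exchange"], registry=METRICS_REGISTRY)
SUBSCRIPTIONS = Gauge("ws_subscription_count", "Active subscriptions",
                      ["exchange"], registry=METRICS_REGISTRY)


def record_message(exchange: str, channel: str, exchange_ts_ms: int, received_ts_ms: int) -> None:
    """Update the per-message metrics for one received update."""
    MESSAGES_RECEIVED.labels(exchange=exchange, channel=channel).inc()
    # Clock skew between exchange and host can make the difference negative; clamp at 0
    MESSAGE_LATENCY_MS.labels(exchange=exchange).observe(max(0, received_ts_ms - exchange_ts_ms))
```

`start_http_server(port, registry=METRICS_REGISTRY)` from the same package exposes these for scraping; `RECONNECTS` would be incremented in the disconnect handler. The latency metric is only as accurate as the host's clock synchronization.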
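A well-optimized Python (asyncio) implementation can process roughly 50,000–100,000 messages per second on a single core; Go or Rust implementations can handle about an order of magnitude more. The exact figure depends on message size, parsing cost, and how much work is done per message.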
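Throughput figures like these are worth checking on your own hardware and message mix. The sketch below times only JSON parsing plus field extraction for a sample Binance-style trade message, so it gives an upper bound for the full pipeline (normalization, publishing and metrics add more cost). The function name and sample message are illustrative.

```python
import json
import time


def measure_parse_throughput(raw_messages: list[str], rounds: int = 3) -> float:
    """Return messages parsed per second on the current core (best of `rounds`)."""
    best = 0.0
    for _ in range(rounds):
        start = time.perf_counter()
        for raw in raw_messages:
            msg = json.loads(raw)
            # Minimal stand-in for normalization: pick out the fields a consumer needs
            _ = (msg["s"], msg["p"], msg["q"], msg["T"])
        elapsed = time.perf_counter() - start
        if elapsed > 0:
            best = max(best, len(raw_messages) / elapsed)
    return best


if __name__ == "__main__":
    sample = json.dumps({"e": "trade", "E": 1, "s": "BTCUSDT", "t": 1,
                         "p": "42000.10", "q": "0.005", "T": 1, "m": False})
    rate = measure_parse_throughput([sample] * 100_000)
    print(f"~{rate:,.0f} msg/s for parsing alone")
```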







