WebSocket Market Data Aggregator Development

A WebSocket aggregator is a service that maintains persistent connections to exchanges and delivers normalized market data to downstream consumers such as trading bots, dashboards and risk management systems. Its key quality parameters are the latency from exchange to consumer and resilience to dropped connections.
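Normalization means mapping each exchange's payload onto one internal schema. Below is a minimal sketch for public trade messages. It assumes the field names of Binance's `trade` stream (`s`, `p`, `q`, `T`, `m`) and Bybit's v5 `publicTrade` topic (`data[].s`, `p`, `v`, `S`, `T`). The `Trade` dataclass and the function names are hypothetical, not a published schema:

```python
import json
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class Trade:
    """Hypothetical exchange-agnostic trade record."""
    exchange: str
    symbol: str
    price: Decimal  # Decimal avoids float rounding of price strings
    qty: Decimal
    side: str       # aggressor side: "buy" or "sell"
    ts_ms: int      # exchange trade time, milliseconds since epoch


def normalize_binance_trade(raw: str) -> list[Trade]:
    msg = json.loads(raw)
    # "m" is true when the buyer is the maker, i.e. the aggressor sold
    side = "sell" if msg["m"] else "buy"
    return [Trade("binance", msg["s"], Decimal(msg["p"]), Decimal(msg["q"]),
                  side, int(msg["T"]))]


def normalize_bybit_trade(raw: str) -> list[Trade]:
    msg = json.loads(raw)
    # One publicTrade message may carry several trades in "data"
    return [
        Trade("bybit", t["s"], Decimal(t["p"]), Decimal(t["v"]),
              t["S"].lower(), int(t["T"]))
        for t in msg["data"]
    ]
```

Returning a list from both functions gives consumers one shape regardless of whether the exchange batches trades.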

Connection Topology

Each exchange has its own WebSocket limitations:

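| Exchange | Max streams per connection | Ping interval | Max connections |
|----------|----------------------------|---------------|-----------------|
| Binance | 1024 | 3 min | Unlimited |
| Bybit | 10 topics | 20 s | Unlimited |
| OKX | 240 channels | 30 s | Unlimited |
| Kraken | Not documented | Adaptive | Unlimited |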

The aggregator opens as many connections as needed to cover all subscriptions, distributing symbols across them so that no connection exceeds the exchange's per-connection limit.
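The distribution step can be sketched as simple chunking. This is a minimal sketch: the per-connection limits are copied from the table above, and the `headroom` factor (staying below the hard limit so later subscriptions fit without opening a new socket) and the function names are illustrative assumptions:

```python
# Per-connection subscription limits, taken from the table above
PER_CONN_LIMIT = {"binance": 1024, "bybit": 10, "okx": 240}


def shard_channels(channels: list[str], max_per_conn: int) -> list[list[str]]:
    """Split channels into groups, one group per connection, each within the limit."""
    if max_per_conn < 1:
        raise ValueError("max_per_conn must be positive")
    unique = list(dict.fromkeys(channels))  # drop duplicates, keep order
    return [unique[i:i + max_per_conn] for i in range(0, len(unique), max_per_conn)]


def plan_connections(exchange: str, channels: list[str],
                     headroom: float = 0.9) -> list[list[str]]:
    # Leave spare capacity on every connection for later subscriptions
    limit = max(1, int(PER_CONN_LIMIT[exchange] * headroom))
    return shard_channels(channels, limit)
```

With no headroom, 2,500 Binance streams split into three connections of 1024, 1024 and 452 streams.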

Connection Manager

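```python
from __future__ import annotations

import asyncio
import logging

logger = logging.getLogger(__name__)


class ConnectionManager:
    # WSConnection (not shown) wraps one exchange socket and is assumed to expose
    # subscription_count, backoff (next()/reset()), subscribe(), reconnect()
    # and resubscribe().

    def __init__(self, max_per_conn: int = 900):
        self.connections: list[WSConnection] = []
        self.max_per_conn = max_per_conn  # headroom below a 1024-stream limit
        self.subscriptions: dict[str, WSConnection] = {}

    async def subscribe(self, channels: list[str]):
        for channel in channels:
            if channel in self.subscriptions:
                continue  # already subscribed on some connection
            conn = self._find_or_create_connection()
            await conn.subscribe(channel)
            self.subscriptions[channel] = conn

    def _find_or_create_connection(self) -> WSConnection:
        # Reuse the first connection with spare capacity, else open a new one
        for conn in self.connections:
            if conn.subscription_count < self.max_per_conn:
                return conn
        new_conn = WSConnection(self.on_message, self.on_disconnect)
        self.connections.append(new_conn)
        return new_conn

    async def on_message(self, conn: WSConnection, raw: str):
        ...  # normalize and publish (callback signature is an assumption)

    async def on_disconnect(self, conn: WSConnection):
        # Exponential backoff: keep retrying until the reconnect succeeds,
        # then restore every subscription the connection carried
        while True:
            await asyncio.sleep(conn.backoff.next())
            try:
                await conn.reconnect()
                await conn.resubscribe()
            except Exception:
                logger.exception("Reconnect failed, retrying")
                continue
            conn.backoff.reset()
            return
```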
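`conn.backoff` is not defined in the snippet above. One common choice is capped exponential backoff with "full jitter": a random delay between zero and the exponential cap. This keeps many connections from reconnecting in lockstep after an exchange-wide outage. Below is a minimal sketch. The class name and default parameters are illustrative assumptions:

```python
import random
from typing import Optional


class ExponentialBackoff:
    """Capped exponential backoff with optional full jitter (illustrative)."""

    def __init__(self, base: float = 0.5, factor: float = 2.0,
                 max_delay: float = 30.0, jitter: bool = True,
                 rng: Optional[random.Random] = None):
        self.base = base
        self.factor = factor
        self.max_delay = max_delay
        self.jitter = jitter
        self._rng = rng if rng is not None else random.Random()
        self._attempt = 0

    def next(self) -> float:
        delay = min(self.max_delay, self.base * self.factor ** self._attempt)
        if delay < self.max_delay:
            self._attempt += 1  # stop growing once capped (avoids float overflow)
        if self.jitter:
            # Full jitter: a uniform delay in [0, delay] spreads out reconnect storms
            delay = self._rng.uniform(0.0, delay)
        return delay

    def reset(self) -> None:
        """Call after a successful reconnect."""
        self._attempt = 0
```

Without jitter, the defaults produce delays of 0.5, 1, 2, 4, 8, 16, 30, 30, ... seconds.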

Heartbeat and Stale Connection Detection

An exchange may stop sending data without closing the connection: the TCP socket stays open, but no messages arrive. A watchdog timer on each connection detects this:

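```python
import asyncio
import logging
import time

logger = logging.getLogger(__name__)


class HeartbeatMonitor:
    STALE_THRESHOLD_SEC = 30
    CHECK_INTERVAL_SEC = 5

    async def watch(self, conn: "WSConnection"):
        # conn.last_message_time must be set on every received frame with the
        # same clock (time.monotonic), which is immune to wall-clock jumps
        while True:
            await asyncio.sleep(self.CHECK_INTERVAL_SEC)
            age = time.monotonic() - conn.last_message_time
            if age > self.STALE_THRESHOLD_SEC:
                logger.warning("Stale connection: no data for %.1f s, forcing reconnect", age)
                await conn.force_reconnect()
```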
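The watchdog only detects silence. Some exchanges also expect the client to send application-level pings at roughly the interval listed in the table, or they close the connection. Below is a minimal sketch of a keepalive task. It assumes Bybit's `{"op": "ping"}` JSON message and OKX's plain-text `"ping"` frame. The `send` callable and the `PING_PAYLOADS` mapping are hypothetical:

```python
import asyncio
import json
from typing import Awaitable, Callable, Optional

# Assumed application-level ping messages and send intervals (seconds)
PING_PAYLOADS: dict[str, tuple[str, float]] = {
    "bybit": (json.dumps({"op": "ping"}), 20.0),
    "okx": ("ping", 25.0),  # must arrive more often than every 30 s
}


async def keepalive(exchange: str, send: Callable[[str], Awaitable[None]],
                    interval: Optional[float] = None) -> None:
    """Send the exchange's ping message forever; cancel the task to stop it."""
    if exchange not in PING_PAYLOADS:
        # e.g. Binance: the server sends ping frames and most WebSocket
        # libraries answer them with pong automatically
        return
    payload, default_interval = PING_PAYLOADS[exchange]
    period = default_interval if interval is None else interval
    while True:
        await asyncio.sleep(period)
        await send(payload)
```

Run it with `asyncio.create_task(...)` next to the watchdog for each connection, and cancel it when the connection is torn down.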

Data Publishing to Consumers

The aggregator publishes normalized data through several channels depending on requirements:

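  • Redis Pub/Sub: real-time fan-out with minimal latency. Delivery is fire-and-forget, so a subscriber that is disconnected at publish time misses the message. Use it only when data does not need to be persisted or replayed.
  • Redis Streams: reliable delivery with the ability to read missed messages, via consumer groups and a persistent log.
  • Kafka: high-load systems that need durable, replicated delivery. Using the symbol as the message key routes all of a symbol's messages to the same partition, which preserves their order.
  • gRPC streaming: direct, low-latency connections between clients and the aggregator.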
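Partitioning by symbol relies on a stable key-to-partition mapping: every message for one symbol must land in the same partition so that consumers see it in order. Kafka's default partitioner achieves this with a murmur2 hash of the message key. The sketch below uses `zlib.crc32` only as a stand-in to show the idea. The key format (`exchange:symbol`) and the function names are assumptions. In practice you pass the key to the producer and let Kafka choose the partition:

```python
import json
import zlib


def message_key(exchange: str, symbol: str) -> bytes:
    # Hypothetical key format: one key per instrument per venue
    return f"{exchange}:{symbol}".encode()


def partition_for(key: bytes, num_partitions: int) -> int:
    # Stand-in for Kafka's murmur2-based default partitioner: any stable hash
    # works, as long as it never changes while the topic is in use
    return zlib.crc32(key) % num_partitions


def encode_record(exchange: str, symbol: str, payload: dict) -> tuple[bytes, bytes]:
    """Return (key, value) ready to hand to a Kafka producer."""
    value = json.dumps(payload, separators=(",", ":")).encode()
    return message_key(exchange, symbol), value
```

Adding partitions to a topic changes where existing keys hash to, so per-symbol ordering holds only while the partition count stays fixed.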

Performance Metrics

The aggregator should export Prometheus metrics for monitoring:

  • ws_messages_received_total{exchange, channel}: counter of incoming messages
  • ws_message_latency_ms{exchange}: histogram of the delay from the exchange's event timestamp to local receipt. It includes any clock offset between the exchange and the host, so keep the host clock NTP-synchronized
  • ws_reconnects_total{exchange}: counter of reconnections
  • ws_active_connections{exchange}: gauge of currently open connections
  • ws_subscription_count{exchange}: gauge of active subscriptions
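These metrics map directly onto the official `prometheus_client` library. Below is a minimal sketch. The `AggregatorMetrics` wrapper, the `record_message` helper and the histogram buckets are illustrative assumptions:

```python
import time
from typing import Optional

from prometheus_client import REGISTRY, CollectorRegistry, Counter, Gauge, Histogram


class AggregatorMetrics:
    """The aggregator metrics above, defined with prometheus_client."""

    def __init__(self, registry: CollectorRegistry = REGISTRY):
        self.messages = Counter(
            "ws_messages_received_total", "Incoming WebSocket messages",
            ["exchange", "channel"], registry=registry)
        self.latency = Histogram(
            "ws_message_latency_ms", "Exchange timestamp to receipt delay, ms",
            ["exchange"], registry=registry,
            buckets=(1, 2, 5, 10, 20, 50, 100, 250, 500, 1000))  # illustrative
        self.reconnects = Counter(
            "ws_reconnects_total", "Reconnections", ["exchange"], registry=registry)
        self.active_connections = Gauge(
            "ws_active_connections", "Open connections", ["exchange"], registry=registry)
        self.subscriptions = Gauge(
            "ws_subscription_count", "Active subscriptions", ["exchange"], registry=registry)

    def record_message(self, exchange: str, channel: str, exchange_ts_ms: int,
                       received_ts_ms: Optional[float] = None) -> None:
        if received_ts_ms is None:
            received_ts_ms = time.time() * 1000  # wall clock, comparable to exchange time
        self.messages.labels(exchange=exchange, channel=channel).inc()
        # Clamp at zero: clock offset can make the raw difference negative
        delay = max(0.0, received_ts_ms - exchange_ts_ms)
        self.latency.labels(exchange=exchange).observe(delay)
```

`prometheus_client.start_http_server(port)` exposes the default registry for scraping. Prometheus naming conventions recommend base units, so `ws_message_latency_seconds` would be the conventional name. The sketch keeps the names from the list above.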

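A well-optimized Python (asyncio) implementation can typically process on the order of 50,000 to 100,000 messages per second on a single core, depending on message size, the JSON parser and the per-message work. Go or Rust implementations can reach roughly an order of magnitude more.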
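Because these figures depend so much on payloads and parsing, they are worth measuring on your own messages. Below is a minimal single-core benchmark sketch using only the standard library. The sample message is a made-up Binance-style trade payload, and the parse-and-extract step in `handle` stands in for real normalization:

```python
import json
import time

SAMPLE = ('{"e":"trade","E":1700000000000,"s":"BTCUSDT","t":1,'
          '"p":"43000.10","q":"0.002","T":1700000000000,"m":false}')


def handle(raw: str) -> tuple:
    # Stand-in for normalization: parse and pull out the published fields
    msg = json.loads(raw)
    return msg["s"], msg["p"], msg["q"], msg["T"], msg["m"]


def measure_throughput(n: int = 200_000) -> float:
    """Return messages processed per second on the current core."""
    start = time.perf_counter()
    for _ in range(n):
        handle(SAMPLE)
    elapsed = time.perf_counter() - start
    return n / elapsed


if __name__ == "__main__":
    print(f"{measure_throughput():,.0f} msg/s")
```

This measures only the CPU cost of parsing. Socket reads, event-loop scheduling and publishing add to it, so an end-to-end figure will be lower. Swapping `json` for a faster parser such as `orjson` is a common first optimization.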