Crypto Rate Monitoring System Development
"Just ping the CoinGecko API once a minute": this approach works until the first rate limit, or until the product needs data more often than once every 60 seconds. A rate monitoring system is not a single API request but a pipeline: data source → normalization → storage → client delivery → alerts. Each layer has its own reliability requirements.
Data Sources: What to Use and When
CEX via WebSocket (Lowest Latency)
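For real-time rates, connect directly to exchange WebSocket streams. There is no aggregation layer in between, so latency is minimal:
// Binance WebSocket: tick data in real time (Node.js, `ws` package)
const WebSocket = require('ws')
let reconnectDelay = 1000 // ms, doubled after every failed attempt
function connectBinance() {
  const ws = new WebSocket('wss://stream.binance.com:9443/stream?streams=btcusdt@ticker/ethusdt@ticker')
  ws.on('open', () => { reconnectDelay = 1000 }) // reset backoff once connected
  ws.on('message', (data) => {
    const { data: tick } = JSON.parse(data)
    const symbol = tick.s // BTCUSDT
    const price = parseFloat(tick.c) // last price
    const volume24h = parseFloat(tick.v) // rolling 24h base asset volume
    const priceChange24h = parseFloat(tick.P) // percent
    normalizeAndStore({ source: 'binance', symbol, price, volume24h, priceChange24h, timestamp: Date.now() })
  })
  ws.on('error', (err) => console.error('binance ws error', err.message)) // 'close' follows
  ws.on('close', () => {
    // Exponential backoff reconnect, capped at 30 s: not optional
    setTimeout(connectBinance, reconnectDelay)
    reconnectDelay = Math.min(reconnectDelay * 2, 30000)
  })
}
connectBinance()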
Similar streams are available from Coinbase (wss://advanced-trade-ws.coinbase.com/), OKX, and Bybit. For liquid pairs, use at least three sources and take the median price to protect against outliers from any single exchange.
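The median rule can be sketched as a small pure function. This is a minimal sketch, not a definitive implementation: the name medianPrice, the quote shape { source, price, timestamp }, and the maxAgeMs and minSources options are assumptions made for illustration.

```javascript
// Hypothetical helper: combine per-source quotes into one reference price.
// quotes: [{ source, price, timestamp }]; quotes older than maxAgeMs are ignored.
function medianPrice(quotes, { now = Date.now(), maxAgeMs = 5000, minSources = 3 } = {}) {
  const fresh = quotes
    .filter((q) => Number.isFinite(q.price) && q.price > 0 && now - q.timestamp <= maxAgeMs)
    .map((q) => q.price)
    .sort((a, b) => a - b)
  if (fresh.length < minSources) return null // not enough independent sources
  const mid = Math.floor(fresh.length / 2)
  // Odd count: middle element; even count: mean of the two middle elements
  return fresh.length % 2 === 1 ? fresh[mid] : (fresh[mid - 1] + fresh[mid]) / 2
}
```

With quotes of 50000, 50010, and a faulty 65000 from three fresh sources, the function returns 50010: the outlier shifts the result by at most one position in the sorted list instead of dragging an average upward.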
DEX On-Chain Prices
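For DeFi apps, prices often need to be available on-chain. Two approaches:
Uniswap V3 TWAP: the time-weighted average price over a chosen window (600 seconds in the example that follows). It resists manipulation because a flash loan moves the price only within a single transaction, while shifting the average requires holding a distorted price across many blocks: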
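// Inside a contract: roughly how a TWAP read works
IUniswapV3Pool pool = IUniswapV3Pool(WETH_USDC_POOL);
uint32[] memory secondsAgos = new uint32[](2); // observe() takes a dynamic array
secondsAgos[0] = 600; // 600 seconds ago
secondsAgos[1] = 0;   // now
(int56[] memory tickCumulatives, ) = pool.observe(secondsAgos);
int56 tickDiff = tickCumulatives[1] - tickCumulatives[0];
int24 avgTick = int24(tickDiff / 600);
// Round toward negative infinity, as Uniswap's OracleLibrary.consult does
if (tickDiff < 0 && (tickDiff % 600 != 0)) avgTick--;
uint160 sqrtPriceX96 = TickMath.getSqrtRatioAtTick(avgTick);
// Convert sqrtPriceX96 to human-readable price...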
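For off-chain consumers, the conversion from sqrtPriceX96 to a readable number can be sketched as follows. In Uniswap V3, sqrtPriceX96 is the square root of the raw token1/token0 price scaled by 2^96, and a tick t corresponds to a raw price of 1.0001^t. The function names and the precision parameter here are hypothetical, and the caller is assumed to know the pool's token0/token1 order and decimals.

```javascript
const Q192 = 2n ** 192n // (2^96)^2, the scale of a squared sqrtPriceX96

// Price of one whole token0 expressed in whole token1 units.
// sqrtPriceX96: BigInt or decimal string; decimals0/decimals1: ERC-20 decimals.
function sqrtPriceX96ToPrice(sqrtPriceX96, decimals0, decimals1, precision = 18) {
  const sqrtP = BigInt(sqrtPriceX96)
  // Do the division in BigInt so the 160-bit input is not truncated early
  const numerator = sqrtP * sqrtP * 10n ** BigInt(decimals0) * 10n ** BigInt(precision)
  const denominator = Q192 * 10n ** BigInt(decimals1)
  return Number(numerator / denominator) / 10 ** precision
}

// Raw (not decimal-adjusted) token1/token0 price for a given tick.
function tickToRawPrice(tick) {
  return Math.pow(1.0001, tick)
}
```

If the quote token turns out to be token0 rather than token1, the price the product wants is the reciprocal of the returned value.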
Chainlink Price Feeds: the de facto standard for production smart contracts. A decentralized oracle network aggregates data from multiple sources:
AggregatorV3Interface priceFeed = AggregatorV3Interface(0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419); // ETH/USD
(, int256 price, , uint256 updatedAt, ) = priceFeed.latestRoundData();
require(price > 0, "Invalid price"); // the answer is a signed integer
require(block.timestamp - updatedAt < 3600, "Price feed stale"); // freshness check
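A monitoring backend that reads the same feed off-chain needs the same checks plus decimal scaling, since the answer is an integer. The sketch below assumes the caller has already fetched answer and updatedAt from latestRoundData() and the feed's decimals() value (8 for USD-quoted feeds such as ETH/USD). The name parseFeedAnswer and its option names are hypothetical.

```javascript
// Hypothetical helper: validate and scale a Chainlink round fetched off-chain.
// answer: BigInt or decimal string; updatedAt: unix seconds; decimals: feed decimals().
function parseFeedAnswer(
  { answer, updatedAt, decimals },
  { nowSec = Math.floor(Date.now() / 1000), maxAgeSec = 3600 } = {}
) {
  const raw = BigInt(answer)
  if (raw <= 0n) throw new Error('Invalid price')
  // Same rule as the on-chain require: the round must be younger than maxAgeSec
  if (nowSec - Number(updatedAt) >= maxAgeSec) throw new Error('Price feed stale')
  const scale = 10n ** BigInt(decimals)
  // Split into whole and fractional parts to limit floating-point loss
  return Number(raw / scale) + Number(raw % scale) / Number(scale)
}
```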
CoinGecko / CoinMarketCap (Aggregated)
Use these for historical data and less liquid tokens. Rate limits are tight: CoinGecko's free tier allows about 30 requests/min, and paid plans go up to about 500. Cache aggressively.
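One way to cache aggressively is a small TTL cache in front of the aggregator client, so repeated and concurrent requests for the same key trigger a single upstream call. This is a sketch under assumptions: createTtlCache is a hypothetical helper, and fetcher stands for whatever function actually calls the aggregator API.

```javascript
// Hypothetical TTL cache wrapping any async fetcher (for example an aggregator request).
// now is injectable so the expiry logic can be tested without waiting.
function createTtlCache(fetcher, ttlMs, now = () => Date.now()) {
  const entries = new Map() // key -> { value, expiresAt }
  const inFlight = new Map() // key -> Promise of a request already in progress
  return function get(key) {
    const hit = entries.get(key)
    if (hit && hit.expiresAt > now()) return Promise.resolve(hit.value)
    if (inFlight.has(key)) return inFlight.get(key) // share the pending request
    const pending = Promise.resolve()
      .then(() => fetcher(key))
      .then((value) => {
        entries.set(key, { value, expiresAt: now() + ttlMs })
        return value
      })
      .finally(() => inFlight.delete(key)) // failed requests are not cached
    inFlight.set(key, pending)
    return pending
  }
}
```

With a 60-second TTL, a dashboard that shows 20 tokens issues at most 20 upstream requests per minute regardless of how many clients are connected.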
System Architecture
Storage: TimescaleDB or InfluxDB
Rates are a typical time-series use case. PostgreSQL with the TimescaleDB extension is a good choice if PostgreSQL is already in use:
CREATE TABLE price_ticks (
time TIMESTAMPTZ NOT NULL,
symbol VARCHAR(20) NOT NULL,
source VARCHAR(20) NOT NULL,
price NUMERIC(20, 8) NOT NULL,
volume_24h NUMERIC(30, 2),
bid NUMERIC(20, 8),
ask NUMERIC(20, 8)
);
SELECT create_hypertable('price_ticks', 'time');
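-- On-demand aggregation into 1-minute OHLC candles
-- (define it as a continuous aggregate to keep candles updated automatically)
SELECT
  time_bucket('1 minute', time) AS bucket,
  symbol,
  first(price, time) AS open,
  max(price) AS high,
  min(price) AS low,
  last(price, time) AS close,
  -- volume_24h is a rolling 24h figure, so summing it across ticks is meaningless;
  -- keep the latest value instead
  last(volume_24h, time) AS volume_24h
FROM price_ticks
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY bucket, symbol
ORDER BY bucket DESC;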
TimescaleDB automatically partitions data by time and provides the time_bucket, first, and last functions. Set a retention policy that deletes raw ticks older than 30 days, and keep only aggregates (1m/5m/1h/1d candles) for history.
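The same bucketing logic is sometimes useful in application code, for instance to pre-aggregate ticks in the collector before writing them. The sketch below mirrors what time_bucket, first, last, max, and min compute for a single symbol. The name toCandles and the tick shape { time, price } (time in milliseconds) are assumptions for illustration.

```javascript
// Group ticks ({ time: ms since epoch, price }) into fixed-width OHLC candles.
function toCandles(ticks, bucketMs = 60000) {
  const candles = new Map() // bucket start (ms) -> candle
  const ordered = [...ticks].sort((a, b) => a.time - b.time) // open/close depend on order
  for (const { time, price } of ordered) {
    const bucket = Math.floor(time / bucketMs) * bucketMs
    const candle = candles.get(bucket)
    if (!candle) {
      candles.set(bucket, { bucket, open: price, high: price, low: price, close: price })
    } else {
      candle.high = Math.max(candle.high, price)
      candle.low = Math.min(candle.low, price)
      candle.close = price // last tick seen in this bucket so far
    }
  }
  return [...candles.values()] // ascending by bucket start
}
```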
Alerts
Two types of alerts:
Price change alert — price changed X% over Y minutes:
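const ALERT_THRESHOLD_PERCENT = 5 // example value, tune per symbol

class PriceAlertService {
  constructor(redis) {
    this.redis = redis // ioredis-style client
    this.subscribers = new Map() // webhookUrl -> { symbols: [...] }
  }

  async checkAlert(symbol, currentPrice) {
    const priceKey = `price:${symbol}:1m_ago`
    const stored = await this.redis.get(priceKey)
    if (stored !== null) {
      const pastPrice = parseFloat(stored) // Redis returns strings
      const changePercent = Math.abs((currentPrice - pastPrice) / pastPrice * 100)
      if (changePercent >= ALERT_THRESHOLD_PERCENT) {
        await this.triggerAlert({
          symbol,
          currentPrice,
          pastPrice,
          changePercent,
          direction: currentPrice > pastPrice ? 'up' : 'down'
        })
      }
    }
    // Store a reference price only if none exists (NX); it expires after 60 s.
    // A plain SETEX on every tick would overwrite the reference, so the check
    // would compare against the previous tick instead of an older price.
    await this.redis.set(priceKey, currentPrice.toString(), 'EX', 60, 'NX')
  }

  async triggerAlert(alertData) {
    // Send to Telegram, Slack, email: depends on config
    const deliveries = []
    for (const [webhookUrl, config] of this.subscribers) {
      if (config.symbols.includes(alertData.symbol)) {
        deliveries.push(fetch(webhookUrl, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify(alertData)
        }))
      }
    }
    // One failing webhook must not block or reject the others
    await Promise.allSettled(deliveries)
  }
}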
Stale data alert — if data stopped updating (source down):
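const STALE_THRESHOLD_MS = 30000 // example value
const lastUpdates = new Map() // symbol -> time of last tick (ms), set by the collector
const staleNotified = new Set() // symbols already reported, to avoid repeated alerts
setInterval(() => {
  const now = Date.now()
  for (const [symbol, lastUpdate] of lastUpdates) {
    const ageMs = now - lastUpdate
    if (ageMs <= STALE_THRESHOLD_MS) { staleNotified.delete(symbol); continue }
    if (staleNotified.has(symbol)) continue
    staleNotified.add(symbol)
    sendAlert(`Data for ${symbol} stale for ${Math.round(ageMs / 1000)}s`)
  }
}, 10000)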
Client Delivery
Streaming API for the frontend:
// Server-sent events or WebSocket: SSE is simpler for a read-only stream
app.get('/stream/prices', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream')
  res.setHeader('Cache-Control', 'no-cache')
  res.flushHeaders() // send headers now so the client sees the stream open
  const symbols = req.query.symbols?.split(',') || ['BTCUSDT', 'ETHUSDT']
  const interval = setInterval(async () => {
    try {
      const prices = await getPricesFromCache(symbols)
      res.write(`data: ${JSON.stringify(prices)}\n\n`)
    } catch (err) {
      console.error('price stream tick failed', err) // avoid an unhandled rejection
    }
  }, 1000)
  req.on('close', () => clearInterval(interval))
})
Use Redis pub/sub between the data collector and the API layer, so that client requests are served from memory and do not hit the database every time.
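The fan-out pattern can be sketched without a Redis server by using Node's EventEmitter as a stand-in for the pub/sub channel. This is an in-process substitute, not Redis itself: in production the collector would publish to a Redis channel and each API instance would subscribe to it. The names priceBus, publishTick, and latestPrices are hypothetical, and getPricesFromCache is one possible implementation of the function the SSE handler calls.

```javascript
const { EventEmitter } = require('node:events')

// Stand-in for a Redis pub/sub channel (assumption: swapped for Redis in production)
const priceBus = new EventEmitter()

// API side: keep only the latest tick per symbol in memory
const latestPrices = new Map()
priceBus.on('tick', (tick) => latestPrices.set(tick.symbol, tick))

// Collector side: called once per normalized tick
function publishTick(tick) {
  priceBus.emit('tick', tick)
}

// Read path for the streaming endpoint: no database query involved
function getPricesFromCache(symbols) {
  return symbols.filter((s) => latestPrices.has(s)).map((s) => latestPrices.get(s))
}
```

The database then receives writes only from the collector, while read load scales with the number of API instances rather than the number of connected clients.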
What's Included
- Connection to data sources (exchange WebSocket, Chainlink, aggregators)
- Data normalization and TimescaleDB/InfluxDB storage
- REST and streaming (SSE or WebSocket) API for clients
- Alert system with configurable conditions
- Source availability monitoring and auto-failover