Development of Trader Alerts System (Price, Volume, Liquidation)
An alerts system is a trader's ears on the market. Without one, you have to watch the screen constantly; with one, you can get on with your life and react only to significant events. A good alerts system covers three categories: price events, volume anomalies, and liquidation data.
Alert Types
Price Alerts:
- Price reached X (above/below)
- Price changed by N% over a period
- Price crossed moving average
- New ATH / ATL for period
Volume Alerts:
- Candle volume exceeded N × average
- Large trade (whale) > $X in single transaction
- Sharp open interest increase
Liquidation Alerts:
- Large liquidation (> $1M within 1 minute)
- Cumulative liquidations over a period
- Liquidation heatmap — prices with large expected liquidations
Architecture
class AlertRule(BaseModel):
    """A single user-defined alert: what to watch, how to deliver, how to re-arm.

    Only the parameters relevant to ``type`` need to be supplied; every
    type-specific parameter defaults to ``None`` so that a rule of one type
    does not have to spell out the parameters of the others.
    """

    id: str
    user_id: str
    # 'price_above', 'price_below', 'price_change_pct', 'volume_spike', 'liquidation'
    type: str
    symbol: str
    exchange: str

    # Parameters depending on type
    price_threshold: Optional[Decimal] = None  # price_above / price_below
    change_pct_threshold: Optional[float] = None  # price_change_pct: absolute % move
    period_minutes: Optional[int] = None  # price_change_pct: look-back window
    volume_multiplier: Optional[float] = None  # N × avg volume
    liquidation_usd: Optional[float] = None

    # Delivery
    channels: list[str]  # ['telegram', 'email', 'push', 'webhook']
    webhook_url: Optional[str] = None
    webhook_secret: Optional[str] = None  # key used to sign webhook payloads

    # Behavior
    one_time: bool = True  # deactivate after trigger
    cooldown_minutes: int = 60  # minimum between repeat triggers
    last_triggered: Optional[datetime] = None
    is_active: bool = True
Alert Engine
class AlertEngine:
    """Evaluates alert rules against incoming ticker updates and notifies users.

    Rules are looked up per market, filtered by active flag and cooldown,
    evaluated, and — when they match — delivered through every channel the
    rule lists. Rule state changes are persisted through ``rule_repo``.
    """

    def __init__(self, rule_repo, notifier):
        # "<exchange>:<symbol>" → list[AlertRule]; this is the exact key
        # format on_ticker_update() builds for its lookup.
        self.rules = {}
        self.rule_repo = rule_repo
        self.notifier = notifier

    async def on_ticker_update(self, ticker: "NormalizedTicker"):
        """Run every active, non-cooling-down rule for the ticker's market."""
        rules = self.rules.get(f"{ticker.exchange}:{ticker.symbol}", [])
        for rule in rules:
            if not rule.is_active:
                continue
            if self.is_in_cooldown(rule):
                continue
            if await self.evaluate_rule(rule, ticker):
                await self.trigger_alert(rule, ticker)

    async def evaluate_rule(self, rule: "AlertRule", ticker: "NormalizedTicker") -> bool:
        """Return True when ``ticker`` satisfies ``rule``.

        A rule whose type-specific parameters are missing never matches
        (instead of raising while comparing against ``None``). Unknown rule
        types never match either.
        """
        if rule.type in ('price_above', 'price_below'):
            threshold = rule.price_threshold
            if threshold is None:
                return False
            if rule.type == 'price_above':
                return ticker.last >= threshold
            return ticker.last <= threshold

        if rule.type == 'price_change_pct':
            # getattr: these parameters may be absent on the rule object.
            period = getattr(rule, 'period_minutes', None)
            limit = getattr(rule, 'change_pct_threshold', None)
            if period is None or limit is None:
                return False
            change = await self.compute_price_change(rule.symbol, period)
            return abs(change) >= limit

        return False

    async def trigger_alert(self, rule: "AlertRule", ticker: "NormalizedTicker"):
        """Deliver the alert on every channel, then record the trigger.

        Each channel is attempted independently: a failing channel is logged
        and does not prevent the remaining channels from being tried. The
        rule is marked as triggered (and persisted) once at least one channel
        succeeded, so a partial failure cannot cause the channels that did
        work to be notified again on every subsequent tick. If every channel
        failed the rule is left untouched and will be retried.
        """
        import logging

        logger = logging.getLogger(__name__)
        message = self.format_alert_message(rule, ticker)

        # A rule without channels has nothing to deliver and counts as done.
        delivered = not rule.channels
        for channel in rule.channels:
            try:
                await self.notifier.send(channel, rule.user_id, message)
            except Exception:
                logger.exception(
                    "Alert delivery failed: rule=%s channel=%s", rule.id, channel
                )
            else:
                delivered = True

        if not delivered:
            return

        # Update rule state (naive UTC, the same clock is_in_cooldown() reads)
        rule.last_triggered = datetime.utcnow()
        if rule.one_time:
            rule.is_active = False
        await self.rule_repo.save(rule)

    def is_in_cooldown(self, rule: "AlertRule") -> bool:
        """Return True while fewer than ``cooldown_minutes`` have passed since the last trigger."""
        if not rule.last_triggered:
            return False
        elapsed = (datetime.utcnow() - rule.last_triggered).total_seconds() / 60
        return elapsed < rule.cooldown_minutes
Volume Anomaly Detection
class VolumeAnomalyDetector:
    """Compares a candle's volume with the recent average to spot volume spikes."""

    WINDOW_PERIODS = 20  # candles for average calculation
    MIN_PERIODS = 5  # with fewer candles than this, no multiplier is computed

    def __init__(self, candle_repo=None):
        # Source of historical candle volumes; check_volume_spike() calls its
        # async get_recent_volumes(symbol, count=...) method.
        self.candle_repo = candle_repo

    async def check_volume_spike(self, symbol: str, current_volume: Decimal) -> float:
        """Return the multiplier of ``current_volume`` relative to the average volume.

        The average is taken over the last ``WINDOW_PERIODS`` volumes returned
        by the candle repository. The neutral multiplier ``1.0`` is returned
        when fewer than ``MIN_PERIODS`` volumes are available or when the
        average is zero, so callers never see a spike caused by missing data.
        """
        recent_volumes = await self.candle_repo.get_recent_volumes(
            symbol, count=self.WINDOW_PERIODS
        )
        if len(recent_volumes) < self.MIN_PERIODS:
            return 1.0

        avg_volume = sum(recent_volumes) / len(recent_volumes)
        if avg_volume == 0:
            return 1.0

        return float(current_volume / avg_volume)
Liquidation Alerts
Liquidation data comes either directly from exchanges (Binance forceOrder stream, Bybit liquidation stream) or from aggregators (Coinalyze, CoinGlass API):
class LiquidationMonitor:
    """Consumes the Binance forced-liquidation stream and alerts on large liquidations."""

    BINANCE_FORCE_ORDER_URL = "wss://fstream.binance.com/ws/!forceOrder@arr"
    LARGE_LIQUIDATION_USD = 1_000_000  # single-liquidation alert threshold
    WINDOW_SECONDS = 60  # lifetime of the per-symbol running total

    def __init__(self, redis=None, alert_engine=None):
        # redis: async client providing incrbyfloat / ttl / expire.
        # alert_engine: object providing async fire_liquidation_alert(liq).
        self.redis = redis
        self.alert_engine = alert_engine

    async def monitor_binance_liquidations(self):
        """Read forceOrder events until the connection closes, processing each one.

        A message that cannot be parsed is logged and skipped so that one bad
        frame does not terminate the whole stream.
        """
        import logging

        logger = logging.getLogger(__name__)
        async with websockets.connect(self.BINANCE_FORCE_ORDER_URL) as ws:
            async for message in ws:
                try:
                    liquidation = self._parse_force_order(json.loads(message))
                except (KeyError, TypeError, ValueError, ArithmeticError):
                    # ArithmeticError covers decimal.InvalidOperation raised
                    # by Decimal() on a non-numeric string.
                    logger.warning("Skipping malformed forceOrder message: %r", message)
                    continue
                await self.process_liquidation(liquidation)

    @staticmethod
    def _parse_force_order(data: dict) -> "Liquidation":
        """Build a Liquidation from one decoded forceOrder event."""
        order = data["o"]
        quantity = Decimal(order["q"])
        price = Decimal(order["p"])
        return Liquidation(
            symbol=order["s"],
            side=order["S"],
            quantity=quantity,
            price=price,
            usd_value=quantity * price,
            # The trade time "T" is a field of the order object; the event
            # envelope itself only carries the event time "E".
            timestamp=order["T"],
        )

    async def process_liquidation(self, liq: "Liquidation"):
        """Add the liquidation to the per-symbol total and alert if it is large."""
        key = f"liq_total:{liq.symbol}:1m"
        await self.redis.incrbyfloat(key, float(liq.usd_value))
        # Only start the TTL when the key has none. Refreshing it on every
        # liquidation would keep the key alive for as long as liquidations
        # keep arriving, and the "1-minute" total would grow without bound.
        if await self.redis.ttl(key) < 0:
            await self.redis.expire(key, self.WINDOW_SECONDS)

        # Check alerts for large single liquidations
        if liq.usd_value >= self.LARGE_LIQUIDATION_USD:
            await self.alert_engine.fire_liquidation_alert(liq)
Webhook Delivery
class WebhookDelivery:
    """Posts alert payloads to a webhook URL, HMAC-signed for receiver-side verification."""

    async def send(self, webhook_url: str, alert: "AlertMessage"):
        """POST ``alert`` as JSON to ``webhook_url``.

        The payload is serialised exactly once and those same bytes are both
        signed and sent, so a receiver that recomputes the HMAC-SHA256 over
        the raw request body obtains the value carried in
        ``X-Alert-Signature``. When the rule has no webhook secret the request
        is sent without a signature header. An HTTP error status from the
        receiver is logged; transport errors raised by the HTTP client
        propagate to the caller.
        """
        import logging

        payload = {
            "type": alert.type,
            "symbol": alert.symbol,
            "message": alert.text,
            "timestamp": alert.timestamp.isoformat(),
            "data": alert.raw_data,
        }
        body = json.dumps(payload).encode()
        headers = {"Content-Type": "application/json"}

        # Sign payload for verification on receiver side
        secret = alert.rule.webhook_secret
        if secret:
            signature = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
            headers["X-Alert-Signature"] = f"sha256={signature}"

        async with httpx.AsyncClient() as client:
            # content= sends the pre-serialised bytes untouched; json= would
            # re-serialise the dict and could produce bytes that differ from
            # the ones that were signed.
            response = await client.post(
                webhook_url,
                content=body,
                headers=headers,
                timeout=10.0,
            )

        if response.is_error:
            logging.getLogger(__name__).warning(
                "Webhook %s responded with HTTP %s", webhook_url, response.status_code
            )
Webhook alerts allow integration with external bots, trading systems, and CRMs. A webhook can also trigger automatic actions — for example, opening an order when a price alert fires.







