Trader alerts system (price, volume, liquidation)

We design and develop full-cycle blockchain solutions: from smart contract architecture to launching DeFi protocols, NFT marketplaces and crypto exchanges. Security audits, tokenomics, integration with existing infrastructure.
Showing 1 of 1 servicesAll 1306 services
Trader alerts system (price, volume, liquidation)
Medium
~3-5 business days
FAQ
Blockchain Development Services
Blockchain Development Stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1170
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1092
  • image_logo-advance_0.png
    B2B Advance company logo design
    563
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    830
  • image_logo-aider_0.jpg
    AIDER company logo development
    763
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    878

Development of Trader Alerts System (Price, Volume, Liquidation)

An alerts system is a trader's ears on the market. Without it, you need to constantly watch the screen; with it — you can live normally and react only to significant events. A good alerts system covers three categories: price events, volume anomalies, and liquidation data.

Alert Types

Price Alerts:

  • Price reached X (above/below)
  • Price changed by N% for period
  • Price crossed moving average
  • New ATH / ATL for period

Volume Alerts:

  • Candle volume exceeded N × average
  • Large trade (whale) > $X in single transaction
  • Sharp open interest increase

Liquidation Alerts:

  • Large liquidation (> $1M per 1 minute)
  • Cumulative liquidations for period
  • Liquidation heatmap — prices with large expected liquidations

Architecture

class AlertRule(BaseModel):
    id: str
    user_id: str
    type: str           # 'price_above', 'price_below', 'volume_spike', 'liquidation'
    symbol: str
    exchange: str
    
    # Parameters depending on type
    price_threshold: Optional[Decimal]
    volume_multiplier: Optional[float]  # N × avg volume
    liquidation_usd: Optional[float]
    
    # Delivery
    channels: list[str]  # ['telegram', 'email', 'push', 'webhook']
    webhook_url: Optional[str]
    
    # Behavior
    one_time: bool = True     # deactivate after trigger
    cooldown_minutes: int = 60  # minimum between repeat triggers
    
    last_triggered: Optional[datetime] = None
    is_active: bool = True

Alert Engine

class AlertEngine:
    def __init__(self, rule_repo, notifier):
        self.rules = {}  # symbol → list[AlertRule]
        self.rule_repo = rule_repo
        self.notifier = notifier

    async def on_ticker_update(self, ticker: NormalizedTicker):
        rules = self.rules.get(f"{ticker.exchange}:{ticker.symbol}", [])

        for rule in rules:
            if not rule.is_active:
                continue
            if self.is_in_cooldown(rule):
                continue

            if await self.evaluate_rule(rule, ticker):
                await self.trigger_alert(rule, ticker)

    async def evaluate_rule(self, rule: AlertRule, ticker: NormalizedTicker) -> bool:
        if rule.type == 'price_above':
            return ticker.last >= rule.price_threshold
        elif rule.type == 'price_below':
            return ticker.last <= rule.price_threshold
        elif rule.type == 'price_change_pct':
            change = await self.compute_price_change(rule.symbol, rule.period_minutes)
            return abs(change) >= rule.change_pct_threshold
        return False

    async def trigger_alert(self, rule: AlertRule, ticker: NormalizedTicker):
        message = self.format_alert_message(rule, ticker)

        # Deliver via all channels
        for channel in rule.channels:
            await self.notifier.send(channel, rule.user_id, message)

        # Update rule state
        rule.last_triggered = datetime.utcnow()
        if rule.one_time:
            rule.is_active = False

        await self.rule_repo.save(rule)

    def is_in_cooldown(self, rule: AlertRule) -> bool:
        if not rule.last_triggered:
            return False
        elapsed = (datetime.utcnow() - rule.last_triggered).total_seconds() / 60
        return elapsed < rule.cooldown_minutes

Volume Anomaly Detection

class VolumeAnomalyDetector:
    WINDOW_PERIODS = 20  # candles for average calculation

    async def check_volume_spike(self, symbol: str, current_volume: Decimal) -> float:
        """Returns multiplier relative to average volume"""
        recent_volumes = await self.candle_repo.get_recent_volumes(
            symbol, count=self.WINDOW_PERIODS
        )

        if len(recent_volumes) < 5:
            return 1.0

        avg_volume = sum(recent_volumes) / len(recent_volumes)
        if avg_volume == 0:
            return 1.0

        return float(current_volume / avg_volume)

Liquidation Alerts

Get liquidation data from exchanges (Binance forceOrder stream, Bybit liquidation) or aggregators (Coinalyze, CoinGlass API):

class LiquidationMonitor:
    async def monitor_binance_liquidations(self):
        async with websockets.connect("wss://fstream.binance.com/ws/!forceOrder@arr") as ws:
            async for message in ws:
                data = json.loads(message)
                order = data["o"]

                liquidation = Liquidation(
                    symbol=order["s"],
                    side=order["S"],
                    quantity=Decimal(order["q"]),
                    price=Decimal(order["p"]),
                    usd_value=Decimal(order["q"]) * Decimal(order["p"]),
                    timestamp=data["T"],
                )

                await self.process_liquidation(liquidation)

    async def process_liquidation(self, liq: Liquidation):
        # Update rolling 1-minute total
        await self.redis.incrbyfloat(
            f"liq_total:{liq.symbol}:1m",
            float(liq.usd_value)
        )
        await self.redis.expire(f"liq_total:{liq.symbol}:1m", 60)

        # Check alerts for large single liquidations
        if liq.usd_value >= 1_000_000:  # > $1M
            await self.alert_engine.fire_liquidation_alert(liq)

Webhook Delivery

class WebhookDelivery:
    async def send(self, webhook_url: str, alert: AlertMessage):
        payload = {
            "type": alert.type,
            "symbol": alert.symbol,
            "message": alert.text,
            "timestamp": alert.timestamp.isoformat(),
            "data": alert.raw_data,
        }

        # Sign payload for verification on receiver side
        signature = hmac.new(
            alert.rule.webhook_secret.encode(),
            json.dumps(payload).encode(),
            hashlib.sha256
        ).hexdigest()

        async with httpx.AsyncClient() as client:
            await client.post(
                webhook_url,
                json=payload,
                headers={"X-Alert-Signature": f"sha256={signature}"},
                timeout=10.0,
            )

Webhook alerts allow integration with external bots, trading systems, CRM. Via webhook you can trigger automatic actions — for example, opening an order on a price alert.