AI DeFi Protocol Monitoring System (Risks, Exploit Detection)

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 servicesAll 1566 services
AI DeFi Protocol Monitoring System (Risks, Exploit Detection)
Complex
~2-4 weeks
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    823
  • image_logo-aider_0.jpg
    AIDER company logo development
    762
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    848

AI-based DeFi protocol monitoring system (risks, exploit detection)

DeFi protocols store billions of dollars in smart contracts. Between 2020 and 2024, $5 billion was lost through exploits. AI monitoring analyzes on-chain transactions in real time and detects attacks—flash loan manipulation, reentrancy, price oracle manipulation—within a few blocks.

On-chain monitoring architecture

Data Streams for DeFi Monitoring:

data_streams = {
    'mempool_monitoring': {
        'description': 'pending транзакции до включения в блок',
        'latency': 'real-time (< 1 секунды)',
        'use': 'обнаружение MEV-ботов, sandwich attacks в полёте'
    },
    'block_stream': {
        'description': 'подтверждённые блоки (Ethereum ~12 сек, BSC ~3 сек)',
        'providers': ['Infura', 'Alchemy', 'QuickNode', 'собственная нода'],
        'use': 'анализ завершённых транзакций'
    },
    'event_logs': {
        'description': 'события смарт-контрактов (Transfer, Swap, Liquidation)',
        'parsing': 'ABI decoding → структурированные события'
    },
    'price_feeds': {
        'sources': ['Chainlink oracle', 'TWAP Uniswap v3', 'Pyth Network'],
        'use': 'базовая цена для определения аномальных отклонений'
    }
}

Detection of Flash Loan attacks

Flash loan exploit pattern:

from web3 import Web3
import pandas as pd
import numpy as np

def detect_flash_loan_attack(block_transactions: pd.DataFrame,
                              protocol_address: str,
                              price_oracle_data: dict) -> dict:
    """
    Flash loan атака: берём кредит → манипулируем ценой → эксплойтируем → возвращаем.
    Признаки: один аккаунт, одна транзакция, огромный объём займа,
    аномальное изменение цены, извлечение средств.
    """
    protocol_txs = block_transactions[
        block_transactions['to_address'].str.lower() == protocol_address.lower()
    ]

    if protocol_txs.empty:
        return {'attack_detected': False}

    suspicious_txs = []
    for _, tx in protocol_txs.iterrows():
        # 1. Flash loan признак: borrow + repay в одной транзакции
        events = tx.get('decoded_events', [])
        event_types = [e.get('event', '') for e in events]

        has_flash_loan = 'FlashLoan' in event_types or (
            'Borrow' in event_types and 'Repay' in event_types
        )

        if not has_flash_loan:
            continue

        # 2. Объём займа
        borrow_events = [e for e in events if e.get('event') == 'Borrow']
        if borrow_events:
            loan_amount_usd = borrow_events[0].get('amount_usd', 0)
        else:
            continue

        # 3. Изменение цены оракула в том же блоке
        block_number = tx['block_number']
        price_before = price_oracle_data.get(f'block_{block_number-1}', {}).get('price', None)
        price_after = price_oracle_data.get(f'block_{block_number}', {}).get('price', None)

        if price_before and price_after:
            price_deviation = abs(price_after - price_before) / price_before
        else:
            price_deviation = 0

        # 4. Извлечение средств из протокола
        protocol_balance_change = tx.get('protocol_balance_change_usd', 0)

        # Флаги атаки
        is_suspicious = (
            loan_amount_usd > 1_000_000 and   # > $1M займ
            price_deviation > 0.05 and          # > 5% отклонение цены
            protocol_balance_change < -100_000  # вывод > $100K
        )

        if is_suspicious:
            suspicious_txs.append({
                'tx_hash': tx['hash'],
                'loan_amount_usd': loan_amount_usd,
                'price_deviation_pct': round(price_deviation * 100, 2),
                'protocol_loss_usd': abs(protocol_balance_change),
                'attack_type': 'flash_loan_price_manipulation'
            })

    return {
        'attack_detected': len(suspicious_txs) > 0,
        'suspicious_transactions': suspicious_txs,
        'total_potential_loss_usd': sum(t['protocol_loss_usd'] for t in suspicious_txs)
    }

Pool Liquidity Monitoring (AMM)

Anomalous Swap Detection:

def monitor_amm_pool_anomaly(pool_address: str,
                               swap_events: pd.DataFrame,
                               baseline_stats: dict) -> dict:
    """
    Аномальный своп: огромный размер относительно пула → резкое проскальзывание.
    Price impact > 10% = манипуляция или ошибка.
    """
    recent_swaps = swap_events[
        swap_events['pool_address'].str.lower() == pool_address.lower()
    ].tail(100)

    if recent_swaps.empty:
        return {'status': 'no_activity'}

    # Расчёт price impact
    recent_swaps['price_impact_pct'] = abs(
        (recent_swaps['price_after'] - recent_swaps['price_before']) /
        recent_swaps['price_before'] * 100
    )

    # Статистическая аномалия
    avg_price_impact = baseline_stats.get('avg_price_impact', 0.5)
    std_price_impact = baseline_stats.get('std_price_impact', 0.3)

    anomalies = recent_swaps[recent_swaps['price_impact_pct'] > avg_price_impact + 5 * std_price_impact]

    # Volume spike
    recent_volume = recent_swaps['amount_usd'].sum()
    baseline_volume = baseline_stats.get('hourly_volume_usd', recent_volume)
    volume_ratio = recent_volume / (baseline_volume + 1e-9)

    sandwich_attack = detect_sandwich_pattern(recent_swaps)

    return {
        'pool': pool_address,
        'anomalous_swaps': len(anomalies),
        'max_price_impact_pct': round(recent_swaps['price_impact_pct'].max(), 2),
        'volume_spike_ratio': round(volume_ratio, 2),
        'sandwich_attack_detected': sandwich_attack,
        'risk_level': 'critical' if len(anomalies) > 0 and volume_ratio > 5 else
                      'warning' if len(anomalies) > 0 else 'normal'
    }

def detect_sandwich_pattern(swaps: pd.DataFrame) -> bool:
    """
    Sandwich: bot-buy → victim-buy → bot-sell в последовательных транзакциях.
    """
    if len(swaps) < 3:
        return False

    for i in range(len(swaps) - 2):
        bot_buy = swaps.iloc[i]
        victim = swaps.iloc[i+1]
        bot_sell = swaps.iloc[i+2]

        # Одинаковый sender для buy и sell (бот)
        if bot_buy['sender'] == bot_sell['sender'] and bot_buy['sender'] != victim['sender']:
            # Противоположные направления
            if (bot_buy['direction'] != bot_sell['direction'] and
                    bot_buy['direction'] == victim['direction']):
                return True
    return False

Monitoring liquidations (Lending Protocols)

Abnormal liquidations and oracle manipulation:

def monitor_liquidation_risk(lending_protocol: str,
                               positions: pd.DataFrame,
                               current_prices: dict) -> dict:
    """
    Aave, Compound, MakerDAO: позиции с health factor < 1.5 = риск ликвидации.
    Резкое падение oracle цены = массовые ликвидации = потенциальный oracle attack.
    """
    at_risk_positions = []

    for _, position in positions.iterrows():
        collateral_asset = position['collateral_asset']
        collateral_amount = position['collateral_amount']
        debt_amount = position['debt_usd']

        current_price = current_prices.get(collateral_asset, position['entry_price'])
        collateral_usd = collateral_amount * current_price
        health_factor = collateral_usd * position['ltv_ratio'] / (debt_amount + 1e-9)

        if health_factor < 1.2:
            at_risk_positions.append({
                'position_id': position['id'],
                'health_factor': round(health_factor, 3),
                'collateral_usd': round(collateral_usd, 2),
                'debt_usd': debt_amount,
                'liquidatable': health_factor < 1.0
            })

    # Аномальное число ликвидаций = сигнал oracle manipulation
    liquidatable = sum(1 for p in at_risk_positions if p['liquidatable'])
    total_at_risk_usd = sum(p['collateral_usd'] for p in at_risk_positions)

    return {
        'protocol': lending_protocol,
        'at_risk_positions': len(at_risk_positions),
        'liquidatable_now': liquidatable,
        'total_at_risk_usd': total_at_risk_usd,
        'oracle_attack_risk': liquidatable > 100,  # массовые ликвидации = подозрение
        'action': 'pause_oracle_feed' if liquidatable > 200 else None
    }

Emergency Response

Automatic reaction when attacking:

def emergency_response_protocol(attack_detected: dict, protocol_guardian: object) -> dict:
    """
    Circuit breaker: автоматическая пауза протокола при критической атаке.
    Требует multisig от DAO или Guardian роли.
    """
    if not attack_detected.get('attack_detected', False):
        return {'action': 'none'}

    loss_usd = attack_detected.get('total_potential_loss_usd', 0)

    response_actions = []

    if loss_usd > 1_000_000:
        response_actions.append({
            'action': 'pause_protocol',
            'method': 'guardian.pauseProtocol()',
            'requires': 'guardian_multisig',
            'urgency': 'immediate'
        })
        response_actions.append({
            'action': 'notify_security_team',
            'channels': ['telegram', 'pagerduty', 'discord'],
            'message': f'CRITICAL: Possible exploit detected. Est. loss: ${loss_usd:,.0f}'
        })

    if loss_usd > 100_000:
        response_actions.append({
            'action': 'block_suspicious_address',
            'tx_hash': attack_detected['suspicious_transactions'][0].get('tx_hash'),
            'note': 'Front-run via OFAC / blacklist'
        })

    return {'response_actions': response_actions, 'estimated_loss_usd': loss_usd}

Integration with Security Ecosystem: Forta Network (decentralized detection), Tenderly Alerts, OpenZeppelin Defender. Notifications via Telegram/Discord DAO security channel. Blacklist integration with Circle (USDC) and Tether (USDT) to freeze attacker funds.

Timeframe: On-chain monitoring + flash loan detector + price impact anomalies + alerts — 5-6 weeks. Oracle manipulation, sandwich detection, lending liquidation monitor, emergency circuit breaker, DAO governance integration — 3-4 months.