AI-based DeFi protocol monitoring system (risks, exploit detection)
DeFi protocols store billions of dollars in smart contracts. Between 2020 and 2024, $5 billion was lost through exploits. AI monitoring analyzes on-chain transactions in real time and detects attacks—flash loan manipulation, reentrancy, price oracle manipulation—within a few blocks.
On-chain monitoring architecture
Data Streams for DeFi Monitoring:
data_streams = {
'mempool_monitoring': {
'description': 'pending транзакции до включения в блок',
'latency': 'real-time (< 1 секунды)',
'use': 'обнаружение MEV-ботов, sandwich attacks в полёте'
},
'block_stream': {
'description': 'подтверждённые блоки (Ethereum ~12 сек, BSC ~3 сек)',
'providers': ['Infura', 'Alchemy', 'QuickNode', 'собственная нода'],
'use': 'анализ завершённых транзакций'
},
'event_logs': {
'description': 'события смарт-контрактов (Transfer, Swap, Liquidation)',
'parsing': 'ABI decoding → структурированные события'
},
'price_feeds': {
'sources': ['Chainlink oracle', 'TWAP Uniswap v3', 'Pyth Network'],
'use': 'базовая цена для определения аномальных отклонений'
}
}
Detection of Flash Loan attacks
Flash loan exploit pattern:
from web3 import Web3
import pandas as pd
import numpy as np
def detect_flash_loan_attack(block_transactions: pd.DataFrame,
protocol_address: str,
price_oracle_data: dict) -> dict:
"""
Flash loan атака: берём кредит → манипулируем ценой → эксплойтируем → возвращаем.
Признаки: один аккаунт, одна транзакция, огромный объём займа,
аномальное изменение цены, извлечение средств.
"""
protocol_txs = block_transactions[
block_transactions['to_address'].str.lower() == protocol_address.lower()
]
if protocol_txs.empty:
return {'attack_detected': False}
suspicious_txs = []
for _, tx in protocol_txs.iterrows():
# 1. Flash loan признак: borrow + repay в одной транзакции
events = tx.get('decoded_events', [])
event_types = [e.get('event', '') for e in events]
has_flash_loan = 'FlashLoan' in event_types or (
'Borrow' in event_types and 'Repay' in event_types
)
if not has_flash_loan:
continue
# 2. Объём займа
borrow_events = [e for e in events if e.get('event') == 'Borrow']
if borrow_events:
loan_amount_usd = borrow_events[0].get('amount_usd', 0)
else:
continue
# 3. Изменение цены оракула в том же блоке
block_number = tx['block_number']
price_before = price_oracle_data.get(f'block_{block_number-1}', {}).get('price', None)
price_after = price_oracle_data.get(f'block_{block_number}', {}).get('price', None)
if price_before and price_after:
price_deviation = abs(price_after - price_before) / price_before
else:
price_deviation = 0
# 4. Извлечение средств из протокола
protocol_balance_change = tx.get('protocol_balance_change_usd', 0)
# Флаги атаки
is_suspicious = (
loan_amount_usd > 1_000_000 and # > $1M займ
price_deviation > 0.05 and # > 5% отклонение цены
protocol_balance_change < -100_000 # вывод > $100K
)
if is_suspicious:
suspicious_txs.append({
'tx_hash': tx['hash'],
'loan_amount_usd': loan_amount_usd,
'price_deviation_pct': round(price_deviation * 100, 2),
'protocol_loss_usd': abs(protocol_balance_change),
'attack_type': 'flash_loan_price_manipulation'
})
return {
'attack_detected': len(suspicious_txs) > 0,
'suspicious_transactions': suspicious_txs,
'total_potential_loss_usd': sum(t['protocol_loss_usd'] for t in suspicious_txs)
}
Pool Liquidity Monitoring (AMM)
Anomalous Swap Detection:
def monitor_amm_pool_anomaly(pool_address: str,
swap_events: pd.DataFrame,
baseline_stats: dict) -> dict:
"""
Аномальный своп: огромный размер относительно пула → резкое проскальзывание.
Price impact > 10% = манипуляция или ошибка.
"""
recent_swaps = swap_events[
swap_events['pool_address'].str.lower() == pool_address.lower()
].tail(100)
if recent_swaps.empty:
return {'status': 'no_activity'}
# Расчёт price impact
recent_swaps['price_impact_pct'] = abs(
(recent_swaps['price_after'] - recent_swaps['price_before']) /
recent_swaps['price_before'] * 100
)
# Статистическая аномалия
avg_price_impact = baseline_stats.get('avg_price_impact', 0.5)
std_price_impact = baseline_stats.get('std_price_impact', 0.3)
anomalies = recent_swaps[recent_swaps['price_impact_pct'] > avg_price_impact + 5 * std_price_impact]
# Volume spike
recent_volume = recent_swaps['amount_usd'].sum()
baseline_volume = baseline_stats.get('hourly_volume_usd', recent_volume)
volume_ratio = recent_volume / (baseline_volume + 1e-9)
sandwich_attack = detect_sandwich_pattern(recent_swaps)
return {
'pool': pool_address,
'anomalous_swaps': len(anomalies),
'max_price_impact_pct': round(recent_swaps['price_impact_pct'].max(), 2),
'volume_spike_ratio': round(volume_ratio, 2),
'sandwich_attack_detected': sandwich_attack,
'risk_level': 'critical' if len(anomalies) > 0 and volume_ratio > 5 else
'warning' if len(anomalies) > 0 else 'normal'
}
def detect_sandwich_pattern(swaps: pd.DataFrame) -> bool:
"""
Sandwich: bot-buy → victim-buy → bot-sell в последовательных транзакциях.
"""
if len(swaps) < 3:
return False
for i in range(len(swaps) - 2):
bot_buy = swaps.iloc[i]
victim = swaps.iloc[i+1]
bot_sell = swaps.iloc[i+2]
# Одинаковый sender для buy и sell (бот)
if bot_buy['sender'] == bot_sell['sender'] and bot_buy['sender'] != victim['sender']:
# Противоположные направления
if (bot_buy['direction'] != bot_sell['direction'] and
bot_buy['direction'] == victim['direction']):
return True
return False
Monitoring liquidations (Lending Protocols)
Abnormal liquidations and oracle manipulation:
def monitor_liquidation_risk(lending_protocol: str,
positions: pd.DataFrame,
current_prices: dict) -> dict:
"""
Aave, Compound, MakerDAO: позиции с health factor < 1.5 = риск ликвидации.
Резкое падение oracle цены = массовые ликвидации = потенциальный oracle attack.
"""
at_risk_positions = []
for _, position in positions.iterrows():
collateral_asset = position['collateral_asset']
collateral_amount = position['collateral_amount']
debt_amount = position['debt_usd']
current_price = current_prices.get(collateral_asset, position['entry_price'])
collateral_usd = collateral_amount * current_price
health_factor = collateral_usd * position['ltv_ratio'] / (debt_amount + 1e-9)
if health_factor < 1.2:
at_risk_positions.append({
'position_id': position['id'],
'health_factor': round(health_factor, 3),
'collateral_usd': round(collateral_usd, 2),
'debt_usd': debt_amount,
'liquidatable': health_factor < 1.0
})
# Аномальное число ликвидаций = сигнал oracle manipulation
liquidatable = sum(1 for p in at_risk_positions if p['liquidatable'])
total_at_risk_usd = sum(p['collateral_usd'] for p in at_risk_positions)
return {
'protocol': lending_protocol,
'at_risk_positions': len(at_risk_positions),
'liquidatable_now': liquidatable,
'total_at_risk_usd': total_at_risk_usd,
'oracle_attack_risk': liquidatable > 100, # массовые ликвидации = подозрение
'action': 'pause_oracle_feed' if liquidatable > 200 else None
}
Emergency Response
Automatic reaction when attacking:
def emergency_response_protocol(attack_detected: dict, protocol_guardian: object) -> dict:
"""
Circuit breaker: автоматическая пауза протокола при критической атаке.
Требует multisig от DAO или Guardian роли.
"""
if not attack_detected.get('attack_detected', False):
return {'action': 'none'}
loss_usd = attack_detected.get('total_potential_loss_usd', 0)
response_actions = []
if loss_usd > 1_000_000:
response_actions.append({
'action': 'pause_protocol',
'method': 'guardian.pauseProtocol()',
'requires': 'guardian_multisig',
'urgency': 'immediate'
})
response_actions.append({
'action': 'notify_security_team',
'channels': ['telegram', 'pagerduty', 'discord'],
'message': f'CRITICAL: Possible exploit detected. Est. loss: ${loss_usd:,.0f}'
})
if loss_usd > 100_000:
response_actions.append({
'action': 'block_suspicious_address',
'tx_hash': attack_detected['suspicious_transactions'][0].get('tx_hash'),
'note': 'Front-run via OFAC / blacklist'
})
return {'response_actions': response_actions, 'estimated_loss_usd': loss_usd}
Integration with Security Ecosystem: Forta Network (decentralized detection), Tenderly Alerts, OpenZeppelin Defender. Notifications via Telegram/Discord DAO security channel. Blacklist integration with Circle (USDC) and Tether (USDT) to freeze attacker funds.
Timeframe: On-chain monitoring + flash loan detector + price impact anomalies + alerts — 5-6 weeks. Oracle manipulation, sandwich detection, lending liquidation monitor, emergency circuit breaker, DAO governance integration — 3-4 months.







