Intelligent Rate Limiting for API
Basic rate limiting by IP applies the same cap — X requests per minute — to every client indiscriminately. Intelligent rate limiting takes into account the user identifier, the endpoint type, and historical behavior, and adapts limits automatically. Correctly configured, it does not interfere with legitimate users but reliably blocks scrapers and DDoS traffic.
Algorithms and Their Application
Token Bucket — the classic choice for APIs. Each user has a bucket of tokens that is replenished at a fixed rate, and every request consumes one token. This allows short-term bursts.
Sliding Window — more accurate than Fixed Window. It counts requests over the last N seconds relative to the current moment, so a client cannot send up to twice the limit by bursting on both sides of a fixed-window boundary.
Adaptive Rate Limiting — limits change dynamically based on server load or client risk assessment.
Redis Implementation of Sliding Window
import redis
import time
from functools import wraps
# Shared module-level Redis client used by the rate-limiting helpers in this file.
# decode_responses=True asks redis-py to return str values instead of raw bytes.
r = redis.Redis(host='localhost', decode_responses=True)
def sliding_window_rate_limit(key: str, limit: int, window: int) -> bool:
    """Record one request for ``key`` and report whether it fits the limit.

    Requests are stored as members of a Redis sorted set scored by arrival
    time. Entries older than ``window`` seconds are dropped, the current
    request is added, and the remaining members are counted, all through a
    single pipeline.

    Args:
        key: unique identifier (user_id, ip, api_key)
        limit: max requests over ``window`` seconds
        window: window size in seconds

    Returns:
        True if the request is allowed, i.e. the number of requests recorded
        in the window (including this one) does not exceed ``limit``.

    Note:
        The request is recorded before it is counted, so rejected requests
        also occupy a slot in the window.
    """
    import uuid  # local import: only needed to build unique set members

    now = time.time()
    window_start = now - window

    # The member must be unique per request: ZADD on an existing member only
    # updates its score, so two requests sharing the same timestamp would
    # otherwise collapse into one entry and be under-counted.
    member = f"{now}:{uuid.uuid4().hex}"

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)  # remove old entries
    pipe.zadd(key, {member: now})  # add current request
    pipe.zcard(key)  # count in window
    pipe.expire(key, window)  # TTL for cleanup
    results = pipe.execute()

    count = results[2]  # reply of the ZCARD command queued third
    return count <= limit
def rate_limit(limit=100, window=60, key_func=None):
    """Build a decorator that rate-limits the wrapped view function.

    The limiter key is ``rl:`` followed by ``key_func()`` when a key function
    is supplied, otherwise by ``request.remote_addr``. When
    ``sliding_window_rate_limit`` rejects the call, the view is not invoked
    and a 429 JSON response carrying rate-limit headers is returned instead.
    """
    def decorator(f):
        @wraps(f)
        def limited(*args, **kwargs):
            identity = key_func() if key_func else request.remote_addr
            bucket = f"rl:{identity}"

            if sliding_window_rate_limit(bucket, limit, window):
                return f(*args, **kwargs)

            # Rejected: advertise the full window length as the retry delay.
            payload = jsonify({'error': 'Too Many Requests'})
            throttle_headers = {
                'Retry-After': str(window),
                'X-RateLimit-Limit': str(limit),
                'X-RateLimit-Remaining': '0',
            }
            return payload, 429, throttle_headers
        return limited
    return decorator
Multi-Level Limits by Endpoint
# Per-endpoint limits: 'limit' is the max number of requests allowed within
# 'window' seconds. The 'default' entry is the fallback for unlisted endpoints.
_MINUTE = 60
_HOUR = 60 * _MINUTE

RATE_LIMITS = {
    'default': {'limit': 1000, 'window': _HOUR},            # 1000/hour
    'auth.login': {'limit': 10, 'window': 15 * _MINUTE},    # 10 attempts in 15 min
    'auth.register': {'limit': 5, 'window': _HOUR},         # 5/hour
    'api.search': {'limit': 100, 'window': _MINUTE},        # 100/min
    'api.export': {'limit': 10, 'window': _HOUR},           # 10 exports/hour
    'api.upload': {'limit': 50, 'window': _HOUR},           # 50 uploads/hour
    'webhooks.send': {'limit': 500, 'window': _MINUTE},     # 500/min
}
class MultiLevelRateLimiter:
    """Apply an endpoint's limit at user, IP and global level, in that order."""

    def check(self, user_id: int, endpoint: str, ip: str) -> dict:
        """Run the layered checks and stop at the first level that rejects.

        The endpoint's entry in ``RATE_LIMITS`` (or ``'default'`` when the
        endpoint is not listed) supplies the base limit and window.

        Returns:
            ``{'allowed': True}`` when every level accepts the request,
            otherwise ``{'allowed': False, 'reason': ...}`` where reason is
            ``'user_limit'``, ``'ip_limit'`` or ``'global_limit'``.
        """
        config = RATE_LIMITS.get(endpoint, RATE_LIMITS['default'])
        base_limit = config['limit']
        window = config['window']

        # Ordered (key, limit, rejection reason) triples, checked top to bottom.
        levels = []
        if user_id:
            # Level 1: by user (authenticated)
            levels.append((f"rl:user:{user_id}:{endpoint}", base_limit, 'user_limit'))
        # Level 2: by IP (protect from account creation abuse);
        # the IP limit is higher than the user limit.
        levels.append((f"rl:ip:{ip}:{endpoint}", base_limit * 3, 'ip_limit'))
        # Level 3: global (protect from DDoS)
        levels.append((f"rl:global:{endpoint}", base_limit * 100, 'global_limit'))

        for level_key, level_limit, reason in levels:
            if not sliding_window_rate_limit(level_key, level_limit, window):
                return {'allowed': False, 'reason': reason}
        return {'allowed': True}
Adaptive Rate Limit by Risk
class AdaptiveRateLimiter:
    """Scale a base rate limit down according to a heuristic request risk score."""

    def is_known_proxy(self, ip: str) -> bool:
        """Report whether ``ip`` belongs to a known Tor/VPN address list.

        Default hook: this class has no proxy list of its own, so nothing is
        flagged. Override it in a subclass to consult a real list. Without
        this method ``get_risk_score`` would raise AttributeError.
        """
        return False

    def get_risk_score(self, request) -> float:
        """Estimate request risk from 0.0 (low) to 1.0 (high).

        Adds a fixed weight for each signal that fires: suspicious or missing
        User-Agent (0.3), missing Accept-Language (0.2), more than 10 recorded
        errors for the client address (0.3), known proxy address (0.2).
        """
        score = 0.0

        # Suspicious User-Agent (lowercased once for both substring checks)
        ua = request.headers.get('User-Agent', '').lower()
        if not ua or 'python-requests' in ua or 'curl' in ua:
            score += 0.3

        # No browser headers
        if not request.headers.get('Accept-Language'):
            score += 0.2

        # Recent error history (many 404, 401); a missing counter counts as 0
        error_count = r.get(f"errors:{request.remote_addr}") or 0
        if int(error_count) > 10:
            score += 0.3

        # Requests from Tor/VPN IP (check against list)
        if self.is_known_proxy(request.remote_addr):
            score += 0.2

        return min(score, 1.0)

    def get_effective_limit(self, base_limit: int, risk_score: float) -> int:
        """Lower the limit for suspicious clients.

        A risk score of 1.0 removes up to 80% of ``base_limit``. The result
        is never below 1 and never above ``base_limit`` (for base limits of
        at least 1).
        """
        # Cap the multiplier at 1.0 so a negative risk score cannot raise the
        # limit above the configured base.
        multiplier = min(1.0, 1.0 - (risk_score * 0.8))  # up to 80% reduction
        return max(int(base_limit * multiplier), 1)
Response Headers
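RFC 6585 defines the 429 Too Many Requests status and allows the response to carry a Retry-After header; the X-RateLimit-* headers are a widely used API convention rather than a formal requirement. Clients benefit from informative headers: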
def add_rate_limit_headers(response, key, limit, window):
now = time.time()
current_count = r.zcard(key) or 0
remaining = max(0, limit - current_count)
# Reset time = start of next window
reset_at = int(now) + window - (int(now) % window)







