Intelligent rate limiting by IP API key and user for API

Our company is engaged in the development, support and maintenance of sites of any complexity. From simple one-page sites to large-scale cluster systems built on micro services. Experience of developers is confirmed by certificates from vendors.
Development and maintenance of all types of websites:
Informational websites or web applications
Business card websites, landing pages, corporate websites, online catalogs, quizzes, promo websites, blogs, news resources, informational portals, forums, aggregators
E-commerce websites or web applications
Online stores, B2B portals, marketplaces, online exchanges, cashback websites, exchanges, dropshipping platforms, product parsers
Business process management web applications
CRM systems, ERP systems, corporate portals, production management systems, information parsers
Electronic service websites or web applications
Classified ads platforms, online schools, online cinemas, website builders, portals for electronic services, video hosting platforms, thematic portals

These are just some of the technical types of websites we work with, and each of them can have its own specific features and functionality, as well as be customized to meet the specific needs and goals of the client.

Our competencies:
Development stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    822
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    847
  • image_website-sbh_0.png
    Website development for SBH Partners
    999
  • image_website-_0.png
    Website development for Red Pear
    451

Intelligent Rate Limiting for API

Basic rate limit by IP — X requests per minute indiscriminately. Intelligent rate limiting accounts for user identifier, endpoint type, historical behavior and automatically adapts limits. Correctly configured rate limiting does not interfere with legitimate users but reliably blocks scrapers and DDoS.

Algorithms and Their Application

Token Bucket — classic for APIs. Each user has bucket of tokens replenished at fixed rate. Allows short-term bursts.

Sliding Window — more accurate than Fixed Window. Counts requests over last N seconds relative to current moment, not allowing double limit at window boundary.

Adaptive Rate Limiting — limits change dynamically based on server load or client risk assessment.

Redis Implementation of Sliding Window

import redis
import time
from functools import wraps

r = redis.Redis(host='localhost', decode_responses=True)

def sliding_window_rate_limit(key: str, limit: int, window: int) -> bool:
    """
    key: unique identifier (user_id, ip, api_key)
    limit: max requests over window seconds
    window: window size in seconds
    Returns True if request allowed
    """
    now = time.time()
    window_start = now - window

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)  # remove old entries
    pipe.zadd(key, {str(now): now})              # add current request
    pipe.zcard(key)                              # count in window
    pipe.expire(key, window)                     # TTL for cleanup
    results = pipe.execute()

    count = results[2]
    return count <= limit

def rate_limit(limit=100, window=60, key_func=None):
    """Decorator for Flask/FastAPI"""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if key_func:
                key = f"rl:{key_func()}"
            else:
                key = f"rl:{request.remote_addr}"

            if not sliding_window_rate_limit(key, limit, window):
                # Return Retry-After
                return jsonify({'error': 'Too Many Requests'}), 429, {
                    'Retry-After': str(window),
                    'X-RateLimit-Limit': str(limit),
                    'X-RateLimit-Remaining': '0'
                }

            return f(*args, **kwargs)
        return wrapper
    return decorator

Multi-Level Limits by Endpoint

# Different limits for different operations
RATE_LIMITS = {
    'default':          {'limit': 1000, 'window': 3600},   # 1000/hour
    'auth.login':       {'limit': 10,   'window': 900},    # 10 attempts in 15 min
    'auth.register':    {'limit': 5,    'window': 3600},   # 5/hour
    'api.search':       {'limit': 100,  'window': 60},     # 100/min
    'api.export':       {'limit': 10,   'window': 3600},   # 10 exports/hour
    'api.upload':       {'limit': 50,   'window': 3600},   # 50 uploads/hour
    'webhooks.send':    {'limit': 500,  'window': 60},     # 500/min
}

class MultiLevelRateLimiter:
    def check(self, user_id: int, endpoint: str, ip: str) -> dict:
        config = RATE_LIMITS.get(endpoint, RATE_LIMITS['default'])

        # Level 1: by user (authenticated)
        if user_id:
            user_key = f"rl:user:{user_id}:{endpoint}"
            if not sliding_window_rate_limit(user_key, config['limit'], config['window']):
                return {'allowed': False, 'reason': 'user_limit'}

        # Level 2: by IP (protect from account creation abuse)
        ip_key = f"rl:ip:{ip}:{endpoint}"
        ip_limit = config['limit'] * 3  # IP limit higher than user limit
        if not sliding_window_rate_limit(ip_key, ip_limit, config['window']):
            return {'allowed': False, 'reason': 'ip_limit'}

        # Level 3: global (protect from DDoS)
        global_key = f"rl:global:{endpoint}"
        global_limit = config['limit'] * 100
        if not sliding_window_rate_limit(global_key, global_limit, config['window']):
            return {'allowed': False, 'reason': 'global_limit'}

        return {'allowed': True}

Adaptive Rate Limit by Risk

class AdaptiveRateLimiter:
    def get_risk_score(self, request) -> float:
        """Estimate request risk from 0.0 (low) to 1.0 (high)"""
        score = 0.0

        # Suspicious User-Agent
        ua = request.headers.get('User-Agent', '')
        if not ua or 'python-requests' in ua.lower() or 'curl' in ua.lower():
            score += 0.3

        # No browser headers
        if not request.headers.get('Accept-Language'):
            score += 0.2

        # Recent error history (many 404, 401)
        error_count = r.get(f"errors:{request.remote_addr}") or 0
        if int(error_count) > 10:
            score += 0.3

        # Requests from Tor/VPN IP (check against list)
        if self.is_known_proxy(request.remote_addr):
            score += 0.2

        return min(score, 1.0)

    def get_effective_limit(self, base_limit: int, risk_score: float) -> int:
        """Lower limit for suspicious clients"""
        multiplier = 1.0 - (risk_score * 0.8)  # up to 80% reduction
        return max(int(base_limit * multiplier), 1)

Response Headers

RFC 6585 and API standards require informative headers:

def add_rate_limit_headers(response, key, limit, window):
    now = time.time()
    current_count = r.zcard(key) or 0
    remaining = max(0, limit - current_count)

    # Reset time = start of next window
    reset_at = int(now) + window - (int(now) % window)