Protection Against Credential Stuffing Attacks
Credential stuffing is the automated testing of stolen login/password pairs from data breaches against a target site. Unlike brute force, it tries specific credentials leaked from other compromised services. Typical attack conversion is 0.1–2%: with a database of 10M pairs, that is 10K–200K compromised accounts.
Why Standard Rate Limiting Does Not Work
Modern credential stuffing attacks:
- Distributed across thousands of IPs (residential proxies, botnets)
- Mimic browser headers
- Add random delays between requests
- Rotate User-Agent and cookies
The task is to detect anomalies in login behavior without blocking legitimate users.
Multi-Level Protection
class LoginProtectionService:
    """Layered pre-authentication checks against credential stuffing.

    All state lives in Redis:

    * ``blocked_ips`` (set) and ``ip_risk:<ip>`` -- IP reputation, read only.
    * ``login_attempts:ip:<ip>`` -- attempts per IP, 10-minute window.
    * ``login_attempts:email:<hash>`` -- attempts per account, 15-minute window.
    * ``account_locked:<hash>`` -- temporary account lock flag (15 minutes).
    * ``global_login_failures`` / ``global_login_total`` -- site-wide counters.
    * ``fp_failures:<fingerprint>`` -- failures per device fingerprint, 1 hour.
    """

    def __init__(self, redis, db, device_fp_service):
        """Store the Redis client, DB handle and device-fingerprint service."""
        self.r = redis
        self.db = db
        self.dfp = device_fp_service

    def check_login_attempt(self, request, email: str) -> dict:
        """Run the protection checks for a login attempt before DB access.

        Checks run in order and stop at the first rejection, so a request
        refused early (for example from a blocked IP) does not increment the
        counters owned by later checks and cannot be used to lock out
        somebody else's account.

        Args:
            request: Incoming request; ``remote_addr`` is read here and the
                whole object is passed to the fingerprint service.
            email: Login identifier submitted by the client.

        Returns:
            ``{'allowed': True, 'action': 'proceed'}`` when every check
            passes, otherwise the rejecting check's
            ``{'allowed': False, 'action': str, 'reason': str}``.
        """
        ip = request.remote_addr
        # Thunks instead of results: each check runs only if all previous
        # ones allowed the request.
        checks = (
            lambda: self._check_ip_reputation(ip),
            lambda: self._check_ip_velocity(ip),
            lambda: self._check_email_velocity(email),
            lambda: self._check_global_failure_rate(),
            lambda: self._check_device_fingerprint(request),
        )
        for run_check in checks:
            verdict = run_check()
            if not verdict['allowed']:
                return verdict
        return {'allowed': True, 'action': 'proceed'}

    def _bump_counter(self, key: str, window: int) -> int:
        """Increment ``key`` and return the new count, using a fixed window.

        The TTL is set only when the key has none, so the window starts at
        the first hit and is not extended by later hits. Checking ``ttl``
        (instead of ``count == 1``) also repairs a counter that was left
        without a TTL by an earlier interrupted call.
        """
        count = self.r.incr(key)
        if self.r.ttl(key) < 0:
            self.r.expire(key, window)
        return count

    def _check_ip_reputation(self, ip: str) -> dict:
        """Reject IPs in the ``blocked_ips`` set; challenge risk scores > 80."""
        # The lists are expected to be fed from external sources such as
        # AbuseIPDB, Cloudflare Threat Intelligence or MaxMind.
        if self.r.sismember('blocked_ips', ip):
            return {'allowed': False, 'action': 'block', 'reason': 'blocked_ip'}
        risk = self.r.get(f'ip_risk:{ip}')
        if risk and int(risk) > 80:
            return {'allowed': False, 'action': 'challenge', 'reason': 'high_risk_ip'}
        return {'allowed': True}

    def _check_ip_velocity(self, ip: str) -> dict:
        """Count attempts from ``ip`` in a 10-minute window.

        More than 10 attempts yields a challenge, more than 20 a block.
        """
        count = self._bump_counter(f'login_attempts:ip:{ip}', 600)
        if count > 20:
            return {'allowed': False, 'action': 'block', 'reason': f'ip_velocity:{count}'}
        if count > 10:
            return {'allowed': False, 'action': 'challenge', 'reason': f'ip_velocity:{count}'}
        return {'allowed': True}

    def _check_email_velocity(self, email: str) -> dict:
        """Count attempts against one account and lock it after more than 5.

        The e-mail is lower-cased and hashed so raw addresses are not used
        as Redis key names. While the ``account_locked`` flag exists the
        attempt is refused without touching the counter.
        """
        import hashlib

        email_hash = hashlib.sha256(email.lower().encode()).hexdigest()[:16]
        lock_key = f'account_locked:{email_hash}'
        if self.r.get(lock_key):
            return {'allowed': False, 'action': 'lock', 'reason': 'account_locked'}

        count = self._bump_counter(f'login_attempts:email:{email_hash}', 900)  # 15 minutes
        if count > 5:
            # Temporary account lock
            self.r.setex(lock_key, 900, '1')
            return {'allowed': False, 'action': 'lock', 'reason': f'account_lockout:{count}'}
        return {'allowed': True}

    def _check_global_failure_rate(self) -> dict:
        """Challenge when over 50% of more than 100 site-wide logins failed.

        NOTE(review): nothing in this class writes ``global_login_total``;
        the caller must increment it once per attempt or this check never
        fires -- confirm against the login handler.
        """
        failures = int(self.r.get('global_login_failures') or 0)
        total = int(self.r.get('global_login_total') or 0)
        # Volume is tested first so a missing or zero total never reaches
        # the division.
        if total <= 100:
            return {'allowed': True}
        if failures / total > 0.5:
            # More than 50% failures -- sign of mass attack
            return {'allowed': False, 'action': 'challenge', 'reason': 'global_attack_detected'}
        return {'allowed': True}

    def _check_device_fingerprint(self, request) -> dict:
        """Block a device fingerprint with more than 3 recorded failures."""
        fp = self.dfp.compute(request)
        failures = int(self.r.get(f'fp_failures:{fp}') or 0)
        if failures > 3:
            return {'allowed': False, 'action': 'block', 'reason': f'fp_failures:{failures}'}
        return {'allowed': True}

    def record_failure(self, request, email: str):
        """Record a failed login for the fingerprint and site-wide counters.

        Args:
            request: Incoming request, passed to the fingerprint service.
            email: Kept for interface compatibility; not used, because the
                per-account counter is already advanced by
                ``_check_email_velocity``.
        """
        fp_key = f'fp_failures:{self.dfp.compute(request)}'
        global_key = 'global_login_failures'

        # One round trip for both increments plus their current TTLs.
        pipe = self.r.pipeline()
        pipe.incr(fp_key)
        pipe.ttl(fp_key)
        pipe.incr(global_key)
        pipe.ttl(global_key)
        _, fp_ttl, _, global_ttl = pipe.execute()

        # Fixed windows: start the TTL only when the counter has none.
        if fp_ttl < 0:
            self.r.expire(fp_key, 3600)
        if global_ttl < 0:
            self.r.expire(global_key, 60)  # 1 minute window
Checking Passwords Against Have I Been Pwned
import hashlib
import httpx
async def is_password_compromised(password: str) -> bool:
    """Return True if ``password`` appears in the Pwned Passwords corpus.

    Uses the k-anonymity range API: only the first 5 hex characters of the
    SHA-1 hash are sent, so HIBP never learns the original password or its
    full hash. The ``Add-Padding`` header asks the service to pad the
    response; padded entries carry a count of 0, which is why a match is
    only reported when the count is positive.

    Args:
        password: Plain-text password to check.

    Returns:
        True when the hash suffix is listed with a positive count, False
        otherwise (including when the suffix is not listed at all).

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status, so
            an error body is never mistaken for "not compromised".
    """
    sha1 = hashlib.sha1(password.encode()).hexdigest().upper()
    prefix, suffix = sha1[:5], sha1[5:]

    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f'https://api.pwnedpasswords.com/range/{prefix}',
            headers={'Add-Padding': 'true'},
        )
    resp.raise_for_status()

    for line in resp.text.splitlines():
        # partition() tolerates blank or malformed lines, which simply
        # never match the suffix.
        hash_suffix, _, count = line.partition(':')
        if hash_suffix.strip().upper() == suffix:
            return int(count) > 0
    return False







