Payment Fraud Detection System
Payment fraud — one of most expensive problems for e-commerce: chargebacks, payment system fines, merchant account blocking. Automatic risk assessment system before each transaction reduces fraud without hurting conversion for honest buyers.
Types of Fraud and Attributes
Card Testing — verification of stolen cards with small transactions.
Signs: many attempts from same IP/device, $0.01–$1 amounts, different card numbers, short intervals.
Account Takeover (ATO) — account compromise and theft of saved cards.
Signs: billing address change before purchase, login from new device, immediate large purchase.
Friendly Fraud — buyer orders goods then files chargeback.
Signs: chargeback history, VPN/proxy, delivery to freight forwarder.
Risk Score Model
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class PaymentContext:
user_id: Optional[int]
email: str
ip: str
card_bin: str # first 6 digits
card_last4: str
amount: float
currency: str
billing_country: str
shipping_country: Optional[str]
device_fingerprint: str
user_agent: str
session_age_seconds: int
class FraudScorer:
def __init__(self, redis, db, geoip, maxmind):
self.r = redis
self.db = db
self.geoip = geoip
self.maxmind = maxmind # MaxMind minFraud
def score(self, ctx: PaymentContext) -> dict:
signals = []
total_score = 0
# === Velocity checks ===
v = self._velocity_checks(ctx)
signals.extend(v['signals'])
total_score += v['score']
# === Geolocation checks ===
g = self._geo_checks(ctx)
signals.extend(g['signals'])
total_score += g['score']
# === Card checks ===
c = self._card_checks(ctx)
signals.extend(c['signals'])
total_score += c['score']
# === Account checks ===
if ctx.user_id:
a = self._account_checks(ctx)
signals.extend(a['signals'])
total_score += a['score']
# === Device checks ===
d = self._device_checks(ctx)
signals.extend(d['signals'])
total_score += d['score']
final_score = min(total_score, 100)
return {
'score': final_score,
'signals': signals,
'decision': self._make_decision(final_score, ctx),
'timestamp': time.time()
}
def _velocity_checks(self, ctx: PaymentContext) -> dict:
score = 0
signals = []
# Number of payment attempts from IP in 1 hour
ip_key = f"payment_attempts:ip:{ctx.ip}"
ip_count = self.r.incr(ip_key)
self.r.expire(ip_key, 3600)
if ip_count > 20:
score += 40
signals.append('ip_velocity_critical')
elif ip_count > 10:
score += 20
signals.append('ip_velocity_high')
# Number of unique cards tried from IP in 24 hours
cards_key = f"cards_tried:ip:{ctx.ip}"
self.r.sadd(cards_key, ctx.card_last4)
self.r.expire(cards_key, 86400)
card_count = self.r.scard(cards_key)
if card_count > 3:
score += 35
signals.append(f'multiple_cards_from_ip:{card_count}')
# Failed payment attempts in 1 hour
failures_key = f"payment_failures:ip:{ctx.ip}"
failures = int(self.r.get(failures_key) or 0)
if failures > 5:
score += 30
signals.append(f'payment_failures:{failures}')
return {'score': score, 'signals': signals}







