Real-Time Traffic Anomaly Detection
Traffic anomaly — deviation from historically normal pattern: sudden request spike, unexpected error growth, atypical endpoint distribution. Automatic detection allows responding to DDoS, scraping attempts and infrastructure failures in seconds, not hours.
What Counts as Anomaly
Volume anomalies: RPS, bandwidth, or unique IP count surges sharply.
Structural anomalies: HTTP method ratio changes (sudden GET spike when norm is 70/30 GET/POST), specific endpoint request share grows.
Quality anomalies: 4xx/5xx error share grows, p99 latency breaks historical norm, 404 share increases (scanning).
Statistical Detection Methods
import numpy as np
from collections import deque
import time
class TrafficAnomalyDetector:
    """Sliding-window z-score anomaly detector for traffic metrics.

    Keeps the last ``window_size`` observations per metric and flags a new
    value whose absolute z-score against that baseline window exceeds
    ``sensitivity``.
    """

    def __init__(self, window_size=60, sensitivity=3.0):
        """
        window_size: sliding window size in points (seconds/minutes)
        sensitivity: threshold in sigmas (z-score)
        """
        self.window_size = window_size
        self.sensitivity = sensitivity
        # {metric_name: deque of most recent values, oldest evicted first}
        self.metrics = {}

    def _get_window(self, metric: str) -> deque:
        # Lazily create the bounded window for a metric.
        if metric not in self.metrics:
            self.metrics[metric] = deque(maxlen=self.window_size)
        return self.metrics[metric]

    def add_point(self, metric: str, value: float):
        """Add a new data point to the metric's sliding window."""
        # Reuse _get_window so the creation logic lives in one place
        # (previously duplicated here via setdefault).
        self._get_window(metric).append(value)

    def check(self, metric: str, current_value: float) -> dict:
        """Check whether current_value is an anomaly for the metric.

        Does NOT add current_value to the window; call add_point separately.
        Returns a dict with 'anomaly' plus diagnostic fields, or
        {'anomaly': False, 'reason': 'insufficient_data'} while the window
        holds fewer than 10 points.
        """
        window = self._get_window(metric)
        if len(window) < 10:
            # Too few baseline points: a z-score would be meaningless.
            return {'anomaly': False, 'reason': 'insufficient_data'}
        values = list(window)
        mean = np.mean(values)
        std = np.std(values)
        if std == 0:
            # Flat baseline: any deviation is infinitely many sigmas away.
            z_score = 0 if current_value == mean else float('inf')
        else:
            z_score = abs(current_value - mean) / std
        is_anomaly = z_score > self.sensitivity
        direction = 'spike' if current_value > mean else 'drop'
        return {
            'anomaly': is_anomaly,
            'z_score': round(z_score, 2),
            # Direction is only meaningful when an anomaly actually fired.
            'direction': direction if is_anomaly else None,
            'current': current_value,
            'baseline_mean': round(mean, 2),
            'baseline_std': round(std, 2),
            # Upper alert bound implied by the current baseline.
            'threshold': round(mean + self.sensitivity * std, 2),
        }
Exponential Smoothing (EWMA)
Responds better to trends and is less sensitive to single outliers:
class EWMADetector:
    """EWMA-based anomaly detector.

    Tracks an exponentially weighted moving average and variance per metric
    and flags values outside mean ± k*std.

    The check is performed against the statistics from BEFORE the new value
    is folded in. Updating first (as the previous version did) lets a spike
    inflate its own threshold: with the defaults alpha=0.1, k=3.0 the
    post-update threshold always reaches at least the value itself, so no
    anomaly could ever be reported.
    """

    def __init__(self, alpha=0.1, k=3.0):
        """
        alpha: smoothing coefficient (0.05–0.2)
        k: number of standard deviations for threshold
        """
        self.alpha = alpha
        self.k = k
        self.ewma = {}  # {metric: {'mean': float, 'variance': float}}

    def update_and_check(self, metric: str, value: float) -> dict:
        """Check value against the metric's EWMA baseline, then fold it in."""
        if metric not in self.ewma:
            # First observation seeds the baseline; nothing to compare yet.
            self.ewma[metric] = {'mean': value, 'variance': 0}
            return {'anomaly': False}
        state = self.ewma[metric]
        mean = state['mean']
        variance = state['variance']
        std = np.sqrt(variance) if variance > 0 else 0
        threshold_high = mean + self.k * std
        # Traffic metrics are non-negative, so clamp the lower bound at 0.
        threshold_low = max(0, mean - self.k * std)
        # NOTE: while variance is still 0 (e.g. right after a constant
        # warm-up) any deviation from the mean is flagged, mirroring the
        # infinite-z convention of TrafficAnomalyDetector.
        is_anomaly = value > threshold_high or value < threshold_low
        # Update EWMA mean and variance AFTER the check so the current
        # value cannot mask itself.
        state['mean'] = self.alpha * value + (1 - self.alpha) * mean
        state['variance'] = (1 - self.alpha) * (variance + self.alpha * (value - mean) ** 2)
        return {
            'anomaly': is_anomaly,
            # Direction only meaningful when an anomaly fired (matches
            # TrafficAnomalyDetector's contract of None otherwise).
            'direction': ('spike' if value > threshold_high else 'drop') if is_anomaly else None,
            'current': value,
            'expected': round(mean, 2),
            'threshold_high': round(threshold_high, 2),
            'deviation_pct': round(abs(value - mean) / max(mean, 1) * 100, 1),
        }







