Setting Up Webhook System with Message Signature (HMAC)
An HMAC (Hash-based Message Authentication Code) signature on a webhook lets the receiver verify that the message really came from the expected sender and was not altered in transit. Without signature verification, any attacker can send a fake webhook to your endpoint.
HMAC Principle
- The sender and the receiver agree on a secret key
- On send: the sender computes HMAC-SHA256(payload, secret) and adds it to a request header
- On receive: the receiver computes the same value and compares it with the header
- If they match — the message is authentic
Generating Signature When Sending Webhook
import hashlib
import hmac
import json
import os
import time
import uuid

import requests
def send_webhook(url: str, payload: dict, secret: str, webhook_id: "str | None" = None):
    """Sign ``payload`` with HMAC-SHA256 and POST it to ``url``.

    The signed message is ``b"<timestamp>.<body>"``, where ``<body>`` is the
    compact JSON serialization of ``payload`` and ``<timestamp>`` is the
    current Unix time in whole seconds. The signature is sent as
    ``X-Webhook-Signature: sha256=<hex digest>`` together with
    ``X-Webhook-Timestamp`` and ``X-Webhook-ID`` headers.

    Args:
        url: Receiver endpoint.
        payload: JSON-serializable event data.
        secret: Shared secret used as the HMAC key.
        webhook_id: Value for the ``X-Webhook-ID`` header. Pass the same ID
            on every retry of one event so the receiver can deduplicate.
            When omitted, a random UUID4 is generated for this call.

    Returns:
        The response object returned by ``requests.post``.

    Raises:
        Whatever ``requests.post`` raises, e.g.
        ``requests.exceptions.Timeout`` once the 10 second timeout expires.
    """
    # Serialize once: exactly these bytes are both signed and sent, so the
    # receiver's raw-body HMAC cannot drift from ours through re-encoding.
    body = json.dumps(payload, separators=(',', ':')).encode('utf-8')
    timestamp = str(int(time.time()))

    # Signature includes the timestamp to protect from replay attacks
    message = timestamp.encode('ascii') + b'.' + body
    signature = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()

    if webhook_id is None:
        webhook_id = uuid.uuid4()

    return requests.post(
        url,
        data=body,
        headers={
            'Content-Type': 'application/json',
            'X-Webhook-Timestamp': timestamp,
            'X-Webhook-Signature': f"sha256={signature}",
            'X-Webhook-ID': str(webhook_id),
        },
        timeout=10,
    )
Verifying Signature on Receiver Side
import hmac
import hashlib
import time
def verify_webhook_signature(request) -> bool:
    """Check the HMAC-SHA256 signature and freshness of an incoming webhook.

    The expected signature is ``"sha256=" + HMAC-SHA256(secret,
    b"<timestamp>.<raw body>")``, with the secret read from the
    ``WEBHOOK_SECRET`` environment variable.

    Args:
        request: Object exposing ``headers.get(name, default)`` and
            ``get_data()`` returning the raw request body as bytes.

    Returns:
        True only if both signature headers are present, the timestamp is a
        valid integer within 5 minutes of the current time, and the
        signature matches. Malformed input yields False rather than an
        exception.

    Raises:
        KeyError: If ``WEBHOOK_SECRET`` is not set in the environment.
    """
    secret = os.environ['WEBHOOK_SECRET']

    # Extract from headers
    timestamp = request.headers.get('X-Webhook-Timestamp')
    received_sig = request.headers.get('X-Webhook-Signature', '')
    if not timestamp or not received_sig:
        return False

    # Protect from replay attack: don't accept events older than 5 minutes.
    # The header is attacker-controlled, so a non-numeric or absurdly large
    # value must be rejected instead of crashing the handler.
    try:
        age = abs(time.time() - int(timestamp))
    except (ValueError, OverflowError):
        return False
    if age > 300:
        return False

    # Compute expected signature over the raw bytes, before parsing!
    # Working in bytes avoids a UnicodeDecodeError on non-UTF-8 bodies.
    body = request.get_data()
    message = timestamp.encode() + b'.' + body
    expected_sig = "sha256=" + hmac.new(
        secret.encode(),
        message,
        hashlib.sha256,
    ).hexdigest()

    # Constant-time comparison to protect from timing attack. Compare bytes:
    # compare_digest raises TypeError on str arguments containing non-ASCII.
    return hmac.compare_digest(
        expected_sig.encode(),
        received_sig.encode('utf-8', 'replace'),
    )
@app.route('/webhooks/payments', methods=['POST'])
def payment_webhook():
    """Handle POSTs to /webhooks/payments.

    Requests whose signature verifies are parsed as JSON and handed to
    ``process_payment_event``; all others get a 401 JSON error response.
    """
    signature_ok = verify_webhook_signature(request)
    if signature_ok:
        # The payload is only parsed after the signature check has passed
        payload = request.get_json()
        process_payment_event(payload)
        return jsonify({'status': 'ok'})

    return jsonify({'error': 'Invalid signature'}), 401
Stripe-Compatible Format
Stripe sends t=timestamp,v1=signature in the Stripe-Signature header:
def verify_stripe_webhook(payload, sig_header, secret):
    """Verify a Stripe-style ``t=<timestamp>,v1=<signature>`` header.

    The expected signature is the hex HMAC-SHA256 of
    ``b"<timestamp>.<payload>"`` keyed with ``secret``.

    Args:
        payload: Raw request body, as ``str`` or ``bytes``.
        sig_header: Header value, e.g. ``"t=1614556800,v1=abcdef..."``.
            It may contain several ``v1`` entries and entries with other
            keys; unknown keys and malformed elements are ignored.
        secret: Shared signing secret (``str``).

    Returns:
        True if any ``v1`` entry matches the expected signature. False if
        none matches, or if the header is empty, malformed, or lacks a
        timestamp or ``v1`` entry.

    Note:
        Timestamp freshness is not checked here; callers that need replay
        protection must compare ``t`` with the current time themselves.
    """
    # Stripe format: "t=1614556800,v1=abcdef..."
    timestamp = None
    candidates = []
    for element in (sig_header or '').split(','):
        # partition() tolerates elements with no '=' or with extra '=' signs
        key, sep, value = element.strip().partition('=')
        if not sep:
            continue
        if key == 't':
            timestamp = value
        elif key == 'v1':
            candidates.append(value)

    if not timestamp or not candidates:
        return False

    # Sign the raw bytes; formatting bytes into an f-string would sign "b'...'"
    if isinstance(payload, (bytes, bytearray, memoryview)):
        payload_bytes = bytes(payload)
    else:
        payload_bytes = f"{payload}".encode()
    signed_payload = timestamp.encode() + b'.' + payload_bytes

    expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest().encode()

    # Constant-time comparison on bytes (str arguments must be ASCII-only)
    return any(
        hmac.compare_digest(expected, candidate.encode('utf-8', 'replace'))
        for candidate in candidates
    )
Retry Logic and Idempotency
class WebhookDelivery:
    """Deliver a signed webhook, retrying with increasing delays on failure."""

    # Upper bound on delivery attempts; also capped by len(RETRY_DELAYS)
    MAX_ATTEMPTS = 5
    # Seconds to wait after attempt 1, 2, ... ; no wait follows the last attempt
    RETRY_DELAYS = [10, 30, 120, 600, 3600]  # seconds between attempts

    def deliver_with_retry(self, webhook_id: str, url: str, payload: dict, secret: str):
        """Send the webhook until a status below 300 is returned or attempts run out.

        Every failed attempt is recorded with ``db.log_attempt``. On success
        the webhook is marked delivered; after the final failed attempt it is
        marked failed. The method blocks in ``time.sleep`` between attempts.

        Args:
            webhook_id: Identifier used for the ``db`` bookkeeping calls.
            url: Receiver endpoint passed to ``send_webhook``.
            payload: Event data passed to ``send_webhook``.
            secret: Signing secret passed to ``send_webhook``.

        Returns:
            True if an attempt got a response with status code below 300,
            otherwise False.
        """
        delays = self.RETRY_DELAYS[:self.MAX_ATTEMPTS]
        last_attempt = len(delays)

        for attempt, delay in enumerate(delays, start=1):
            try:
                # NOTE(review): send_webhook is called without an explicit
                # webhook ID, so each attempt carries a freshly generated
                # X-Webhook-ID header; receivers deduplicating on that header
                # will not recognize retries of the same event.
                response = send_webhook(url, payload, secret)
            except requests.exceptions.Timeout:
                db.log_attempt(webhook_id, attempt, error='timeout')
            except requests.exceptions.RequestException as exc:
                # Connection errors and similar failures are retryable too;
                # letting them propagate would skip mark_failed entirely.
                db.log_attempt(webhook_id, attempt, error=type(exc).__name__)
            else:
                if response.status_code < 300:
                    db.mark_delivered(webhook_id)
                    return True
                db.log_attempt(webhook_id, attempt, response.status_code)

            if attempt < last_attempt:
                time.sleep(delay)

        db.mark_failed(webhook_id)
        return False
Idempotency on Receiver Side
def handle_webhook_idempotent(webhook_id: str, handler_fn):
    """Run ``handler_fn`` unless ``webhook_id`` is already recorded as processed.

    Used to prevent double processing when the sender retries a delivery.
    Inside one ``db.transaction()`` the ID is marked as processing, the
    handler is called with no arguments, and the ID is marked as processed.
    The handler's return value is discarded; the function returns None.
    """
    # NOTE(review): this check runs before the transaction is opened, so two
    # concurrent deliveries of the same ID may both pass it — confirm whether
    # db.mark_processing guards against that.
    already_handled = db.is_processed(webhook_id)
    if not already_handled:
        with db.transaction():
            db.mark_processing(webhook_id)
            handler_fn()
            db.mark_processed(webhook_id)







