Database Sharding Setup for Web Applications
Sharding is needed when a single PostgreSQL server can't handle the data volume or write load. It isn't the first optimization step: indexes, partitioning, replication, and caching come before it. But when tables grow to hundreds of millions of rows and concurrent writes number in the thousands per second, sharding becomes necessary.
Partitioning vs Sharding
Partitioning splits one table into physical parts within a single PostgreSQL instance. Sharding distributes data across multiple independent servers.
Partitioning is simpler and often sufficient. Start with it:
-- Range partitioning by date (logs, events)
CREATE TABLE events (
id BIGSERIAL,
user_id BIGINT NOT NULL,
event_type VARCHAR(50) NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
data JSONB
) PARTITION BY RANGE (created_at);
CREATE TABLE events_2024_q1 PARTITION OF events
FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
CREATE TABLE events_2024_q2 PARTITION OF events
FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');
-- Hash partitioning for even distribution
CREATE TABLE user_sessions (
id BIGSERIAL,
user_id BIGINT NOT NULL,
token VARCHAR(255) NOT NULL,
data JSONB
) PARTITION BY HASH (user_id);
CREATE TABLE user_sessions_0 PARTITION OF user_sessions
FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE user_sessions_1 PARTITION OF user_sessions
FOR VALUES WITH (MODULUS 4, REMAINDER 1);
-- etc. to REMAINDER 3
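Writing these quarterly partition DDL statements by hand gets tedious as time goes on. A small generator sketch (the function name is hypothetical; it emits the same DDL shape as the statements above):

```python
from datetime import date

def quarterly_partition_ddl(table: str, year: int) -> list[str]:
    """Generate CREATE TABLE ... PARTITION OF statements for one year of quarterly ranges."""
    # Quarter boundaries: Jan 1, Apr 1, Jul 1, Oct 1, plus Jan 1 of the next year
    bounds = [date(year, m, 1) for m in (1, 4, 7, 10)] + [date(year + 1, 1, 1)]
    statements = []
    for quarter, (start, end) in enumerate(zip(bounds, bounds[1:]), start=1):
        statements.append(
            f"CREATE TABLE {table}_{year}_q{quarter} PARTITION OF {table}\n"
            f"FOR VALUES FROM ('{start}') TO ('{end}');"
        )
    return statements

print(quarterly_partition_ddl('events', 2024)[0])
# CREATE TABLE events_2024_q1 PARTITION OF events
# FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
```

Feed the output to psql or a migration tool; the upper bound of each range is exclusive, which is why Q1 ends at '2024-04-01'.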
Sharding with Citus
Citus is a PostgreSQL extension that turns a set of PostgreSQL nodes into a distributed database. It is the least painful path to sharding for PostgreSQL projects.
# Single Citus node for local testing
docker run -e POSTGRES_PASSWORD=pass -p 5432:5432 citusdata/citus:12.1
-- Add workers
SELECT citus_add_node('worker1', 5432);
SELECT citus_add_node('worker2', 5432);
-- Create distributed table
CREATE TABLE orders (
id BIGSERIAL,
tenant_id INT NOT NULL,
user_id BIGINT NOT NULL,
status VARCHAR(20) NOT NULL,
total DECIMAL(12,2),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id, tenant_id) -- distribution column must be part of the PK
);
SELECT create_distributed_table('orders', 'tenant_id', shard_count => 32);
-- Table for colocation (JOIN by tenant_id will be local)
CREATE TABLE order_items (
id BIGSERIAL,
tenant_id INT NOT NULL,
order_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
quantity INT NOT NULL,
PRIMARY KEY (id, tenant_id)
);
SELECT create_distributed_table('order_items', 'tenant_id',
colocate_with => 'orders');
-- Reference table: replicated to all workers
CREATE TABLE categories (id BIGSERIAL PRIMARY KEY, name VARCHAR(200));
SELECT create_reference_table('categories');
After this, queries filtered by tenant_id are routed to a specific shard, and JOINs between orders and order_items on tenant_id execute locally on each worker.
Application-Level Sharding
When Citus is unavailable or full control is needed, implement sharding in the application.
Choosing the shard key is the main architectural decision. Good shard keys:

- user_id: for user-centric apps
- tenant_id: for multi-tenant SaaS
- region: for geographically distributed data

Bad shard keys:

- created_at: hot spot on the latest shard
- status: uneven distribution
- UUID v4: no locality, poor cache hit rate
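A quick way to sanity-check a candidate shard key is to simulate its distribution. This sketch reuses the md5-modulo routing from the router below, with a shard count of 4 assumed:

```python
import hashlib
from collections import Counter

SHARD_COUNT = 4

def get_shard_id(shard_key) -> int:
    """Same md5-modulo routing as sharding/router.py."""
    key_bytes = str(shard_key).encode('utf-8')
    return int(hashlib.md5(key_bytes).hexdigest(), 16) % SHARD_COUNT

# Sequential user_ids still spread evenly because of the hash step;
# routing on raw created_at timestamps would pile writes onto one shard.
counts = Counter(get_shard_id(uid) for uid in range(10_000))
print(sorted(counts.items()))
```

Each shard should receive roughly 2,500 of the 10,000 keys; a heavily skewed Counter is a red flag for the candidate key.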
Hash-based routing (simple modulo; a true consistent-hashing ring is discussed under Resharding below):
# sharding/router.py
import hashlib
from dataclasses import dataclass
from typing import Any

@dataclass
class ShardConfig:
    host: str
    port: int
    database: str

SHARDS: dict[int, ShardConfig] = {
    0: ShardConfig('db-shard-0', 5432, 'myapp_0'),
    1: ShardConfig('db-shard-1', 5432, 'myapp_1'),
    2: ShardConfig('db-shard-2', 5432, 'myapp_2'),
    3: ShardConfig('db-shard-3', 5432, 'myapp_3'),
}
SHARD_COUNT = len(SHARDS)

def get_shard_id(shard_key: Any) -> int:
    """Deterministic shard determination by key."""
    key_bytes = str(shard_key).encode('utf-8')
    hash_value = int(hashlib.md5(key_bytes).hexdigest(), 16)
    return hash_value % SHARD_COUNT

def get_shard_config(shard_key: Any) -> ShardConfig:
    return SHARDS[get_shard_id(shard_key)]
Connections to shards:
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from functools import lru_cache

@lru_cache(maxsize=None)
def _get_engine(shard_id: int):
    cfg = SHARDS[shard_id]
    dsn = f"postgresql+psycopg2://user:pass@{cfg.host}:{cfg.port}/{cfg.database}"
    return create_engine(dsn, pool_size=5, max_overflow=10)

@contextmanager
def get_shard_session(shard_key):
    shard_id = get_shard_id(shard_key)
    Session = sessionmaker(bind=_get_engine(shard_id))
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
# Usage: get_shard_session takes the shard key positionally
with get_shard_session(12345) as session:  # shard key: the user's id
    orders = session.query(Order).filter_by(user_id=12345).all()
Cross-Shard Queries
Queries that span multiple shards are the hardest part. Two approaches:

Scatter-gather: query all shards in parallel and merge the results at the application level:
import asyncio
import asyncpg

async def get_all_orders_by_status(status: str) -> list[dict]:
    """Scatter-gather across all shards."""
    async def query_shard(shard_id: int) -> list[dict]:
        cfg = SHARDS[shard_id]
        conn = await asyncpg.connect(
            host=cfg.host, database=cfg.database,
            user='app', password='pass'
        )
        try:
            rows = await conn.fetch(
                "SELECT * FROM orders WHERE status = $1 ORDER BY created_at DESC LIMIT 100",
                status
            )
        finally:
            await conn.close()  # close even if the query fails
        return [dict(r) for r in rows]

    results = await asyncio.gather(*[
        query_shard(i) for i in range(SHARD_COUNT)
    ])
    # Merge + sort
    all_orders = [o for shard_result in results for o in shard_result]
    all_orders.sort(key=lambda x: x['created_at'], reverse=True)
    return all_orders[:100]
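Since each shard already returns its rows sorted by created_at, the concatenate-and-sort step can be replaced by a lazy k-way merge that stops at the limit. A sketch (the helper name is hypothetical):

```python
import heapq

def merge_sorted_shards(results: list[list[dict]], limit: int = 100) -> list[dict]:
    """k-way merge of per-shard result lists, each already sorted DESC by created_at."""
    merged = heapq.merge(
        *results,
        key=lambda row: row['created_at'],
        reverse=True,  # inputs are sorted descending, matching the per-shard ORDER BY
    )
    # heapq.merge is lazy, so only `limit` rows are ever pulled from the heap
    return [row for row, _ in zip(merged, range(limit))]
```

With N shards each returning up to 100 rows, this does O(limit * log N) work instead of sorting all N * 100 rows.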
Global index: a separate mapping table in a dedicated routing database:
-- In separate "routing" database
CREATE TABLE order_shard_map (
order_id BIGINT PRIMARY KEY,
shard_id INT NOT NULL,
user_id BIGINT NOT NULL
);
CREATE INDEX ON order_shard_map (user_id);
When creating an order, record the mapping. When searching by order_id, look up the shard first, then query that shard directly.
Resharding
Adding a new shard is painful without Citus: with naive modulo routing, switching from hash(key) % N to % (N+1) remaps almost every key. Consistent hashing with virtual nodes (vnodes) minimizes data movement:

- Instead of hash(key) % N, find the nearest vnode on a ring of ~150 virtual nodes per shard.
- When a shard is added, only ~1/N of the data moves, not 1 - 1/N as with modulo.
Citus handles this automatically via citus_rebalance_start().
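Without Citus, a minimal ring with virtual nodes can be sketched like this (150 vnodes per shard and md5 as the ring hash are assumptions, matching the rest of this article):

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Hash ring with virtual nodes: adding a shard moves only ~1/N of keys."""

    def __init__(self, shard_ids, vnodes: int = 150):
        self._ring: list[tuple[int, int]] = []  # (point on the ring, shard_id)
        for shard_id in shard_ids:
            self.add_shard(shard_id, vnodes)

    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.md5(value.encode('utf-8')).hexdigest(), 16)

    def add_shard(self, shard_id: int, vnodes: int = 150) -> None:
        # Each shard owns `vnodes` points scattered around the ring
        for v in range(vnodes):
            self._ring.append((self._hash(f'{shard_id}:{v}'), shard_id))
        self._ring.sort()

    def get_shard(self, shard_key) -> int:
        # First vnode clockwise from the key's point, wrapping around the ring
        point = self._hash(str(shard_key))
        idx = bisect.bisect(self._ring, (point,)) % len(self._ring)
        return self._ring[idx][1]
```

When a fifth shard is added to a four-shard ring, only the keys whose nearest vnode now belongs to the new shard move; everything else keeps its old placement, which is exactly what modulo routing cannot guarantee.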
Timelines
Setting up PostgreSQL partitioning for an existing table: 1–2 days. Installing and configuring Citus for a new project: 2–3 days. Implementing application-level sharding with scatter-gather and a global index: 3–5 days.