Database Sharding Setup for Web Application

Our company is engaged in the development, support and maintenance of sites of any complexity. From simple one-page sites to large-scale cluster systems built on micro services. Experience of developers is confirmed by certificates from vendors.
Development and maintenance of all types of websites:
Informational websites or web applications
Business card websites, landing pages, corporate websites, online catalogs, quizzes, promo websites, blogs, news resources, informational portals, forums, aggregators
E-commerce websites or web applications
Online stores, B2B portals, marketplaces, online exchanges, cashback websites, exchanges, dropshipping platforms, product parsers
Business process management web applications
CRM systems, ERP systems, corporate portals, production management systems, information parsers
Electronic service websites or web applications
Classified ads platforms, online schools, online cinemas, website builders, portals for electronic services, video hosting platforms, thematic portals

These are just some of the technical types of websites we work with, and each of them can have its own specific features and functionality, as well as be customized to meet the specific needs and goals of the client.

Showing 1 of 1 servicesAll 2065 services
Database Sharding Setup for Web Application
Complex
~1-2 weeks
FAQ
Our competencies:
Development stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    822
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    847
  • image_website-sbh_0.png
    Website development for SBH Partners
    999
  • image_website-_0.png
    Website development for Red Pear
    451

Database Sharding Setup for Web Applications

Sharding is needed when a single PostgreSQL server can't handle data volume or write load. This isn't the first optimization step — before it come indexes, partitioning, replication, and caching. But when tables grow to hundreds of millions of rows and concurrent writes number thousands per second — sharding becomes necessary.

Partitioning vs Sharding

Partitioning — splitting one table into physical parts within a single PostgreSQL instance. Sharding — distributing data across multiple independent servers.

Partitioning is simpler and often sufficient. Start with it:

-- Range partitioning by date (logs, events)
CREATE TABLE events (
    id         BIGSERIAL,
    user_id    BIGINT       NOT NULL,
    event_type VARCHAR(50)  NOT NULL,
    created_at TIMESTAMPTZ  NOT NULL,
    data       JSONB
) PARTITION BY RANGE (created_at);

CREATE TABLE events_2024_q1 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');

CREATE TABLE events_2024_q2 PARTITION OF events
    FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');

-- Hash partitioning for even distribution
CREATE TABLE user_sessions (
    id      BIGSERIAL,
    user_id BIGINT NOT NULL,
    token   VARCHAR(255) NOT NULL,
    data    JSONB
) PARTITION BY HASH (user_id);

CREATE TABLE user_sessions_0 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE user_sessions_1 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 4, REMAINDER 1);
-- etc. to REMAINDER 3

Sharding with Citus

Citus is a PostgreSQL extension that turns it into a distributed database. This is the least painful path to sharding for PostgreSQL projects.

# Docker Compose for local testing
docker run -e POSTGRES_PASSWORD=pass -p 5432:5432 citusdata/citus:12.1
-- Add workers
SELECT citus_add_node('worker1', 5432);
SELECT citus_add_node('worker2', 5432);

-- Create distributed table
CREATE TABLE orders (
    id         BIGSERIAL,
    tenant_id  INT          NOT NULL,
    user_id    BIGINT       NOT NULL,
    status     VARCHAR(20)  NOT NULL,
    total      DECIMAL(12,2),
    created_at TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    PRIMARY KEY (id, tenant_id)   -- partition key must be in PK
);

SELECT create_distributed_table('orders', 'tenant_id', shard_count => 32);

-- Table for colocation (JOIN by tenant_id will be local)
CREATE TABLE order_items (
    id         BIGSERIAL,
    tenant_id  INT    NOT NULL,
    order_id   BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    quantity   INT    NOT NULL,
    PRIMARY KEY (id, tenant_id)
);

SELECT create_distributed_table('order_items', 'tenant_id',
    colocate_with => 'orders');

-- Reference table: replicated to all workers
CREATE TABLE categories (id BIGSERIAL PRIMARY KEY, name VARCHAR(200));
SELECT create_reference_table('categories');

After this, queries filtered by tenant_id route to specific shard. JOIN between orders and order_items by tenant_id executes locally on worker.

Application-Level Sharding

When Citus unavailable or full control needed — implement sharding in application.

Shard key choice — main architectural decision. Good shard keys:

  • user_id — for user-centric apps
  • tenant_id — for multi-tenant SaaS
  • region — for geographically distributed data

Bad shard keys:

  • created_at — hot spot on latest shard
  • status — uneven distribution
  • UUID v4 — no locality, poor cache hit

Consistent hashing:

# sharding/router.py
import hashlib
from dataclasses import dataclass
from typing import Any

@dataclass
class ShardConfig:
    host: str
    port: int
    database: str

SHARDS: dict[int, ShardConfig] = {
    0: ShardConfig('db-shard-0', 5432, 'myapp_0'),
    1: ShardConfig('db-shard-1', 5432, 'myapp_1'),
    2: ShardConfig('db-shard-2', 5432, 'myapp_2'),
    3: ShardConfig('db-shard-3', 5432, 'myapp_3'),
}

SHARD_COUNT = len(SHARDS)

def get_shard_id(shard_key: Any) -> int:
    """Deterministic shard determination by key."""
    key_bytes = str(shard_key).encode('utf-8')
    hash_value = int(hashlib.md5(key_bytes).hexdigest(), 16)
    return hash_value % SHARD_COUNT

def get_shard_config(shard_key: Any) -> ShardConfig:
    return SHARDS[get_shard_id(shard_key)]

Connections to shards:

from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from functools import lru_cache

@lru_cache(maxsize=None)
def _get_engine(shard_id: int):
    cfg = SHARDS[shard_id]
    dsn = f"postgresql+psycopg2://user:pass@{cfg.host}:{cfg.port}/{cfg.database}"
    return create_engine(dsn, pool_size=5, max_overflow=10)

@contextmanager
def get_shard_session(shard_key):
    shard_id = get_shard_id(shard_key)
    Session = sessionmaker(bind=_get_engine(shard_id))
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Usage:
with get_shard_session(user_id=12345) as session:
    orders = session.query(Order).filter_by(user_id=12345).all()

Cross-Shard Queries

Queries across multiple shards — most complex part. Two approaches:

Scatter-gather — parallel query to all shards, merge at application level:

import asyncio
import asyncpg

async def get_all_orders_by_status(status: str) -> list[dict]:
    """Scatter-gather across all shards."""
    async def query_shard(shard_id: int) -> list[dict]:
        cfg = SHARDS[shard_id]
        conn = await asyncpg.connect(
            host=cfg.host, database=cfg.database,
            user='app', password='pass'
        )
        rows = await conn.fetch(
            "SELECT * FROM orders WHERE status = $1 ORDER BY created_at DESC LIMIT 100",
            status
        )
        await conn.close()
        return [dict(r) for r in rows]

    results = await asyncio.gather(*[
        query_shard(i) for i in range(SHARD_COUNT)
    ])

    # Merge + sort
    all_orders = [o for shard_result in results for o in shard_result]
    all_orders.sort(key=lambda x: x['created_at'], reverse=True)
    return all_orders[:100]

Global index — separate mapping table in dedicated database:

-- In separate "routing" database
CREATE TABLE order_shard_map (
    order_id  BIGINT  PRIMARY KEY,
    shard_id  INT     NOT NULL,
    user_id   BIGINT  NOT NULL
);
CREATE INDEX ON order_shard_map (user_id);

When creating order — record mapping. When searching by order_id — find shard first, then query specific.

Resharding

Adding new shard is painful without Citus. Consistent hashing with virtual nodes (vnodes) minimizes data movement:

Instead of hash(key) % N
Use: find nearest vnode on ring of 150 virtual nodes
On shard addition: ~1/N data moves, not 1-(1/N)

Citus handles this automatically via citus_rebalance_start().

Timelines

Setting up PostgreSQL partitioning for existing table: 1–2 days. Installing and configuring Citus for new project: 2–3 days. Implementing application-level sharding with scatter-gather and global index: 3–5 days.