Event Sourcing Implementation for Web Application
Event Sourcing is a pattern that stores application state as a sequence of immutable events instead of mutable records. The current state of an object is recovered by "replaying" its event history. It is not a silver bullet: it adds complexity and is justified mainly for domains that need a rich change history, auditing, or temporal queries.
When to Apply Event Sourcing
Fits:
- Financial transactions, accounting (need full history of each balance change)
- Order and logistics systems (OrderPlaced → PaymentProcessed → Shipped → Delivered)
- Medical records (each change must be documented)
- Systems with undo/redo or state rollback
Doesn't fit:
- Plain CRUD reference data with no history requirements
- Analytical stores (better to use CDC + data warehouse)
- Simple blogs and landing pages
Event Structure
interface DomainEvent {
  eventId: string;        // UUID
  aggregateId: string;    // ID of the entity (orderId, userId)
  aggregateType: string;  // 'Order', 'Account'
  eventType: string;      // 'OrderPlaced', 'ItemAdded'
  eventVersion: number;   // for schema evolution
  occurredAt: Date;
  payload: Record<string, unknown>;
  metadata: {
    causedBy?: string;    // eventId of the parent event
    userId?: string;
    correlationId: string;
  };
}
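For illustration, a populated OrderPlaced event matching this interface (all field values here are made up):

```typescript
import { randomUUID } from "crypto";

// Illustrative OrderPlaced event; IDs and item data are invented for the example.
const orderPlaced = {
  eventId: randomUUID(),
  aggregateId: "order-42",
  aggregateType: "Order",
  eventType: "OrderPlaced",
  eventVersion: 1,
  occurredAt: new Date(),
  payload: {
    items: [{ sku: "SKU-1", qty: 2, price: 19.99 }],
    placedAt: new Date().toISOString(),
  },
  metadata: {
    userId: "user-7",
    correlationId: randomUUID(), // ties together all events of one request
  },
};
```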
Event Store
Main table — append-only. No UPDATE or DELETE:
CREATE TABLE event_store (
  id             BIGSERIAL PRIMARY KEY,
  event_id       UUID UNIQUE NOT NULL,
  aggregate_id   UUID NOT NULL,
  aggregate_type VARCHAR(100) NOT NULL,
  event_type     VARCHAR(100) NOT NULL,
  event_version  INT NOT NULL DEFAULT 1,
  payload        JSONB NOT NULL,
  metadata       JSONB NOT NULL DEFAULT '{}',
  occurred_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  sequence_nr    BIGINT NOT NULL -- global order
);
CREATE INDEX idx_es_aggregate ON event_store (aggregate_id, aggregate_type, id);
CREATE INDEX idx_es_sequence ON event_store (sequence_nr);
Optimistic locking: the writer passes the aggregate version it last read, and the append is rejected if newer events for that aggregate already exist. A common way to enforce this in the database is a per-aggregate version column with a UNIQUE (aggregate_id, version) constraint, so a concurrent writer's INSERT fails instead of silently overwriting.
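A minimal in-memory sketch of that check (the class and method names are illustrative, not part of the schema above; in PostgreSQL the same guarantee would come from the unique constraint):

```typescript
type StoredEvent = { aggregateId: string; version: number; eventType: string };

// In-memory event store with optimistic concurrency on append.
class InMemoryEventStore {
  private events: StoredEvent[] = [];

  // expectedVersion is the aggregate version the caller last saw.
  append(aggregateId: string, expectedVersion: number, eventType: string): void {
    const current = this.events.filter(e => e.aggregateId === aggregateId).length;
    if (current !== expectedVersion) {
      throw new Error(`Concurrency conflict: expected v${expectedVersion}, store has v${current}`);
    }
    this.events.push({ aggregateId, version: current + 1, eventType });
  }

  load(aggregateId: string): StoredEvent[] {
    return this.events.filter(e => e.aggregateId === aggregateId);
  }
}
```

Two writers that both read version 1 cannot both succeed: the second `append` with `expectedVersion: 1` throws, and the losing writer must reload the aggregate and retry.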
Aggregate and Replay
class OrderAggregate {
  private state: OrderState = { status: 'new', items: [], total: 0 };
  private version = 0;
  private uncommittedEvents: DomainEvent[] = [];

  static rehydrate(events: DomainEvent[]): OrderAggregate {
    const order = new OrderAggregate();
    for (const event of events) {
      order.apply(event);
    }
    return order;
  }

  placeOrder(items: OrderItem[]) {
    // business rule validation
    if (this.state.status !== 'new') throw new Error('Order already placed');
    this.raise({
      eventType: 'OrderPlaced',
      payload: { items, placedAt: new Date() }
    });
  }

  // Apply a new event to in-memory state and queue it for persistence.
  private raise(event: Pick<DomainEvent, 'eventType' | 'payload'>) {
    const full = { ...event } as DomainEvent; // eventId, occurredAt etc. are filled on save
    this.apply(full);
    this.uncommittedEvents.push(full);
  }

  private apply(event: DomainEvent) {
    switch (event.eventType) {
      case 'OrderPlaced':
        this.state.status = 'placed';
        this.state.items = event.payload.items as OrderItem[];
        break;
      case 'PaymentProcessed':
        this.state.status = 'paid';
        this.state.paidAmount = event.payload.amount as number;
        break;
      case 'OrderShipped':
        this.state.status = 'shipped';
        this.state.trackingNumber = event.payload.trackingNumber as string;
        break;
    }
    this.version++;
  }
}
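Replay is effectively a left fold over the event history. A self-contained mini version of the apply loop, with the reducer written as a pure function (types simplified relative to DomainEvent):

```typescript
type Evt = { eventType: string; payload: Record<string, unknown> };
type State = { status: string; trackingNumber?: string };

// Pure-function form of OrderAggregate.apply(): replay is a left fold.
function applyEvent(state: State, event: Evt): State {
  switch (event.eventType) {
    case "OrderPlaced": return { ...state, status: "placed" };
    case "PaymentProcessed": return { ...state, status: "paid" };
    case "OrderShipped":
      return { ...state, status: "shipped", trackingNumber: String(event.payload.trackingNumber) };
    default: return state; // unknown events are ignored, not errors
  }
}

const history: Evt[] = [
  { eventType: "OrderPlaced", payload: {} },
  { eventType: "PaymentProcessed", payload: { amount: 100 } },
  { eventType: "OrderShipped", payload: { trackingNumber: "TRK-1" } },
];

const current = history.reduce(applyEvent, { status: "new" });
// current.status === "shipped"
```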
Snapshots
With a large event count per aggregate (>500), full replay becomes slow. A snapshot is the serialized state as of the Nth event. On load, read the latest snapshot plus the events after it:
async loadAggregate(aggregateId: string): Promise<OrderAggregate> {
  const snapshot = await this.snapshotRepo.findLatest(aggregateId);
  const fromVersion = snapshot?.version ?? 0;
  const events = await this.eventStore.getEvents(
    aggregateId, { fromVersion }
  );
  const aggregate = snapshot
    ? OrderAggregate.fromSnapshot(snapshot)
    : new OrderAggregate();
  aggregate.replay(events); // instance-level replay: apply() each event in order
  return aggregate;
}
Snapshots are created asynchronously, every 100–500 events per aggregate.
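One way to decide when to take a snapshot is to check whether an append crossed an interval boundary. A sketch (the 200-event interval and the function name are assumptions; tune the interval to your load):

```typescript
const SNAPSHOT_EVERY = 200; // assumed interval, within the 100-500 range above

// True when the append crossed a multiple of SNAPSHOT_EVERY,
// even if several events were appended at once.
function shouldSnapshot(versionBefore: number, eventsAppended: number): boolean {
  const before = Math.floor(versionBefore / SNAPSHOT_EVERY);
  const after = Math.floor((versionBefore + eventsAppended) / SNAPSHOT_EVERY);
  return after > before;
}
```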
Projections (Read Models)
Event Sourcing naturally splits the Write Model (events) from the Read Model (projections for queries). A projection subscribes to the event stream and builds a denormalized table for fast reads:
class OrderProjection {
  async on(event: DomainEvent) {
    switch (event.eventType) {
      case 'OrderPlaced':
        await db.query(`
          INSERT INTO orders_view (id, status, customer_id, total, created_at)
          VALUES ($1, 'placed', $2, $3, $4)
        `, [event.aggregateId, event.payload.customerId,
            event.payload.total, event.occurredAt]);
        break;
      case 'OrderShipped':
        await db.query(`
          UPDATE orders_view SET status = 'shipped',
                 tracking_number = $2, shipped_at = $3
          WHERE id = $1
        `, [event.aggregateId, event.payload.trackingNumber, event.occurredAt]);
        break;
    }
  }
}
Projections can be dropped and rebuilt from scratch at any time, because the event history is complete.
Technical Stack
Off-the-shelf event stores:
- EventStoreDB — specialized DBMS, supports subscriptions, catchup subscriptions, projections
- Marten (.NET) — PostgreSQL as Event Store + document database
- Axon Framework (Java) — full ES/CQRS framework
Self-hosting on PostgreSQL is sufficient for most projects; LISTEN/NOTIFY can be used to notify projections of new events.
Event broker for distribution between services: Kafka, RabbitMQ, NATS JetStream.
Schema Evolution
Event schema versioning is a mandatory practice. Strategies:
- Upcasting — when reading an old event, transform it to the new schema
- Weak schema — JSON allows adding fields without breaking existing readers
- Event versioning — store eventVersion and dispatch to version-specific handlers on read
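A sketch of upcasting, assuming a hypothetical v1→v2 change in which OrderPlaced's single customerName field is split into firstName/lastName:

```typescript
type AnyEvent = { eventType: string; eventVersion: number; payload: Record<string, unknown> };

// Transform a v1 event into the v2 shape at read time; newer events pass through.
function upcast(event: AnyEvent): AnyEvent {
  if (event.eventType === "OrderPlaced" && event.eventVersion === 1) {
    const name = String(event.payload.customerName ?? "");
    const [firstName, ...rest] = name.split(" ");
    return {
      ...event,
      eventVersion: 2,
      payload: { ...event.payload, firstName, lastName: rest.join(" ") },
    };
  }
  return event; // already at the current schema version
}
```

Upcasters run in the read path (or in a chain, v1→v2→v3), so stored events are never rewritten.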
Time Complexity
| Operation | Without Snapshots | With Snapshots |
|---|---|---|
| Load aggregate (N events) | O(N) | O(events since snapshot) |
| Write event | O(1) | O(1) |
| Query by state | O(N) projection rebuild | O(1) read model |
Implementation Timeline
- Basic Event Store on PostgreSQL with append-only table — 2–3 days
- One aggregate with several event types — 3–5 days
- Projections + subscriptions + snapshots — another 5–7 days
- Full system with multiple aggregates, schema evolution, monitoring — 3–5 weeks