Implementing Event Sourcing Through Message Broker
Event Sourcing stores system state not as a current snapshot but as a sequence of events. Instead of an UPDATE orders SET status='shipped' in the database, a record OrderShipped{orderId: 42, timestamp: ..., trackingCode: ...} is appended. The current state is computed by replaying all events.
A message broker (Kafka, EventStoreDB) serves as an Event Log — an immutable journal from which any past state can be recovered.
When Event Sourcing is Justified
Event Sourcing adds complexity. Justified when:
- Full audit log is needed (fintech, healthcare, e-commerce)
- Need to recreate past state at a specific moment
- Multiple read models from one source (CQRS)
- Undo/redo operations
Not justified for most CRUD applications without strict audit requirements.
Event Store Based on Kafka
Kafka fits the Event Store role well: topics are append-only logs, events are ordered within a partition, and retention can be set to log.retention.ms=-1 (keep forever). Its main gap, reading all events of one aggregate, is addressed in the repository section below.
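For reference, a sketch of the topic-level settings this setup assumes (values are illustrative, tune to your cluster):

```properties
# order-events topic settings for event-store use (illustrative values)
retention.ms=-1        # never delete events
cleanup.policy=delete  # no compaction: every event must survive, not just the latest per key
min.insync.replicas=2  # with producer acks=all, an event is replicated before the send succeeds
```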
Topic schema:
- order-events: all order events (key = order_id, which guarantees ordering within a partition)
- users-events: user events
- inventory-events: inventory movements
// Base domain event class
public abstract class DomainEvent {
private final String eventId;
private final String aggregateId;
private final String aggregateType;
private final long version; // monotonically increasing aggregate version
private final Instant occurredAt;
private final String causedBy; // ID of command that caused event
// events are immutable: all fields final, set once in the constructor (constructors and getters omitted here)
}
// Concrete events
public class OrderCreated extends DomainEvent {
private final Long userId;
private final List<OrderItem> items;
private final BigDecimal totalAmount;
private final String currency;
}
public class OrderPaid extends DomainEvent {
private final String paymentId;
private final String paymentMethod;
private final BigDecimal amount;
}
public class OrderShipped extends DomainEvent {
private final String carrier;
private final String trackingCode;
private final Instant estimatedDelivery;
}
public class OrderCancelled extends DomainEvent {
private final String reason;
private final String cancelledBy; // "customer" | "system" | "support"
}
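Serialized, an event such as OrderShipped carries the base envelope plus its own payload. A sample of what lands in the topic (all values here are made up for illustration):

```json
{
  "eventId": "3f2b6a1e-0c44-4e7a-9d2f-8a1b5c3d7e90",
  "aggregateId": "42",
  "aggregateType": "Order",
  "version": 3,
  "occurredAt": "2024-05-01T12:00:00Z",
  "causedBy": "cmd-7781",
  "carrier": "DHL",
  "trackingCode": "JD014600003RU",
  "estimatedDelivery": "2024-05-05T00:00:00Z"
}
```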
Aggregate with Event Sourcing
public class Order {
private Long id;
private Long userId;
private OrderStatus status;
private List<OrderItem> items;
private BigDecimal totalAmount;
private long version = 0;
private final List<DomainEvent> pendingEvents = new ArrayList<>();
// Static method for reconstruction from events
public static Order reconstitute(List<DomainEvent> events) {
Order order = new Order();
for (DomainEvent event : events) {
order.apply(event);
}
return order;
}
// Command: create order
public void create(Long orderId, Long userId, List<OrderItem> items) {
if (this.status != null) throw new IllegalStateException("Order already exists");
this.id = orderId; // the aggregate ID must be assigned before building the first event
BigDecimal total = items.stream()
.map(i -> i.getPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
OrderCreated event = new OrderCreated(
UUID.randomUUID().toString(),
String.valueOf(orderId),
"Order",
version + 1,
Instant.now(),
userId,
items,
total,
"USD"
);
apply(event);
pendingEvents.add(event);
}
public void pay(String paymentId, String method, BigDecimal amount) {
if (status != OrderStatus.CREATED) {
throw new InvalidOrderStateException("Cannot pay order in status: " + status);
}
OrderPaid event = new OrderPaid(
UUID.randomUUID().toString(),
String.valueOf(id),
"Order",
version + 1,
Instant.now(),
paymentId,
method,
amount
);
apply(event);
pendingEvents.add(event);
}
// apply updates in-memory state only: no validation, no external side effects
private void apply(DomainEvent event) {
version = event.getVersion();
if (event instanceof OrderCreated e) {
this.id = Long.valueOf(e.getAggregateId()); // restore the ID during replay as well
this.userId = e.getUserId();
this.items = e.getItems();
this.totalAmount = e.getTotalAmount();
this.status = OrderStatus.CREATED;
} else if (event instanceof OrderPaid) {
this.status = OrderStatus.PAID;
} else if (event instanceof OrderShipped) {
this.status = OrderStatus.SHIPPED;
} else if (event instanceof OrderCancelled) {
this.status = OrderStatus.CANCELLED;
}
}
public List<DomainEvent> pullPendingEvents() {
List<DomainEvent> events = new ArrayList<>(pendingEvents);
pendingEvents.clear();
return events;
}
}
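To see the lifecycle end to end, here is a compilable, deliberately pared-down version of the same pattern: a single file, no Kafka, event classes reduced to the fields the demo needs. It is a sketch of the mechanics, not the article's full aggregate.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

public class OrderDemo {
    enum Status { CREATED, PAID }

    // Minimal event hierarchy: just the version plus per-event payload
    sealed interface Event permits Created, Paid {}
    record Created(long version, long userId, BigDecimal total) implements Event {}
    record Paid(long version, String paymentId) implements Event {}

    static class Order {
        Status status;
        long userId;
        BigDecimal total;
        long version = 0;
        final List<Event> pending = new ArrayList<>();

        static Order reconstitute(List<Event> history) {
            Order o = new Order();
            history.forEach(o::apply);
            return o;
        }

        void create(long userId, BigDecimal total) {
            if (status != null) throw new IllegalStateException("Order already exists");
            Event e = new Created(version + 1, userId, total);
            apply(e);
            pending.add(e);
        }

        void pay(String paymentId) {
            if (status != Status.CREATED) throw new IllegalStateException("Cannot pay in " + status);
            Event e = new Paid(version + 1, paymentId);
            apply(e);
            pending.add(e);
        }

        void apply(Event e) {
            if (e instanceof Created c) { userId = c.userId(); total = c.total(); version = c.version(); status = Status.CREATED; }
            else if (e instanceof Paid p) { version = p.version(); status = Status.PAID; }
        }

        List<Event> pullPending() {
            List<Event> out = new ArrayList<>(pending);
            pending.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        Order order = new Order();
        order.create(7L, new BigDecimal("99.90"));
        order.pay("pay-1");
        List<Event> log = order.pullPending();    // what save() would append to Kafka
        Order replica = Order.reconstitute(log);  // what load() would rebuild from the log
        System.out.println("events=" + log.size() + " status=" + replica.status + " version=" + replica.version);
    }
}
```

Note the symmetry: the same apply method serves both the command path and replay, which is what makes reconstitution deterministic.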
Event Store Repository
@Repository
public class OrderEventStoreRepository {
private final KafkaTemplate<String, DomainEvent> kafkaTemplate;
private final KafkaConsumer<String, DomainEvent> replayConsumer;
private static final String TOPIC = "order-events";
public void save(Order order) {
List<DomainEvent> events = order.pullPendingEvents();
if (events.isEmpty()) return;
// The expected version travels in a header; note that Kafka itself does not
// enforce optimistic locking, so version conflicts must be detected before the send
for (DomainEvent event : events) {
Headers headers = new RecordHeaders();
headers.add("aggregate-version", String.valueOf(event.getVersion()).getBytes());
headers.add("event-type", event.getClass().getSimpleName().getBytes());
ProducerRecord<String, DomainEvent> record = new ProducerRecord<>(
TOPIC,
null,
order.getId().toString(),
event,
headers
);
// Synchronous send to guarantee the append before returning;
// EventStoreException is an assumed domain-specific unchecked exception
try {
kafkaTemplate.send(record).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new EventStoreException("Interrupted while appending event", e);
} catch (ExecutionException | TimeoutException e) {
throw new EventStoreException("Failed to append event " + event.getEventId(), e);
}
}
}
public Order load(Long orderId) {
// Replay all events for aggregate
List<DomainEvent> events = replayEvents(TOPIC, orderId.toString());
if (events.isEmpty()) throw new OrderNotFoundException(orderId);
return Order.reconstitute(events);
}
private List<DomainEvent> replayEvents(String topic, String aggregateId) {
// Reads all partitions and filters by the aggregate key.
// Kafka offers no per-key lookup, so in production either maintain an
// offset index per aggregate, or use EventStoreDB, which reads per-stream natively
List<DomainEvent> events = new ArrayList<>();
// ... implementation of key-based reading
return events;
}
}
Projections and Read Models
Read models are denormalized views built from the event stream, each tailored to a specific query pattern:
@Component
public class OrderReadModelProjection {
@Autowired
private OrderReadModelRepository readRepo;
@KafkaListener(topics = "order-events", groupId = "order-read-model-projector")
public void project(ConsumerRecord<String, DomainEvent> record) {
DomainEvent event = record.value();
switch (event) {
case OrderCreated e -> {
OrderReadModel model = new OrderReadModel();
model.setOrderId(Long.parseLong(e.getAggregateId()));
model.setUserId(e.getUserId());
model.setStatus("CREATED");
model.setTotalAmount(e.getTotalAmount());
model.setItemCount(e.getItems().size());
model.setCreatedAt(e.getOccurredAt());
readRepo.save(model);
}
case OrderPaid e -> readRepo.updateStatus(
Long.parseLong(e.getAggregateId()), "PAID", e.getOccurredAt()
);
case OrderShipped e -> readRepo.updateStatusWithTracking(
Long.parseLong(e.getAggregateId()), "SHIPPED",
e.getTrackingCode(), e.getEstimatedDelivery()
);
case OrderCancelled e -> readRepo.updateStatus(
Long.parseLong(e.getAggregateId()), "CANCELLED", e.getOccurredAt()
);
default -> {}
}
}
}
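Kafka delivers at-least-once, so the projector above will occasionally see an event twice (for example after a consumer rebalance). A common guard is to store the last applied version on the read model and skip duplicates and stale events; the check itself is trivial (method and names here are illustrative, not from the article's code):

```java
public class ProjectionGuardDemo {
    // Returns true when the incoming event should be applied to the read model:
    // a strictly greater version means "new"; equal or lower means duplicate or stale
    static boolean shouldApply(long modelVersion, long eventVersion) {
        return eventVersion > modelVersion;
    }

    public static void main(String[] args) {
        System.out.println(shouldApply(2, 3)); // fresh event: apply
        System.out.println(shouldApply(3, 3)); // redelivered duplicate: skip
        System.out.println(shouldApply(5, 3)); // stale event during replay: skip
    }
}
```

Because apply-by-version is idempotent, the whole projection can also be rebuilt from offset zero at any time, which is what the Day 8 replay tooling relies on.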
Snapshots — Replay Optimization
With thousands of events per aggregate, replay from start takes time. Snapshots save state at a specific version:
@Service
public class SnapshotService {
private final ObjectMapper objectMapper;
private final SnapshotRepository snapshotRepo; // snapshot persistence, e.g. a JPA repository
private final OrderEventStoreRepository eventRepo;
public void createSnapshotIfNeeded(Order order) {
if (order.getVersion() % 100 == 0) { // every 100 events
try {
OrderSnapshot snapshot = new OrderSnapshot(
order.getId(),
order.getVersion(),
objectMapper.writeValueAsString(order),
Instant.now()
);
snapshotRepo.save(snapshot);
} catch (JsonProcessingException e) {
// A snapshot is only an optimization: log and continue rather than failing the write path
}
}
}
public Order loadWithSnapshot(Long orderId) {
Optional<OrderSnapshot> snapshot = snapshotRepo.findLatest(orderId);
if (snapshot.isEmpty()) return eventRepo.load(orderId);
try {
Order order = objectMapper.readValue(snapshot.get().getState(), Order.class);
// Load only events appended after the snapshot version
List<DomainEvent> newEvents = eventRepo.loadAfterVersion(
orderId, snapshot.get().getVersion()
);
for (DomainEvent event : newEvents) {
order.applyHistorical(event); // assumed public wrapper around the private apply()
}
return order;
} catch (IOException e) {
// Corrupt or incompatible snapshot: fall back to full replay
return eventRepo.load(orderId);
}
}
}
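The payoff of loadWithSnapshot is pure arithmetic: replay cost drops from "all events" to "events since the last snapshot". A minimal sketch of that bookkeeping, with serialization left out:

```java
import java.util.List;
import java.util.stream.LongStream;

public class SnapshotDemo {
    record Event(long version) {}

    public static void main(String[] args) {
        long current = 1042;       // the aggregate currently lives at version 1042
        long snapshotEvery = 100;  // same cadence as createSnapshotIfNeeded
        long snapshotVersion = (current / snapshotEvery) * snapshotEvery; // latest snapshot version
        // Only the tail after the snapshot needs to be replayed
        List<Event> tail = LongStream.rangeClosed(snapshotVersion + 1, current)
                .mapToObj(Event::new).toList();
        System.out.println("snapshotVersion=" + snapshotVersion + " tailEvents=" + tail.size());
    }
}
```

Full replay would apply all 1042 events; with a snapshot at version 1000, only 42 remain.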
Timeline
Day 1–2 — design event model: which events, their fields, versioning. Avro schemas or Protobuf for each event type.
Day 3–4 — develop aggregates and Event Store Repository, implement apply methods, optimistic locking.
Day 5–6 — implement projections for Read Models, test replay.
Day 7 — snapshots, load testing, evaluate replay time without snapshots vs with snapshots.
Day 8 — monitor projection lag, tool for recreating Read Model from Event Log (full replay).