Event sourcing implementation through message broker

Our company is engaged in the development, support and maintenance of sites of any complexity. From simple one-page sites to large-scale cluster systems built on micro services. Experience of developers is confirmed by certificates from vendors.
Development and maintenance of all types of websites:
Informational websites or web applications
Business card websites, landing pages, corporate websites, online catalogs, quizzes, promo websites, blogs, news resources, informational portals, forums, aggregators
E-commerce websites or web applications
Online stores, B2B portals, marketplaces, online exchanges, cashback websites, exchanges, dropshipping platforms, product parsers
Business process management web applications
CRM systems, ERP systems, corporate portals, production management systems, information parsers
Electronic service websites or web applications
Classified ads platforms, online schools, online cinemas, website builders, portals for electronic services, video hosting platforms, thematic portals

These are just some of the technical types of websites we work with, and each of them can have its own specific features and functionality, as well as be customized to meet the specific needs and goals of the client.

Showing 1 of 1 servicesAll 2065 services
Event sourcing implementation through message broker
Complex
~1-2 weeks
FAQ
Our competencies:
Development stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    822
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    847
  • image_website-sbh_0.png
    Website development for SBH Partners
    999
  • image_website-_0.png
    Website development for Red Pear
    451

Implementing Event Sourcing Through Message Broker

Event Sourcing is storing system state not as a current snapshot, but as a sequence of events. Instead of UPDATE orders SET status='shipped' in DB, a record OrderShipped{orderId: 42, timestamp: ..., trackingCode: ...} is added. Current state is calculated by replaying all events.

A message broker (Kafka, EventStoreDB) serves as an Event Log — an immutable journal from which any past state can be recovered.

When Event Sourcing is Justified

Event Sourcing adds complexity. Justified when:

  • Full audit log is needed (fintech, healthcare, e-commerce)
  • Need to recreate past state at a specific moment
  • Multiple read models from one source (CQRS)
  • Undo/redo operations

Not justified for most CRUD applications without strict audit requirements.

Event Store Based on Kafka

Kafka is ideal as Event Store: topics are append-only logs, events are ordered within partition, retention is configurable up to log.retention.ms=-1 (forever).

Topic schema:

  • orders-events — all order events (key = order_id, guarantees partition ordering)
  • users-events — user events
  • inventory-events — inventory movements
// Base domain event class
public abstract class DomainEvent {
    private final String eventId;
    private final String aggregateId;
    private final String aggregateType;
    private final long version;          // monotonically increasing aggregate version
    private final Instant occurredAt;
    private final String causedBy;       // ID of command that caused event

    // events are immutable
}

// Concrete events
public class OrderCreated extends DomainEvent {
    private final Long userId;
    private final List<OrderItem> items;
    private final BigDecimal totalAmount;
    private final String currency;
}

public class OrderPaid extends DomainEvent {
    private final String paymentId;
    private final String paymentMethod;
    private final BigDecimal amount;
}

public class OrderShipped extends DomainEvent {
    private final String carrier;
    private final String trackingCode;
    private final Instant estimatedDelivery;
}

public class OrderCancelled extends DomainEvent {
    private final String reason;
    private final String cancelledBy; // "customer" | "system" | "support"
}

Aggregate with Event Sourcing

public class Order {
    private Long id;
    private Long userId;
    private OrderStatus status;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private long version = 0;

    private final List<DomainEvent> pendingEvents = new ArrayList<>();

    // Static method for reconstruction from events
    public static Order reconstitute(List<DomainEvent> events) {
        Order order = new Order();
        for (DomainEvent event : events) {
            order.apply(event);
        }
        return order;
    }

    // Command: create order
    public void create(Long userId, List<OrderItem> items) {
        if (this.status != null) throw new IllegalStateException("Order already exists");

        BigDecimal total = items.stream()
            .map(i -> i.getPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);

        OrderCreated event = new OrderCreated(
            UUID.randomUUID().toString(),
            String.valueOf(id),
            "Order",
            version + 1,
            Instant.now(),
            userId,
            items,
            total,
            "USD"
        );

        apply(event);
        pendingEvents.add(event);
    }

    public void pay(String paymentId, String method, BigDecimal amount) {
        if (status != OrderStatus.CREATED) {
            throw new InvalidOrderStateException("Cannot pay order in status: " + status);
        }

        OrderPaid event = new OrderPaid(
            UUID.randomUUID().toString(),
            String.valueOf(id),
            "Order",
            version + 1,
            Instant.now(),
            paymentId,
            method,
            amount
        );

        apply(event);
        pendingEvents.add(event);
    }

    // apply — mutates state without side effects
    private void apply(DomainEvent event) {
        version = event.getVersion();

        if (event instanceof OrderCreated e) {
            this.userId = e.getUserId();
            this.items = e.getItems();
            this.totalAmount = e.getTotalAmount();
            this.status = OrderStatus.CREATED;
        } else if (event instanceof OrderPaid) {
            this.status = OrderStatus.PAID;
        } else if (event instanceof OrderShipped e) {
            this.status = OrderStatus.SHIPPED;
        } else if (event instanceof OrderCancelled) {
            this.status = OrderStatus.CANCELLED;
        }
    }

    public List<DomainEvent> pullPendingEvents() {
        List<DomainEvent> events = new ArrayList<>(pendingEvents);
        pendingEvents.clear();
        return events;
    }
}

Event Store Repository

@Repository
public class OrderEventStoreRepository {
    private final KafkaTemplate<String, DomainEvent> kafkaTemplate;
    private final KafkaConsumer<String, DomainEvent> replayConsumer;
    private static final String TOPIC = "order-events";

    public void save(Order order) {
        List<DomainEvent> events = order.pullPendingEvents();
        if (events.isEmpty()) return;

        // Optimistic locking — event version in header
        for (DomainEvent event : events) {
            Headers headers = new RecordHeaders();
            headers.add("aggregate-version", String.valueOf(event.getVersion()).getBytes());
            headers.add("event-type", event.getClass().getSimpleName().getBytes());

            ProducerRecord<String, DomainEvent> record = new ProducerRecord<>(
                TOPIC,
                null,
                order.getId().toString(),
                event,
                headers
            );

            // Synchronous send for write guarantee
            kafkaTemplate.send(record).get(5, TimeUnit.SECONDS);
        }
    }

    public Order load(Long orderId) {
        // Replay all events for aggregate
        List<DomainEvent> events = replayEvents(TOPIC, orderId.toString());
        if (events.isEmpty()) throw new OrderNotFoundException(orderId);
        return Order.reconstitute(events);
    }

    private List<DomainEvent> replayEvents(String topic, String aggregateId) {
        // Read all partitions and filter by aggregate key
        // In production: use separate topic per-aggregate
        // or EventStoreDB instead of Kafka for better aggregate ID lookup support
        List<DomainEvent> events = new ArrayList<>();
        // ... implementation of key-based reading
        return events;
    }
}

Projections and Read Models

Read Models are built from Event Stream — denormalized views for specific queries:

@Component
public class OrderReadModelProjection {

    @Autowired
    private OrderReadModelRepository readRepo;

    @KafkaListener(topics = "order-events", groupId = "order-read-model-projector")
    public void project(ConsumerRecord<String, DomainEvent> record) {
        DomainEvent event = record.value();

        switch (event) {
            case OrderCreated e -> {
                OrderReadModel model = new OrderReadModel();
                model.setOrderId(Long.parseLong(e.getAggregateId()));
                model.setUserId(e.getUserId());
                model.setStatus("CREATED");
                model.setTotalAmount(e.getTotalAmount());
                model.setItemCount(e.getItems().size());
                model.setCreatedAt(e.getOccurredAt());
                readRepo.save(model);
            }
            case OrderPaid e -> readRepo.updateStatus(
                Long.parseLong(e.getAggregateId()), "PAID", e.getOccurredAt()
            );
            case OrderShipped e -> readRepo.updateStatusWithTracking(
                Long.parseLong(e.getAggregateId()), "SHIPPED",
                e.getTrackingCode(), e.getEstimatedDelivery()
            );
            case OrderCancelled e -> readRepo.updateStatus(
                Long.parseLong(e.getAggregateId()), "CANCELLED", e.getOccurredAt()
            );
            default -> {}
        }
    }
}

Snapshots — Replay Optimization

With thousands of events per aggregate, replay from start takes time. Snapshots save state at a specific version:

@Service
public class SnapshotService {

    public void createSnapshotIfNeeded(Order order) {
        if (order.getVersion() % 100 == 0) { // every 100 events
            OrderSnapshot snapshot = new OrderSnapshot(
                order.getId(),
                order.getVersion(),
                objectMapper.writeValueAsString(order),
                Instant.now()
            );
            snapshotRepo.save(snapshot);
        }
    }

    public Order loadWithSnapshot(Long orderId) {
        Optional<OrderSnapshot> snapshot = snapshotRepo.findLatest(orderId);

        if (snapshot.isPresent()) {
            Order order = objectMapper.readValue(snapshot.get().getState(), Order.class);
            // Load only events after snapshot version
            List<DomainEvent> newEvents = eventRepo.loadAfterVersion(
                orderId, snapshot.get().getVersion()
            );
            for (DomainEvent event : newEvents) {
                order.applyHistorical(event);
            }
            return order;
        }

        return eventRepo.load(orderId);
    }
}

Timeline

Day 1–2 — design event model: which events, their fields, versioning. Avro schemas or Protobuf for each event type.

Day 3–4 — develop aggregates and Event Store Repository, implement apply methods, optimistic locking.

Day 5–6 — implement projections for Read Models, test replay.

Day 7 — snapshots, load testing, evaluate replay time without snapshots vs with snapshots.

Day 8 — monitor projection lag, tool for recreating Read Model from Event Log (full replay).