Saga pattern implementation through message queues

Our company is engaged in the development, support and maintenance of sites of any complexity. From simple one-page sites to large-scale cluster systems built on micro services. Experience of developers is confirmed by certificates from vendors.
Development and maintenance of all types of websites:
Informational websites or web applications
Business card websites, landing pages, corporate websites, online catalogs, quizzes, promo websites, blogs, news resources, informational portals, forums, aggregators
E-commerce websites or web applications
Online stores, B2B portals, marketplaces, online exchanges, cashback websites, exchanges, dropshipping platforms, product parsers
Business process management web applications
CRM systems, ERP systems, corporate portals, production management systems, information parsers
Electronic service websites or web applications
Classified ads platforms, online schools, online cinemas, website builders, portals for electronic services, video hosting platforms, thematic portals

These are just some of the technical types of websites we work with, and each of them can have its own specific features and functionality, as well as be customized to meet the specific needs and goals of the client.

Showing 1 of 1 servicesAll 2065 services
Saga pattern implementation through message queues
Complex
~5 business days
FAQ
Our competencies:
Development stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    822
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    847
  • image_website-sbh_0.png
    Website development for SBH Partners
    999
  • image_website-_0.png
    Website development for Red Pear
    451

Implementing Saga Pattern Through Message Queues

Distributed transactions through two-phase commit (2PC) do not work in microservice architecture — they create synchronous dependencies and failure points. The Saga pattern solves this: a long transaction is split into a sequence of local transactions, each of which publishes an event for the next step. On failure, compensating transactions are executed in reverse order.

Two Saga Orchestration Approaches

Choreography (choreography) — each service reacts to events and publishes its own. No central coordinator. Good for simple scenarios with 2–3 steps. Hard to debug as participants increase.

Orchestration (orchestration) — dedicated Saga Orchestrator knows the entire scenario, sends commands to each service and waits for response. Easier to debug and monitor. Recommended for complex scenarios.

Example: Order Processing Saga

Scenario: CreateOrder → ReserveInventory → ProcessPayment → ShipOrder

On any step failure — compensation in reverse order.

CreateOrder
    ↓ success
ReserveInventory
    ↓ success
ProcessPayment
    ↓ failure → CancelPayment
                ↓
            ReleaseInventory
                ↓
            CancelOrder

Orchestration Saga Implementation (Java/Spring)

// Saga state is stored in DB — guarantees recovery after restart
@Entity
@Table(name = "order_sagas")
public class OrderSaga {
    @Id
    private String sagaId;
    private Long orderId;

    @Enumerated(EnumType.STRING)
    private SagaStatus status; // STARTED, INVENTORY_RESERVED, PAYMENT_PROCESSING, COMPLETED, COMPENSATING, FAILED

    @Enumerated(EnumType.STRING)
    private SagaStep currentStep;

    private String failureReason;
    private int retryCount;

    @Column(columnDefinition = "jsonb")
    private String context; // JSON with data for compensation
}

@Service
@Transactional
public class OrderSagaOrchestrator {

    @Autowired
    private OrderSagaRepository sagaRepo;

    @Autowired
    private MessagePublisher publisher;

    public void startSaga(CreateOrderCommand command) {
        // Local transaction: create order + save Saga
        Order order = orderService.createDraft(command);

        OrderSaga saga = new OrderSaga();
        saga.setSagaId(UUID.randomUUID().toString());
        saga.setOrderId(order.getId());
        saga.setStatus(SagaStatus.STARTED);
        saga.setCurrentStep(SagaStep.RESERVE_INVENTORY);
        sagaRepo.save(saga);

        // Publish command for next step
        publisher.publish("inventory-commands", new ReserveInventoryCommand(
            saga.getSagaId(),
            order.getId(),
            command.getItems()
        ));
    }

    @KafkaListener(topics = "inventory-events")
    public void onInventoryEvent(InventoryEvent event) {
        OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId())
            .orElseThrow(() -> new IllegalStateException("Saga not found: " + event.getSagaId()));

        if (event.getType() == EventType.INVENTORY_RESERVED) {
            saga.setStatus(SagaStatus.INVENTORY_RESERVED);
            saga.setCurrentStep(SagaStep.PROCESS_PAYMENT);
            sagaRepo.save(saga);

            publisher.publish("payment-commands", new ProcessPaymentCommand(
                saga.getSagaId(),
                saga.getOrderId(),
                event.getReservationId()
            ));
        } else if (event.getType() == EventType.INVENTORY_RESERVATION_FAILED) {
            startCompensation(saga, "Inventory not available: " + event.getReason());
        }
    }

    @KafkaListener(topics = "payment-events")
    public void onPaymentEvent(PaymentEvent event) {
        OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId()).orElseThrow();

        if (event.getType() == EventType.PAYMENT_COMPLETED) {
            saga.setStatus(SagaStatus.COMPLETED);
            saga.setCurrentStep(null);
            sagaRepo.save(saga);

            orderService.confirmOrder(saga.getOrderId());
            publisher.publish("shipping-commands", new CreateShipmentCommand(
                saga.getSagaId(), saga.getOrderId()
            ));
        } else if (event.getType() == EventType.PAYMENT_FAILED) {
            startCompensation(saga, "Payment failed: " + event.getErrorCode());
        }
    }

    private void startCompensation(OrderSaga saga, String reason) {
        saga.setStatus(SagaStatus.COMPENSATING);
        saga.setFailureReason(reason);
        sagaRepo.save(saga);

        // Determine which steps need compensation based on currentStep
        switch (saga.getCurrentStep()) {
            case PROCESS_PAYMENT:
                // Inventory reserved, payment failed
                publisher.publish("inventory-commands", new ReleaseInventoryCommand(
                    saga.getSagaId(), saga.getOrderId()
                ));
                break;
            case RESERVE_INVENTORY:
                // Failed to reserve — only cancel order
                orderService.cancelOrder(saga.getOrderId(), reason);
                saga.setStatus(SagaStatus.FAILED);
                sagaRepo.save(saga);
                break;
        }
    }
}

Idempotency — Protection from Duplicates

Each Saga step must be idempotent: repeated command should not create duplicate transaction.

@Service
public class InventoryService {

    public void reserveInventory(ReserveInventoryCommand command) {
        // Check: maybe we already reserved for this Saga?
        Optional<InventoryReservation> existing =
            reservationRepo.findBySagaId(command.getSagaId());

        if (existing.isPresent()) {
            // Idempotent response — publish same event
            publisher.publish("inventory-events", new InventoryReservedEvent(
                command.getSagaId(),
                existing.get().getId()
            ));
            return;
        }

        // Execute reservation
        try {
            InventoryReservation reservation = performReservation(command);
            publisher.publish("inventory-events", new InventoryReservedEvent(
                command.getSagaId(), reservation.getId()
            ));
        } catch (InsufficientInventoryException e) {
            publisher.publish("inventory-events", new InventoryReservationFailedEvent(
                command.getSagaId(), e.getMessage()
            ));
        }
    }
}

Implementation via RabbitMQ

Alternative to Kafka — same logic through RabbitMQ with topic exchange:

# Python orchestrator
import pika
import json
import uuid
from enum import Enum

class SagaOrchestrator:
    def __init__(self, connection):
        self.channel = connection.channel()
        self.channel.exchange_declare('saga-commands', 'topic', durable=True)
        self.channel.exchange_declare('saga-events', 'topic', durable=True)

        # Queue for receiving responses from services
        result = self.channel.queue_declare('orchestrator-responses', durable=True)
        self.channel.queue_bind(result.method.queue, 'saga-events', '#')
        self.channel.basic_consume(
            result.method.queue,
            self.on_event,
            auto_ack=False
        )

    def start_order_saga(self, order_data: dict) -> str:
        saga_id = str(uuid.uuid4())

        # Save Saga in DB (psycopg2/SQLAlchemy)
        self.save_saga(saga_id, order_data['order_id'], 'STARTED', 'RESERVE_INVENTORY')

        # Send first command
        self.channel.basic_publish(
            exchange='saga-commands',
            routing_key='inventory.reserve',
            body=json.dumps({
                'saga_id': saga_id,
                'order_id': order_data['order_id'],
                'items': order_data['items'],
            }),
            properties=pika.BasicProperties(
                delivery_mode=2,
                message_id=str(uuid.uuid4()),
                correlation_id=saga_id,
            )
        )
        return saga_id

    def on_event(self, channel, method, properties, body):
        event = json.loads(body)
        saga = self.load_saga(event['saga_id'])

        if event['type'] == 'INVENTORY_RESERVED':
            self.proceed_to_payment(saga, event)
        elif event['type'] in ('INVENTORY_FAILED', 'PAYMENT_FAILED'):
            self.compensate(saga, event['reason'])

        channel.basic_ack(method.delivery_tag)

Saga Monitoring and Debugging

Without visibility into Saga state, debugging distributed transactions is extremely difficult.

-- Stuck Sagas — did not complete in 30 minutes
SELECT saga_id, order_id, status, current_step,
       created_at, NOW() - created_at AS age
FROM order_sagas
WHERE status NOT IN ('COMPLETED', 'FAILED')
  AND created_at < NOW() - INTERVAL '30 minutes'
ORDER BY created_at;

-- Statistics for the day
SELECT status, COUNT(*), AVG(EXTRACT(EPOCH FROM (updated_at - created_at))) as avg_duration_sec
FROM order_sagas
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY status;

Alert on stuck Sagas:

- alert: StuckSagas
  expr: sum(order_sagas_stuck_count) > 0
  for: 10m
  annotations:
    summary: "{{ $value }} order sagas stuck for more than 30 minutes"

Timeline

Day 1–2 — Saga design: define steps, compensating actions, command and event format. DB schema for Saga state.

Day 3–4 — develop Orchestrator: event handlers, compensation logic, command idempotency.

Day 5 — integrate with all participating services, implement compensating methods in each service.

Day 6 — test failure scenarios: kill each service at each step, verify compensation correctness.

Day 7 — monitor Saga states, alerts on stuck transactions, tool for manual control (resume/compensate/retry).