Implementing Saga Pattern Through Message Queues
Distributed transactions based on two-phase commit (2PC) are a poor fit for a microservice architecture: they couple services synchronously and add failure points, because every participant has to be available for the transaction to finish. The Saga pattern addresses this by splitting a long transaction into a sequence of local transactions, each of which publishes an event that triggers the next step. On failure, compensating transactions for the steps already completed are executed in reverse order.
Two Saga Coordination Approaches
Choreography: each service reacts to events and publishes its own; there is no central coordinator. This works well for simple scenarios with 2–3 steps, but becomes hard to debug as the number of participants grows.
Orchestration: a dedicated Saga Orchestrator knows the entire scenario, sends commands to each service, and waits for the responses. This is easier to debug and monitor, and is recommended for complex scenarios.
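To make the difference concrete, here is a minimal choreography sketch. The in-memory `EventBus` is a hypothetical stand-in for a real broker, and the event names (`OrderCreated`, `InventoryReserved`, `InventoryRejected`, `PaymentCompleted`, `OrderCancelled`) are illustrative assumptions. Each service knows only the event it reacts to and the event it emits, so the overall flow is not written down in any single place.

```python
from collections import defaultdict


class EventBus:
    """In-memory stand-in for a message broker (hypothetical, illustration only)."""

    def __init__(self):
        self.handlers = defaultdict(list)
        self.log = []  # every published event type, in order

    def subscribe(self, event_type, handler):
        self.handlers[event_type].append(handler)

    def publish(self, event_type, payload):
        self.log.append(event_type)
        for handler in list(self.handlers[event_type]):
            handler(payload)


def wire_choreography(bus, in_stock=True):
    # Each "service" is a handler that reacts to one event and emits another.
    def inventory_service(order):
        bus.publish("InventoryReserved" if in_stock else "InventoryRejected", order)

    def payment_service(order):
        bus.publish("PaymentCompleted", order)

    def order_service_cancel(order):
        bus.publish("OrderCancelled", order)

    bus.subscribe("OrderCreated", inventory_service)
    bus.subscribe("InventoryReserved", payment_service)
    bus.subscribe("InventoryRejected", order_service_cancel)


bus = EventBus()
wire_choreography(bus)
bus.publish("OrderCreated", {"order_id": 1})
print(bus.log)  # ['OrderCreated', 'InventoryReserved', 'PaymentCompleted']
```

The only way to see the whole scenario here is to read the published event log, which is the debugging cost that grows with the number of participants. An orchestrator keeps the same sequence in one component instead.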
Example: Order Processing Saga
Scenario: CreateOrder → ReserveInventory → ProcessPayment → ShipOrder
If any step fails, the steps already completed are compensated in reverse order.
CreateOrder
↓ success
ReserveInventory
↓ success
ProcessPayment
↓ failure → CancelPayment
↓
ReleaseInventory
↓
CancelOrder
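The flow above can be sketched in a few lines of in-process Python. This is a simplified illustration rather than the message-driven implementation: `SagaStep`, `run_saga`, and the `CancelShipment` compensation name are assumptions made for the example. The sketch compensates only steps that completed; if a failed step can leave partial effects (as the CancelPayment box in the diagram suggests), its compensation would have to run as well and be safe to call when there is nothing to undo.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class SagaStep:
    name: str
    action: Callable[[dict], None]
    compensation: Callable[[dict], None]


def run_saga(steps: List[SagaStep], ctx: dict) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: List[SagaStep] = []
    for step in steps:
        try:
            step.action(ctx)
            completed.append(step)
        except Exception as exc:
            ctx["failure_reason"] = f"{step.name}: {exc}"
            for done in reversed(completed):
                done.compensation(ctx)
            return False
    return True


trace = []


def record(label):
    # Returns a step function that only records that it ran.
    return lambda ctx: trace.append(label)


def fail_payment(ctx):
    raise RuntimeError("card declined")


steps = [
    SagaStep("CreateOrder", record("CreateOrder"), record("CancelOrder")),
    SagaStep("ReserveInventory", record("ReserveInventory"), record("ReleaseInventory")),
    SagaStep("ProcessPayment", fail_payment, record("CancelPayment")),
    SagaStep("ShipOrder", record("ShipOrder"), record("CancelShipment")),
]
ctx = {}
ok = run_saga(steps, ctx)
print(ok, trace)
# False ['CreateOrder', 'ReserveInventory', 'ReleaseInventory', 'CancelOrder']
```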
Orchestration Saga Implementation (Java/Spring)
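// Saga state is stored in the DB so the orchestrator can resume after a restart.
// Imports assume Spring Boot 3 (jakarta.persistence); on Spring Boot 2 use javax.persistence.
import jakarta.persistence.*;
import java.time.Instant;
import org.hibernate.annotations.CreationTimestamp;
import org.hibernate.annotations.UpdateTimestamp;

@Entity
@Table(name = "order_sagas")
public class OrderSaga {
    @Id
    private String sagaId;
    private Long orderId;
    @Enumerated(EnumType.STRING)
    private SagaStatus status; // STARTED, INVENTORY_RESERVED, PAYMENT_PROCESSING, COMPLETED, COMPENSATING, FAILED
    @Enumerated(EnumType.STRING)
    private SagaStep currentStep;
    private String failureReason;
    private int retryCount;
    // With Hibernate 6 a String mapped to jsonb typically also needs @JdbcTypeCode(SqlTypes.JSON)
    @Column(columnDefinition = "jsonb")
    private String context; // JSON with data for compensation
    // Timestamps read by the monitoring queries (created_at, updated_at)
    @CreationTimestamp
    private Instant createdAt;
    @UpdateTimestamp
    private Instant updatedAt;
    // Getters and setters omitted for brevity
}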
@Service
@Transactional
public class OrderSagaOrchestrator {
    @Autowired
    private OrderSagaRepository sagaRepo;
    @Autowired
    private MessagePublisher publisher;
    @Autowired
    private OrderService orderService; // creates, confirms, and cancels orders

    public void startSaga(CreateOrderCommand command) {
        // Local transaction: create order + save Saga
        Order order = orderService.createDraft(command);
        OrderSaga saga = new OrderSaga();
        saga.setSagaId(UUID.randomUUID().toString());
        saga.setOrderId(order.getId());
        saga.setStatus(SagaStatus.STARTED);
        saga.setCurrentStep(SagaStep.RESERVE_INVENTORY);
        sagaRepo.save(saga);
        // Publish command for next step.
        // Note: the publish is not part of the DB transaction, so a crash between
        // the two can lose or duplicate the command; consumers must be idempotent.
        publisher.publish("inventory-commands", new ReserveInventoryCommand(
            saga.getSagaId(),
            order.getId(),
            command.getItems()
        ));
    }

    @KafkaListener(topics = "inventory-events")
    public void onInventoryEvent(InventoryEvent event) {
        OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId())
            .orElseThrow(() -> new IllegalStateException("Saga not found: " + event.getSagaId()));
        if (event.getType() == EventType.INVENTORY_RESERVED) {
            saga.setStatus(SagaStatus.INVENTORY_RESERVED);
            saga.setCurrentStep(SagaStep.PROCESS_PAYMENT);
            sagaRepo.save(saga);
            publisher.publish("payment-commands", new ProcessPaymentCommand(
                saga.getSagaId(),
                saga.getOrderId(),
                event.getReservationId()
            ));
        } else if (event.getType() == EventType.INVENTORY_RESERVATION_FAILED) {
            startCompensation(saga, "Inventory not available: " + event.getReason());
        } else if (event.getType() == EventType.INVENTORY_RELEASED) {
            // Assumption: the inventory service confirms ReleaseInventoryCommand with this event.
            // Compensation is finished once the order itself is cancelled.
            orderService.cancelOrder(saga.getOrderId(), saga.getFailureReason());
            saga.setStatus(SagaStatus.FAILED);
            sagaRepo.save(saga);
        }
    }

    @KafkaListener(topics = "payment-events")
    public void onPaymentEvent(PaymentEvent event) {
        OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId())
            .orElseThrow(() -> new IllegalStateException("Saga not found: " + event.getSagaId()));
        if (event.getType() == EventType.PAYMENT_COMPLETED) {
            // The Saga is treated as complete once payment succeeds;
            // shipping is triggered below but its result is not tracked here.
            saga.setStatus(SagaStatus.COMPLETED);
            saga.setCurrentStep(null);
            sagaRepo.save(saga);
            orderService.confirmOrder(saga.getOrderId());
            publisher.publish("shipping-commands", new CreateShipmentCommand(
                saga.getSagaId(), saga.getOrderId()
            ));
        } else if (event.getType() == EventType.PAYMENT_FAILED) {
            startCompensation(saga, "Payment failed: " + event.getErrorCode());
        }
    }

    private void startCompensation(OrderSaga saga, String reason) {
        saga.setStatus(SagaStatus.COMPENSATING);
        saga.setFailureReason(reason);
        sagaRepo.save(saga);
        // Determine which steps need compensation based on currentStep
        switch (saga.getCurrentStep()) {
            case PROCESS_PAYMENT:
                // Inventory reserved, payment failed: release the reservation.
                // The order itself must still be cancelled once the release is confirmed.
                publisher.publish("inventory-commands", new ReleaseInventoryCommand(
                    saga.getSagaId(), saga.getOrderId()
                ));
                break;
            case RESERVE_INVENTORY:
                // Failed to reserve: nothing to release, only cancel the order
                orderService.cancelOrder(saga.getOrderId(), reason);
                saga.setStatus(SagaStatus.FAILED);
                sagaRepo.save(saga);
                break;
            default:
                // A failure at an unexpected step should surface instead of being ignored
                throw new IllegalStateException("No compensation defined for step: " + saga.getCurrentStep());
        }
    }
}
Idempotency — Protection from Duplicates
Each Saga step must be idempotent: a repeated command must not create a duplicate transaction.
@Service
public class InventoryService {
    @Autowired
    private InventoryReservationRepository reservationRepo;
    @Autowired
    private MessagePublisher publisher;

    public void reserveInventory(ReserveInventoryCommand command) {
        // Check: maybe we already reserved for this Saga?
        // A unique constraint on saga_id is still needed to cover two
        // duplicates that are processed concurrently.
        Optional<InventoryReservation> existing =
            reservationRepo.findBySagaId(command.getSagaId());
        if (existing.isPresent()) {
            // Idempotent response: publish the same event again
            publisher.publish("inventory-events", new InventoryReservedEvent(
                command.getSagaId(),
                existing.get().getId()
            ));
            return;
        }
        // Execute reservation
        try {
            InventoryReservation reservation = performReservation(command);
            publisher.publish("inventory-events", new InventoryReservedEvent(
                command.getSagaId(), reservation.getId()
            ));
        } catch (InsufficientInventoryException e) {
            publisher.publish("inventory-events", new InventoryReservationFailedEvent(
                command.getSagaId(), e.getMessage()
            ));
        }
    }
}
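A check-then-insert lookup leaves a small window in which two concurrent duplicates both pass the check. One common way to close it is to let the database enforce uniqueness on the Saga ID. The sketch below illustrates that idea with the standard-library `sqlite3` module so it runs standalone; the `reservations` table and the `reserve_inventory` function are assumptions for illustration, not part of the services above.

```python
import sqlite3


def make_db():
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE reservations ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " saga_id TEXT NOT NULL UNIQUE,"  # one reservation per Saga
        " items TEXT NOT NULL)"
    )
    return db


def reserve_inventory(db, saga_id, items):
    """Return (reservation_id, created); a duplicate command returns the existing row."""
    try:
        with db:  # commits on success, rolls back if an exception is raised
            cur = db.execute(
                "INSERT INTO reservations (saga_id, items) VALUES (?, ?)",
                (saga_id, items),
            )
            return cur.lastrowid, True
    except sqlite3.IntegrityError:
        # The UNIQUE constraint fired: this command was already processed.
        row = db.execute(
            "SELECT id FROM reservations WHERE saga_id = ?", (saga_id,)
        ).fetchone()
        return row[0], False


db = make_db()
first = reserve_inventory(db, "saga-1", '[{"sku": "A", "qty": 2}]')
second = reserve_inventory(db, "saga-1", '[{"sku": "A", "qty": 2}]')
print(first, second)  # (1, True) (1, False)
```

In both cases the caller gets the same reservation ID back, so it can publish the same "reserved" event whether the command was new or a redelivery.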
Implementation via RabbitMQ
As an alternative to Kafka, the same logic can be implemented on RabbitMQ with a topic exchange:
# Python orchestrator
import json
import uuid

import pika


class SagaOrchestrator:
    # save_saga, load_saga, proceed_to_payment and compensate are
    # persistence/transition helpers that are not shown here.

    def __init__(self, connection):
        self.channel = connection.channel()
        self.channel.exchange_declare('saga-commands', 'topic', durable=True)
        self.channel.exchange_declare('saga-events', 'topic', durable=True)
        # Queue for receiving responses from services
        result = self.channel.queue_declare('orchestrator-responses', durable=True)
        self.channel.queue_bind(result.method.queue, 'saga-events', '#')
        self.channel.basic_consume(
            result.method.queue,
            self.on_event,
            auto_ack=False
        )

    def start_order_saga(self, order_data: dict) -> str:
        saga_id = str(uuid.uuid4())
        # Save Saga in DB (psycopg2/SQLAlchemy)
        self.save_saga(saga_id, order_data['order_id'], 'STARTED', 'RESERVE_INVENTORY')
        # Send first command
        self.channel.basic_publish(
            exchange='saga-commands',
            routing_key='inventory.reserve',
            body=json.dumps({
                'saga_id': saga_id,
                'order_id': order_data['order_id'],
                'items': order_data['items'],
            }),
            properties=pika.BasicProperties(
                delivery_mode=2,  # persistent message
                message_id=str(uuid.uuid4()),
                correlation_id=saga_id,
            )
        )
        return saga_id

    def on_event(self, channel, method, properties, body):
        event = json.loads(body)
        saga = self.load_saga(event['saga_id'])
        if event['type'] == 'INVENTORY_RESERVED':
            self.proceed_to_payment(saga, event)
        elif event['type'] in ('INVENTORY_FAILED', 'PAYMENT_FAILED'):
            self.compensate(saga, event.get('reason'))
        # Ack only after the event has been handled, so a crash leads to redelivery
        channel.basic_ack(method.delivery_tag)
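The event handler is easier to test when the decision of what happens next is kept separate from the broker code. The sketch below expresses the order Saga as a transition table keyed by (current step, event type). Only the `inventory.reserve` routing key and the event names handled above come from the orchestrator; `payment.process`, `shipping.create`, `inventory.release`, the `RELEASE_INVENTORY` step, and the `PAYMENT_COMPLETED` and `INVENTORY_RELEASED` events are assumptions added for illustration.

```python
from typing import NamedTuple, Optional


class Transition(NamedTuple):
    status: str                  # new Saga status
    step: Optional[str]          # new current step, None when the Saga is finished
    routing_key: Optional[str]   # command to publish next, None if nothing to send


# (current_step, event_type) -> what the orchestrator should do next
TRANSITIONS = {
    ("RESERVE_INVENTORY", "INVENTORY_RESERVED"):
        Transition("INVENTORY_RESERVED", "PROCESS_PAYMENT", "payment.process"),
    ("RESERVE_INVENTORY", "INVENTORY_FAILED"):
        Transition("FAILED", None, None),  # nothing reserved, only cancel the order
    ("PROCESS_PAYMENT", "PAYMENT_COMPLETED"):
        Transition("COMPLETED", None, "shipping.create"),
    ("PROCESS_PAYMENT", "PAYMENT_FAILED"):
        Transition("COMPENSATING", "RELEASE_INVENTORY", "inventory.release"),
    ("RELEASE_INVENTORY", "INVENTORY_RELEASED"):
        Transition("FAILED", None, None),  # compensation finished
}


def next_transition(current_step: Optional[str], event_type: str) -> Optional[Transition]:
    """Return the transition for this event, or None for a duplicate or out-of-order event."""
    return TRANSITIONS.get((current_step, event_type))


print(next_transition("PROCESS_PAYMENT", "PAYMENT_FAILED"))
print(next_transition("PROCESS_PAYMENT", "INVENTORY_RESERVED"))  # None: redelivered event
```

Returning `None` for an unexpected pair gives the handler a simple rule for redelivered messages: acknowledge them and change nothing.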
Saga Monitoring and Debugging
Without visibility into Saga state, debugging distributed transactions is extremely difficult.
-- Stuck Sagas — did not complete in 30 minutes
SELECT saga_id, order_id, status, current_step,
created_at, NOW() - created_at AS age
FROM order_sagas
WHERE status NOT IN ('COMPLETED', 'FAILED')
AND created_at < NOW() - INTERVAL '30 minutes'
ORDER BY created_at;
-- Statistics for the day
SELECT status, COUNT(*), AVG(EXTRACT(EPOCH FROM (updated_at - created_at))) as avg_duration_sec
FROM order_sagas
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY status;
Alert on stuck Sagas:
- alert: StuckSagas
  expr: sum(order_sagas_stuck_count) > 0
  for: 10m
  annotations:
    summary: "{{ $value }} order sagas stuck for more than 30 minutes"
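The alert rule assumes that something exports an `order_sagas_stuck_count` metric. A minimal sketch of how that value could be computed is shown below. It uses the standard-library `sqlite3` module in place of the production database so that it runs standalone, so the date arithmetic differs from the PostgreSQL query above; the `count_stuck_sagas` function and the reduced table layout are assumptions. Publishing the number through a metrics client is left out.

```python
import sqlite3


def count_stuck_sagas(db, max_age_minutes=30):
    """Count Sagas that are neither COMPLETED nor FAILED and older than the threshold."""
    row = db.execute(
        "SELECT COUNT(*) FROM order_sagas "
        "WHERE status NOT IN ('COMPLETED', 'FAILED') "
        "AND created_at < datetime('now', ?)",
        (f"-{int(max_age_minutes)} minutes",),
    ).fetchone()
    return row[0]


# Demo data: one stuck Saga, one recent Saga, one old but completed Saga
db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE order_sagas (saga_id TEXT PRIMARY KEY, status TEXT, created_at TEXT)"
)
db.executemany(
    "INSERT INTO order_sagas VALUES (?, ?, datetime('now', ?))",
    [
        ("a", "PAYMENT_PROCESSING", "-45 minutes"),
        ("b", "STARTED", "-5 minutes"),
        ("c", "COMPLETED", "-120 minutes"),
    ],
)
print(count_stuck_sagas(db))  # 1
```

A periodic job could run this query and set the gauge to the result, which keeps the alert expression itself trivial.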
Timeline
Day 1–2 — Saga design: define steps, compensating actions, command and event format. DB schema for Saga state.
Day 3–4 — develop Orchestrator: event handlers, compensation logic, command idempotency.
Day 5 — integrate with all participating services, implement compensating methods in each service.
Day 6 — test failure scenarios: kill each service at each step, verify compensation correctness.
Day 7 — monitor Saga states, alerts on stuck transactions, tool for manual control (resume/compensate/retry).







