RabbitMQ Message Queue Setup
RabbitMQ is a message broker using AMQP protocol. It distributes tasks between system components: web application publishes tasks (emails, PDF generation, notifications), workers process them asynchronously. If a worker crashes, messages remain in the queue and will be processed after restart.
Installation via Docker
# docker-compose.yml
services:
rabbitmq:
image: rabbitmq:3.13-management-alpine
environment:
RABBITMQ_DEFAULT_USER: myapp
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
RABBITMQ_DEFAULT_VHOST: myapp
volumes:
- rabbitmq_data:/var/lib/rabbitmq
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 10s
timeout: 5s
retries: 5
volumes:
rabbitmq_data:
Key concepts
Exchange — entry point for publishing. Distributes messages to queues by rules:
-
direct— exact routing key match -
topic— pattern with*and# -
fanout— to all subscribed queues -
headers— by message headers
Queue — buffer for storing messages before worker processing.
Binding — connection between exchange and queue with routing key.
Topology for web application
[App] → [myapp.exchange (topic)] → myapp.emails → [Email Worker]
→ myapp.notifications → [Push Worker]
→ myapp.reports → [Report Worker]
PHP: phpamqplib
// Publishing a message
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class RabbitMQPublisher
{
private AMQPStreamConnection $connection;
private \PhpAmqpLib\Channel\AMQPChannel $channel;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
host: config('rabbitmq.host'),
port: config('rabbitmq.port', 5672),
user: config('rabbitmq.user'),
password: config('rabbitmq.password'),
vhost: config('rabbitmq.vhost', '/'),
);
$this->channel = $this->connection->channel();
$this->setup();
}
private function setup(): void
{
// Durable exchange — survives broker restart
$this->channel->exchange_declare(
exchange: 'myapp.exchange',
type: 'topic',
durable: true,
auto_delete: false,
);
// Dead Letter Queue for failed messages
$this->channel->queue_declare(
queue: 'myapp.dlq',
durable: true,
arguments: new AMQPTable(['x-queue-type' => 'classic'])
);
// Main queue with DLQ binding
$this->channel->queue_declare(
queue: 'myapp.emails',
durable: true,
arguments: new AMQPTable([
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => 'myapp.dlq',
'x-message-ttl' => 86400000, // 24 hours in ms
])
);
$this->channel->queue_bind('myapp.emails', 'myapp.exchange', 'emails.*');
$this->channel->queue_bind('myapp.notifications', 'myapp.exchange', 'notifications.*');
}
public function publish(string $routingKey, array $payload): void
{
$message = new AMQPMessage(
json_encode($payload),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
'message_id' => Str::uuid()->toString(),
'timestamp' => time(),
]
);
$this->channel->basic_publish($message, 'myapp.exchange', $routingKey);
}
public function __destruct()
{
$this->channel->close();
$this->connection->close();
}
}
// Usage
$publisher->publish('emails.welcome', [
'user_id' => $user->id,
'email' => $user->email,
'name' => $user->name,
]);
PHP: Consumer worker
class EmailWorker
{
public function run(): void
{
$channel = $this->getChannel();
// Prefetch: don't take more than 1 message without acknowledgment
$channel->basic_qos(prefetch_size: 0, prefetch_count: 1, global: false);
$channel->basic_consume(
queue: 'myapp.emails',
consumer_tag: gethostname() . '.email',
no_ack: false, // manual acknowledgment required
callback: [$this, 'handleEmail'],
);
while ($channel->is_consuming()) {
$channel->wait(timeout: 60);
}
}
public function handleEmail(AMQPMessage $message): void
{
try {
$payload = json_decode($message->getBody(), true);
Mail::to($payload['email'])->send(new WelcomeMail($payload));
// Acknowledge successful processing
$message->ack();
Log::info('Email sent', ['user_id' => $payload['user_id']]);
} catch (\Throwable $e) {
Log::error('Email failed', ['error' => $e->getMessage()]);
// Requeue only if this is first attempt
$requeue = !$message->has('application_headers') ||
($message->get('application_headers')->getNativeData()['x-death'] ?? null) === null;
$message->nack(requeue: $requeue);
}
}
}
Node.js: amqplib
import amqp, { Connection, Channel } from 'amqplib';
class MessageBus {
private connection!: Connection;
private channel!: Channel;
async connect(): Promise<void> {
this.connection = await amqp.connect({
hostname: process.env.RABBITMQ_HOST,
port: 5672,
username: process.env.RABBITMQ_USER,
password: process.env.RABBITMQ_PASS,
vhost: process.env.RABBITMQ_VHOST,
heartbeat: 60,
});
this.channel = await this.connection.createChannel();
await this.channel.prefetch(5);
await this.channel.assertExchange('myapp.exchange', 'topic', { durable: true });
await this.channel.assertQueue('myapp.notifications', {
durable: true,
arguments: {
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'myapp.dlq',
},
});
await this.channel.bindQueue('myapp.notifications', 'myapp.exchange', 'notifications.*');
}
async publish(routingKey: string, payload: object): Promise<void> {
const content = Buffer.from(JSON.stringify(payload));
this.channel.publish('myapp.exchange', routingKey, content, {
persistent: true,
contentType: 'application/json',
messageId: crypto.randomUUID(),
timestamp: Math.floor(Date.now() / 1000),
});
}
async consume(queue: string, handler: (payload: unknown) => Promise<void>): Promise<void> {
await this.channel.consume(queue, async (msg) => {
if (!msg) return;
try {
const payload = JSON.parse(msg.content.toString());
await handler(payload);
this.channel.ack(msg);
} catch (err) {
console.error('Message processing failed:', err);
this.channel.nack(msg, false, false); // send to DLQ
}
});
}
}
Laravel Queue with RabbitMQ
Using vladimir-yuldashev/laravel-queue-rabbitmq package:
QUEUE_CONNECTION=rabbitmq
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_VHOST=myapp
RABBITMQ_LOGIN=myapp
RABBITMQ_PASSWORD=secret
RABBITMQ_QUEUE=myapp.jobs
// config/queue.php
'rabbitmq' => [
'driver' => 'rabbitmq',
'queue' => env('RABBITMQ_QUEUE', 'default'),
'hosts' => [[
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_LOGIN', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
]],
'options' => [
'queue' => [
'exchange' => 'myapp.exchange',
'exchange_type' => 'topic',
'exchange_routing_key' => 'jobs.*',
],
],
],
// Standard Laravel Job
class SendWelcomeEmail implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $tries = 3;
public int $backoff = 60;
public function __construct(private User $user) {}
public function handle(): void
{
Mail::to($this->user)->send(new WelcomeMail($this->user));
}
}
// Publishing
SendWelcomeEmail::dispatch($user)->onQueue('myapp.emails');
Monitoring via Management API
# Number of messages in queue
curl -s -u myapp:password \
"http://rabbitmq:15672/api/queues/myapp/myapp.emails" | \
jq '.messages, .consumers'
# Alert: queue growing
# Grafana: rabbitmq_queue_messages > 1000 → Slack notification
Implementation timeline
| Task | Timeline |
|---|---|
| RabbitMQ + basic producer/consumer | 2–3 days |
| Laravel Queue integration | 1–2 days |
| Dead Letter Queue + monitoring | +1–2 days |
| HA RabbitMQ cluster (3 nodes) | 3–4 days |







