Setting Up RabbitMQ Exchange and Queue (Direct, Fanout, Topic)
RabbitMQ does not deliver messages directly to queues — an exchange sits between the producer and the queue. The exchange type determines where the message goes. Choosing the correct type reduces routing complexity in code.
Four Exchange Types
Direct — exact routing key match. A message with key order.created goes to queues bound with that same key. One-to-one or one-to-many (multiple queues with one key).
Fanout — routing key is ignored, message is broadcast to all bound queues. Broadcast: event "user logged in" needs to be received by logger, session service, and analytics.
Topic — routing key with wildcards. * matches one word, # matches zero or more words. order.*.created matches order.express.created and order.regular.created. order.# matches any key starting with order..
Headers — routing by AMQP headers, routing key is ignored. Rarely used, when route is determined by multiple attributes.
Creation via CLI
# Direct exchange
rabbitmqadmin declare exchange \
name=orders \
type=direct \
durable=true \
auto_delete=false
# Fanout exchange
rabbitmqadmin declare exchange \
name=events-broadcast \
type=fanout \
durable=true
# Topic exchange
rabbitmqadmin declare exchange \
name=app-events \
type=topic \
durable=true \
arguments='{"alternate-exchange":"app-events-unrouted"}'
# Queues
rabbitmqadmin declare queue \
name=order-processing \
durable=true \
arguments='{"x-queue-type":"quorum","x-dead-letter-exchange":"dlx","x-dead-letter-routing-key":"order-processing.failed","x-delivery-limit":5}'
rabbitmqadmin declare queue \
name=order-notifications \
durable=true \
arguments='{"x-queue-type":"quorum"}'
# Bindings
rabbitmqadmin declare binding \
source=orders \
destination=order-processing \
routing_key=order.created
rabbitmqadmin declare binding \
source=app-events \
destination=order-processing \
routing_key="order.#"
rabbitmqadmin declare binding \
source=app-events \
destination=order-notifications \
routing_key="order.*.created"
Creation via Management HTTP API
BASE="http://rabbit-1:15672/api"
AUTH="admin:password"
# Create topic exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/app-events" \
-H "Content-Type: application/json" \
-d '{
"type": "topic",
"durable": true,
"auto_delete": false,
"arguments": {
"alternate-exchange": "app-events-unrouted"
}
}'
# Dead Letter Exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/dlx" \
-H "Content-Type: application/json" \
-d '{"type": "direct", "durable": true}'
# DLQ queue
curl -u $AUTH -X PUT "$BASE/queues/%2F/failed-messages" \
-H "Content-Type: application/json" \
-d '{
"durable": true,
"arguments": {
"x-queue-type": "quorum",
"x-message-ttl": 2592000000
}
}'
# Bind DLQ to DLX
curl -u $AUTH -X POST "$BASE/bindings/%2F/e/dlx/q/failed-messages" \
-H "Content-Type: application/json" \
-d '{"routing_key": "#"}'
PHP Producer via php-amqplib
use PhpAmqpLib\Connection\AMQPLazyConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class EventPublisher
{
private AMQPLazyConnection $connection;
private ?\AMQPChannel $channel = null;
public function __construct(
private readonly array $hosts, // [['host'=>'rabbit-1','port'=>5672,'user'=>'...','password'=>'...']]
) {}
private function channel(): \AMQPChannel
{
if ($this->channel === null || !$this->channel->is_open()) {
$this->connection = AMQPLazyConnection::create_connection($this->hosts, [
'heartbeat' => 60,
'connection_timeout' => 5,
'read_write_timeout' => 10,
]);
$this->channel = $this->connection->channel();
// Delivery confirmation
$this->channel->confirm_select();
}
return $this->channel;
}
public function publish(string $exchange, string $routingKey, array $payload): void
{
$channel = $this->channel();
$message = new AMQPMessage(
json_encode($payload, JSON_THROW_ON_ERROR),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
'message_id' => (string) Str::uuid(),
'timestamp' => time(),
'app_id' => 'webapp',
'headers' => new AMQPTable([
'x-retry-count' => 0,
'x-source' => 'api',
]),
]
);
$channel->basic_publish($message, $exchange, $routingKey);
// Wait for broker confirmation
$channel->wait_for_pending_acks(5.0);
}
}
// Usage
$publisher->publish('app-events', 'order.express.created', [
'order_id' => 12345,
'user_id' => 67890,
'amount' => 1999.99,
]);
PHP Consumer
class OrderConsumer
{
public function consume(): void
{
$channel = $this->connection->channel();
$channel->basic_qos(null, 20, null); // prefetch: no more than 20 messages without ack
$channel->basic_consume(
'order-processing',
'', // consumer tag (auto-generated)
false, // no_local
false, // no_ack — manual confirmation
false, // exclusive
false, // nowait
function (AMQPMessage $message) {
try {
$payload = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
$this->processOrder($payload);
$message->ack();
} catch (\Throwable $e) {
// Requeue=false → message goes to DLX according to queue settings
$message->nack(false);
Log::error('Failed to process order', ['error' => $e->getMessage()]);
}
}
);
while ($channel->is_consuming()) {
$channel->wait(null, false, 5.0); // timeout 5s for graceful shutdown
if ($this->shouldStop()) break;
}
$channel->close();
}
}
Python Client via pika
import pika
import json
credentials = pika.PlainCredentials('webapp', 'password')
parameters = pika.ConnectionParameters(
host='rabbit-1',
port=5672,
credentials=credentials,
heartbeat=60,
blocked_connection_timeout=30,
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Publish to topic exchange
channel.basic_publish(
exchange='app-events',
routing_key='user.premium.upgraded',
body=json.dumps({'user_id': 42, 'plan': 'premium'}),
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent,
content_type='application/json',
)
)
connection.close()
Typical Routing Patterns
Work Queue — multiple workers read from one queue, RabbitMQ distributes round-robin. Direct exchange, one binding.
Pub/Sub — one event is received by all subscribers. Fanout exchange, each service has its own queue.
Routing — different events go to different queues. Direct or topic exchange.
RPC — request-response through queues. Producer creates a temporary queue, specifies it in reply_to, consumer responds there.
Timeline
Day 1 — design exchanges/queues/bindings schema for application business logic. Creation via Management UI or CLI.
Day 2 — integrate producers, configure confirm mode, graceful reconnect on connection loss. Integrate consumers with manual ack.
Day 3 — test routing, verify DLX, load testing with rabbitmq-perf-test.







