RabbitMQ Exchange and Queue setup Direct Fanout Topic


Setting Up RabbitMQ Exchange and Queue (Direct, Fanout, Topic)

RabbitMQ does not deliver messages directly to queues — an exchange sits between the producer and the queue. The exchange type determines where the message goes. Choosing the correct type reduces routing complexity in code.

Four Exchange Types

Direct — exact routing key match. A message with key order.created goes to queues bound with that same key. One-to-one or one-to-many (multiple queues with one key).

Fanout — the routing key is ignored and the message is copied to every bound queue. Typical use is a broadcast: a "user logged in" event that the logger, the session service, and analytics all need to receive.

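Topic — the routing key is matched against a dot-separated pattern with wildcards: * matches exactly one word, # matches zero or more words. order.*.created matches order.express.created and order.regular.created, but not order.created. order.# matches order itself and any key that starts with "order." (for example order.express.created).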

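Headers — routing by AMQP message headers; the routing key is ignored. Rarely used, mainly when the route depends on several attributes at once (the x-match binding argument selects whether all or any of the listed headers must match).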
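
The wildcard rules for topic exchanges are easy to misread, so here is a minimal sketch of the matching logic in plain Python. It is not RabbitMQ's implementation (the broker uses its own internal data structure); topic_matches is a hypothetical helper name, and the function only illustrates how * and # behave on dot-separated words.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key."""
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too
            return j == len(k)
        if p[i] == "#":
            # '#' absorbs zero or more words: try every possible split point
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            # Key exhausted but the pattern still expects a word
            return False
        if p[i] == "*" or p[i] == k[j]:
            # '*' consumes exactly one word; a literal must be equal
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("order.*.created", "order.express.created"))  # True
print(topic_matches("order.*.created", "order.created"))          # False
print(topic_matches("order.#", "order"))                          # True
```

A helper like this can be handy in unit tests that check binding patterns before they are deployed to a broker.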

Creation via CLI

# Direct exchange
rabbitmqadmin declare exchange \
    name=orders \
    type=direct \
    durable=true \
    auto_delete=false

# Fanout exchange
rabbitmqadmin declare exchange \
    name=events-broadcast \
    type=fanout \
    durable=true

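# Topic exchange. Messages that match no binding are forwarded to the
# alternate exchange app-events-unrouted, which must be declared separately
# (with a queue bound to it) for the fallback to work.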
rabbitmqadmin declare exchange \
    name=app-events \
    type=topic \
    durable=true \
    arguments='{"alternate-exchange":"app-events-unrouted"}'

# Queues
rabbitmqadmin declare queue \
    name=order-processing \
    durable=true \
    arguments='{"x-queue-type":"quorum","x-dead-letter-exchange":"dlx","x-dead-letter-routing-key":"order-processing.failed","x-delivery-limit":5}'

rabbitmqadmin declare queue \
    name=order-notifications \
    durable=true \
    arguments='{"x-queue-type":"quorum"}'

# Bindings
rabbitmqadmin declare binding \
    source=orders \
    destination=order-processing \
    routing_key=order.created

rabbitmqadmin declare binding \
    source=app-events \
    destination=order-processing \
    routing_key="order.#"

rabbitmqadmin declare binding \
    source=app-events \
    destination=order-notifications \
    routing_key="order.*.created"
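
The same topology can also be declared from application code at startup. The sketch below assumes a pika channel (for example pika.BlockingConnection(...).channel()) is passed in; declare_topology and BINDINGS are hypothetical names introduced here, and the exchange, queue, and argument values simply mirror the CLI commands above.

```python
# (exchange, queue, routing key) triples taken from the CLI examples
BINDINGS = [
    ("orders", "order-processing", "order.created"),
    ("app-events", "order-processing", "order.#"),
    ("app-events", "order-notifications", "order.*.created"),
]


def declare_topology(channel) -> None:
    """Declare exchanges, queues and bindings on a pika-style channel.

    Re-declaring an existing entity is a no-op as long as the type and
    arguments are identical; a mismatch makes the broker close the channel.
    """
    channel.exchange_declare(exchange="orders", exchange_type="direct", durable=True)
    channel.exchange_declare(exchange="events-broadcast", exchange_type="fanout", durable=True)
    channel.exchange_declare(
        exchange="app-events",
        exchange_type="topic",
        durable=True,
        arguments={"alternate-exchange": "app-events-unrouted"},
    )

    channel.queue_declare(
        queue="order-processing",
        durable=True,
        arguments={
            "x-queue-type": "quorum",
            "x-dead-letter-exchange": "dlx",
            "x-dead-letter-routing-key": "order-processing.failed",
            "x-delivery-limit": 5,
        },
    )
    channel.queue_declare(
        queue="order-notifications",
        durable=True,
        arguments={"x-queue-type": "quorum"},
    )

    for exchange, queue, routing_key in BINDINGS:
        channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
```

Declaring from code keeps the schema next to the services that depend on it. The CLI variant is usually more convenient for one-off setup or infrastructure scripts.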

Creation via Management HTTP API

BASE="http://rabbit-1:15672/api"
AUTH="admin:password"

# Create topic exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/app-events" \
  -H "Content-Type: application/json" \
  -d '{
    "type": "topic",
    "durable": true,
    "auto_delete": false,
    "arguments": {
      "alternate-exchange": "app-events-unrouted"
    }
  }'

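# Dead Letter Exchange. Declared as topic so that the "#" binding below acts as
# a catch-all; on a direct exchange "#" would only match the literal key "#".
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/dlx" \
  -H "Content-Type: application/json" \
  -d '{"type": "topic", "durable": true}'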

# Dead-letter queue; messages expire after 30 days (2592000000 ms)
curl -u $AUTH -X PUT "$BASE/queues/%2F/failed-messages" \
  -H "Content-Type: application/json" \
  -d '{
    "durable": true,
    "arguments": {
      "x-queue-type": "quorum",
      "x-message-ttl": 2592000000
    }
  }'

# Bind DLQ to DLX
curl -u $AUTH -X POST "$BASE/bindings/%2F/e/dlx/q/failed-messages" \
  -H "Content-Type: application/json" \
  -d '{"routing_key": "#"}'
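
When these calls are scripted rather than typed, the detail that most often goes wrong is the vhost encoding: the default vhost "/" has to appear as %2F in the path. The following sketch builds (but does not send) an equivalent PUT request using only the Python standard library. build_declare_request is a hypothetical helper, and the host and credentials are the placeholder values from the curl examples.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_declare_request(base_url, kind, vhost, name, body, user, password):
    """Build a PUT request that declares an exchange or a queue."""
    if kind not in ("exchanges", "queues"):
        raise ValueError("kind must be 'exchanges' or 'queues'")
    # safe="" forces "/" to be percent-encoded, so vhost "/" becomes "%2F"
    path = "/".join(urllib.parse.quote(part, safe="") for part in (vhost, name))
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url}/{kind}/{path}",
        data=json.dumps(body).encode(),
        method="PUT",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


request = build_declare_request(
    "http://rabbit-1:15672/api",
    "exchanges",
    "/",
    "app-events",
    {
        "type": "topic",
        "durable": True,
        "auto_delete": False,
        "arguments": {"alternate-exchange": "app-events-unrouted"},
    },
    "admin",
    "password",
)
print(request.get_method(), request.full_url)
# Sending is left to the caller: urllib.request.urlopen(request)
```

Keeping request construction separate from sending makes the URL and payload easy to check in tests without a running broker.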

PHP Producer via php-amqplib

use Illuminate\Support\Str; // Laravel helper, used below for message_id
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class EventPublisher
{
    private ?AMQPStreamConnection $connection = null;
    private ?AMQPChannel $channel = null;

    public function __construct(
        private readonly array $hosts, // [['host'=>'rabbit-1','port'=>5672,'user'=>'...','password'=>'...']]
    ) {}

    private function channel(): AMQPChannel
    {
        if ($this->channel === null || !$this->channel->is_open()) {
            // create_connection() tries the hosts in order until one accepts
            $this->connection = AMQPStreamConnection::create_connection($this->hosts, [
                'heartbeat' => 60,
                'connection_timeout' => 5,
                // php-amqplib requires read_write_timeout >= 2 x heartbeat
                'read_write_timeout' => 130,
            ]);
            $this->channel = $this->connection->channel();
            // Publisher confirms: the broker acks every accepted message
            $this->channel->confirm_select();
        }
        return $this->channel;
    }

    public function publish(string $exchange, string $routingKey, array $payload): void
    {
        $channel = $this->channel();

        $message = new AMQPMessage(
            json_encode($payload, JSON_THROW_ON_ERROR),
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'content_type'  => 'application/json',
                'message_id'    => (string) Str::uuid(),
                'timestamp'     => time(),
                'app_id'        => 'webapp',
                'headers'       => new AMQPTable([
                    'x-retry-count' => 0,
                    'x-source'      => 'api',
                ]),
            ]
        );

        $channel->basic_publish($message, $exchange, $routingKey);

        // Wait for broker confirmation
        $channel->wait_for_pending_acks(5.0);
    }
}

// Usage
$publisher->publish('app-events', 'order.express.created', [
    'order_id' => 12345,
    'user_id'  => 67890,
    'amount'   => 1999.99,
]);

PHP Consumer

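use Illuminate\Support\Facades\Log; // Laravel facade, used for error logging
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Message\AMQPMessage;

// processOrder() and shouldStop() are application-specific and omitted here
class OrderConsumer
{
    public function __construct(
        private readonly AbstractConnection $connection,
    ) {}

    public function consume(): void
    {
        $channel = $this->connection->channel();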
        $channel->basic_qos(null, 20, null); // prefetch: no more than 20 messages without ack

        $channel->basic_consume(
            'order-processing',
            '',     // consumer tag (auto-generated)
            false,  // no_local
            false,  // no_ack — manual confirmation
            false,  // exclusive
            false,  // nowait
            function (AMQPMessage $message) {
                try {
                    $payload = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
                    $this->processOrder($payload);
                    $message->ack();
                } catch (\Throwable $e) {
                    // Requeue=false → message goes to DLX according to queue settings
                    $message->nack(false);
                    Log::error('Failed to process order', ['error' => $e->getMessage()]);
                }
            }
        );

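        while ($channel->is_consuming()) {
            try {
                // Wake up at least every 5s so the stop flag can be checked
                $channel->wait(null, false, 5.0);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // No message arrived within the timeout; this is not an error
            }
            if ($this->shouldStop()) break;
        }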

        $channel->close();
    }
}
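
For comparison, the same manual-ack logic can be sketched as a pika callback. The factory below assumes pika's standard callback signature (channel, method, properties, body); make_order_callback and process_order are hypothetical names, and the dead-lettering relies on the queue arguments declared earlier.

```python
import json


def make_order_callback(process_order):
    """Build a pika-style message callback with manual ack/nack."""

    def on_message(channel, method, properties, body):
        try:
            payload = json.loads(body)
            process_order(payload)
        except Exception:
            # requeue=False hands the message to the queue's dead-letter exchange
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            # Acknowledge only after processing has finished successfully
            channel.basic_ack(delivery_tag=method.delivery_tag)

    return on_message


# Wiring with a real pika channel (not executed here):
#   channel.basic_qos(prefetch_count=20)
#   channel.basic_consume(queue="order-processing",
#                         on_message_callback=make_order_callback(handle_order))
#   channel.start_consuming()
```

Passing the handler in as an argument keeps the ack/nack policy testable with a fake channel, without a broker.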

Python Client via pika

import pika
import json

credentials = pika.PlainCredentials('webapp', 'password')
parameters = pika.ConnectionParameters(
    host='rabbit-1',
    port=5672,
    credentials=credentials,
    heartbeat=60,
    blocked_connection_timeout=30,
)

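connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Publisher confirms: basic_publish raises if the broker nacks the message
channel.confirm_delivery()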

# Publish to topic exchange
channel.basic_publish(
    exchange='app-events',
    routing_key='user.premium.upgraded',
    body=json.dumps({'user_id': 42, 'plan': 'premium'}),
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent,
        content_type='application/json',
    )
)
connection.close()

Typical Routing Patterns

Work Queue — multiple workers consume from one queue, and RabbitMQ dispatches messages round-robin among them, limited by each consumer's prefetch. Direct exchange, one binding.

Pub/Sub — one event is received by all subscribers. Fanout exchange, each service has its own queue.

Routing — different events go to different queues. Direct or topic exchange.

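RPC — request-response through queues. The caller creates a temporary reply queue and puts its name in the reply_to property of the request, usually together with a correlation_id; the worker publishes the response to that queue, and the caller uses the correlation_id to match it to the request.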
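
For the RPC pattern, the caller typically has to match replies to outstanding requests, which is commonly done with the correlation_id message property. The bookkeeping can be sketched without a broker. ReplyTracker is a hypothetical helper; with pika, the generated id would be sent in pika.BasicProperties(reply_to=..., correlation_id=...) and read back from the reply's properties.

```python
import uuid


class ReplyTracker:
    """Match RPC replies to outstanding requests by correlation id."""

    def __init__(self):
        self._pending = set()
        self._replies = {}

    def new_request(self) -> str:
        """Register a request and return the correlation id to send with it."""
        correlation_id = str(uuid.uuid4())
        self._pending.add(correlation_id)
        return correlation_id

    def on_reply(self, correlation_id, body) -> bool:
        """Store a reply; return False for unknown or duplicate ids."""
        if correlation_id not in self._pending:
            return False
        self._pending.discard(correlation_id)
        self._replies[correlation_id] = body
        return True

    def take(self, correlation_id):
        """Return and forget the reply, or None if it has not arrived."""
        return self._replies.pop(correlation_id, None)
```

Rejecting unknown ids matters because a reply queue can still receive responses to requests that the caller has already given up on.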

Timeline

Day 1 — design the exchange, queue, and binding schema around the application's business logic, then create it via the Management UI or CLI.

Day 2 — integrate producers: configure confirm mode and graceful reconnect on connection loss. Integrate consumers with manual ack.

Day 3 — test routing, verify dead-lettering through the DLX, and run load tests with rabbitmq-perf-test.