RabbitMQ Message Queue Setup for Web Application

RabbitMQ Message Queue Setup

RabbitMQ is a message broker that implements the AMQP 0-9-1 protocol. It distributes tasks between system components: the web application publishes tasks (emails, PDF generation, notifications), and workers process them asynchronously. If a worker crashes before acknowledging a message, the broker returns that message to the queue and delivers it again, either to the same worker after it restarts or to another one.

Installation via Docker

# docker-compose.yml
services:
  rabbitmq:
    image: rabbitmq:3.13-management-alpine
    environment:
      RABBITMQ_DEFAULT_USER: myapp
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      RABBITMQ_DEFAULT_VHOST: myapp
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  rabbitmq_data:

Key concepts

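Exchange: the entry point for publishing. It routes messages to queues according to its type:

  • direct: the routing key must exactly match the binding key
  • topic: the binding key is a dot-separated pattern, where * matches exactly one word and # matches zero or more words
  • fanout: copies every message to all bound queues and ignores the routing key
  • headers: matches on message headers instead of the routing key

Queue: a buffer that stores messages until a worker processes them.

Binding: a rule that links an exchange to a queue, usually with a binding key that is compared against the routing key of each message.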
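
The topic matching rules are easy to misread, so here is a small sketch of how a binding pattern is compared with a routing key. It is an illustration of the rules only, not the broker's actual implementation, and `topicMatches` is a hypothetical helper name.

```typescript
// Sketch of topic-exchange matching: '*' matches exactly one word,
// '#' matches zero or more words. Words are separated by dots.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: it is a match only if the key is exhausted too
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // '#' either absorbs nothing (move past it) or absorbs one more word
      return match(i + 1, j) || (j < k.length && match(i, j + 1));
    }
    // Any other pattern word needs a key word to compare against
    if (j === k.length) return false;
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

console.log(topicMatches('emails.*', 'emails.welcome'));       // true
console.log(topicMatches('emails.*', 'emails.welcome.retry')); // false
console.log(topicMatches('notifications.#', 'notifications')); // true
```

With the binding key emails.*, a routing key such as emails.welcome.retry would not reach the queue; emails.# would be needed for multi-level keys.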

Topology for web application

[App] → [myapp.exchange (topic)] → myapp.emails → [Email Worker]
                                 → myapp.notifications → [Push Worker]
                                 → myapp.reports → [Report Worker]

PHP: php-amqplib

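// Publishing a message
// config() and Str are Laravel helpers; outside Laravel, substitute your own
// configuration source and UUID generator
use Illuminate\Support\Str;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;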

class RabbitMQPublisher
{
    private AMQPStreamConnection $connection;
    private \PhpAmqpLib\Channel\AMQPChannel $channel;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            host: config('rabbitmq.host'),
            port: config('rabbitmq.port', 5672),
            user: config('rabbitmq.user'),
            password: config('rabbitmq.password'),
            vhost: config('rabbitmq.vhost', '/'),
        );
        $this->channel = $this->connection->channel();
        $this->setup();
    }

    private function setup(): void
    {
        // Durable exchange — survives broker restart
        $this->channel->exchange_declare(
            exchange: 'myapp.exchange',
            type: 'topic',
            durable: true,
            auto_delete: false,
        );

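        // Dead Letter Queue for failed messages.
        // auto_delete defaults to true in php-amqplib, so it is disabled explicitly;
        // otherwise the queue is removed when its last consumer disconnects.
        $this->channel->queue_declare(
            queue: 'myapp.dlq',
            durable: true,
            auto_delete: false,
            arguments: new AMQPTable(['x-queue-type' => 'classic'])
        );

        // Main queue: rejected or expired messages are routed to myapp.dlq
        // through the default exchange ('')
        $this->channel->queue_declare(
            queue: 'myapp.emails',
            durable: true,
            auto_delete: false,
            arguments: new AMQPTable([
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => 'myapp.dlq',
                'x-message-ttl' => 86400000,  // 24 hours in ms
            ])
        );

        // A queue must exist before it can be bound. The arguments must match
        // every other service that declares myapp.notifications.
        $this->channel->queue_declare(
            queue: 'myapp.notifications',
            durable: true,
            auto_delete: false,
            arguments: new AMQPTable([
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => 'myapp.dlq',
            ])
        );

        $this->channel->queue_bind('myapp.emails', 'myapp.exchange', 'emails.*');
        $this->channel->queue_bind('myapp.notifications', 'myapp.exchange', 'notifications.*');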
    }

    public function publish(string $routingKey, array $payload): void
    {
        $message = new AMQPMessage(
            json_encode($payload),
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'content_type'  => 'application/json',
                'message_id'    => Str::uuid()->toString(),
                'timestamp'     => time(),
            ]
        );

        $this->channel->basic_publish($message, 'myapp.exchange', $routingKey);
    }

    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Usage
$publisher = new RabbitMQPublisher();
$publisher->publish('emails.welcome', [
    'user_id' => $user->id,
    'email'   => $user->email,
    'name'    => $user->name,
]);

PHP: Consumer worker

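use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Mail;
use PhpAmqpLib\Message\AMQPMessage;

// getChannel() is not shown: it is assumed to open a connection and return its channel
class EmailWorker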
{
    public function run(): void
    {
        $channel = $this->getChannel();

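        // Prefetch: don't take more than 1 message without acknowledgment.
        // Arguments are positional (prefetch_size, prefetch_count, global flag)
        // because the third parameter is not named "global" in php-amqplib.
        $channel->basic_qos(0, 1, false);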

        $channel->basic_consume(
            queue: 'myapp.emails',
            consumer_tag: gethostname() . '.email',
            no_ack: false,  // manual acknowledgment required
            callback: [$this, 'handleEmail'],
        );

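        while ($channel->is_consuming()) {
            try {
                $channel->wait(timeout: 60);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // No message arrived within 60 seconds: keep waiting instead of crashing
                continue;
            }
        }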
    }

    public function handleEmail(AMQPMessage $message): void
    {
        try {
            $payload = json_decode($message->getBody(), true);

            Mail::to($payload['email'])->send(new WelcomeMail($payload));

            // Acknowledge successful processing
            $message->ack();

            Log::info('Email sent', ['user_id' => $payload['user_id']]);

        } catch (\Throwable $e) {
            Log::error('Email failed', ['error' => $e->getMessage()]);

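            // Requeue only on the first failure. The broker sets the redelivered
            // flag on requeued messages; x-death is added only after dead-lettering,
            // so checking it here would requeue a failing message forever.
            // A second failure is rejected without requeue and goes to myapp.dlq.
            $message->nack(requeue: !$message->isRedelivered());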
        }
    }
}
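
When a message is dead-lettered, the broker records the event in the x-death header as a list of entries that include the source queue, the reason, and a count. A tool that inspects or replays messages from myapp.dlq can use that count to decide when to stop retrying. The sketch below assumes headers already decoded into plain objects; `rejectionCount`, `shouldGiveUp`, and the limit of 3 are hypothetical choices, not part of any library.

```typescript
// Shape of the x-death fields this sketch relies on
interface XDeathEntry {
  queue: string;   // queue the message was dead-lettered from
  reason: string;  // e.g. 'rejected' or 'expired'
  count: number;   // how many times this queue/reason pair occurred
}

// Sums how often the message was rejected from the given queue.
// Returns 0 when the message has never been dead-lettered.
function rejectionCount(
  headers: Record<string, unknown> | undefined,
  queue: string,
): number {
  const deaths = headers?.['x-death'];
  if (!Array.isArray(deaths)) return 0;
  return (deaths as XDeathEntry[])
    .filter((d) => d.queue === queue && d.reason === 'rejected')
    .reduce((sum, d) => sum + Number(d.count), 0);
}

// Hypothetical policy: stop replaying after maxRejections failures
function shouldGiveUp(
  headers: Record<string, unknown> | undefined,
  queue: string,
  maxRejections = 3,
): boolean {
  return rejectionCount(headers, queue) >= maxRejections;
}

const sampleHeaders = {
  'x-death': [
    { queue: 'myapp.emails', reason: 'rejected', count: 2 },
    { queue: 'myapp.emails', reason: 'expired', count: 1 },
  ],
};
console.log(rejectionCount(sampleHeaders, 'myapp.emails')); // 2
console.log(shouldGiveUp(sampleHeaders, 'myapp.emails'));   // false
```

Entries with the reason expired are ignored here on purpose: a message that timed out in the queue never reached a worker, so it says nothing about whether processing fails.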

Node.js: amqplib

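import * as crypto from 'node:crypto';
import amqp, { type Channel } from 'amqplib';

class MessageBus {
  // The return type of connect() differs between @types/amqplib versions,
  // so it is derived from the function itself
  private connection!: Awaited<ReturnType<typeof amqp.connect>>;
  private channel!: Channel;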

  async connect(): Promise<void> {
    this.connection = await amqp.connect({
      hostname: process.env.RABBITMQ_HOST,
      port: 5672,
      username: process.env.RABBITMQ_USER,
      password: process.env.RABBITMQ_PASS,
      vhost: process.env.RABBITMQ_VHOST,
      heartbeat: 60,
    });

    this.channel = await this.connection.createChannel();
    await this.channel.prefetch(5);

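    await this.channel.assertExchange('myapp.exchange', 'topic', { durable: true });
    // The dead-letter target must exist, otherwise rejected messages are dropped.
    // The arguments must match every other service that declares myapp.dlq.
    await this.channel.assertQueue('myapp.dlq', {
      durable: true,
      arguments: { 'x-queue-type': 'classic' },
    });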
    await this.channel.assertQueue('myapp.notifications', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'myapp.dlq',
      },
    });
    await this.channel.bindQueue('myapp.notifications', 'myapp.exchange', 'notifications.*');
  }

  async publish(routingKey: string, payload: object): Promise<void> {
    const content = Buffer.from(JSON.stringify(payload));
    this.channel.publish('myapp.exchange', routingKey, content, {
      persistent: true,
      contentType: 'application/json',
      messageId: crypto.randomUUID(),
      timestamp: Math.floor(Date.now() / 1000),
    });
  }

  async consume(queue: string, handler: (payload: unknown) => Promise<void>): Promise<void> {
    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        const payload = JSON.parse(msg.content.toString());
        await handler(payload);
        this.channel.ack(msg);
      } catch (err) {
        console.error('Message processing failed:', err);
        this.channel.nack(msg, false, false);  // send to DLQ
      }
    });
  }
}
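
Manual acknowledgments give at-least-once delivery: if a worker handles a message but crashes before acking it, the broker delivers the same message again. Both publishers above set a message ID, so a consumer can use it to skip duplicates. The following is a minimal in-memory sketch under that assumption; `SeenMessages` and its capacity are hypothetical, and with several worker processes the IDs would need to live in a shared store instead.

```typescript
// Remembers recently handled message IDs, evicting the oldest beyond capacity
class SeenMessages {
  private readonly ids = new Set<string>();
  private readonly capacity: number;

  constructor(capacity = 10000) {
    this.capacity = capacity;
  }

  // Returns true the first time an ID is seen, false for a duplicate
  firstTime(id: string): boolean {
    if (this.ids.has(id)) return false;
    this.ids.add(id);
    if (this.ids.size > this.capacity) {
      // A Set iterates in insertion order, so the first value is the oldest
      const oldest = this.ids.values().next().value as string;
      this.ids.delete(oldest);
    }
    return true;
  }
}

const seen = new SeenMessages(2);
console.log(seen.firstTime('msg-1')); // true
console.log(seen.firstTime('msg-1')); // false
```

Inside a handler, a duplicate would still be acknowledged so that the broker stops redelivering it; only the side effect (sending the email, for instance) is skipped.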

Laravel Queue with RabbitMQ

Using the vladimir-yuldashev/laravel-queue-rabbitmq package:

# .env
QUEUE_CONNECTION=rabbitmq
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_VHOST=myapp
RABBITMQ_LOGIN=myapp
RABBITMQ_PASSWORD=secret
RABBITMQ_QUEUE=myapp.jobs

// config/queue.php, inside the 'connections' array
'rabbitmq' => [
    'driver'   => 'rabbitmq',
    'queue'    => env('RABBITMQ_QUEUE', 'default'),
    'hosts'    => [[
        'host'     => env('RABBITMQ_HOST', '127.0.0.1'),
        'port'     => env('RABBITMQ_PORT', 5672),
        'user'     => env('RABBITMQ_LOGIN', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
        'vhost'    => env('RABBITMQ_VHOST', '/'),
    ]],
    'options' => [
        'queue' => [
            'exchange'      => 'myapp.exchange',
            'exchange_type' => 'topic',
            'exchange_routing_key' => 'jobs.*',
        ],
    ],
],
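
// Standard Laravel Job
// App\Models\User and App\Mail\WelcomeMail are assumed application classes
use App\Mail\WelcomeMail;
use App\Models\User;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Mail;

class SendWelcomeEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries = 3;     // maximum number of attempts
    public int $backoff = 60;  // seconds to wait before a retry

    public function __construct(private User $user) {}

    public function handle(): void
    {
        Mail::to($this->user)->send(new WelcomeMail($this->user));
    }
}

// Publishing to the default queue (RABBITMQ_QUEUE = myapp.jobs).
// Avoid myapp.emails here: the raw EmailWorker consumes it and expects plain JSON,
// not a serialized Laravel job.
SendWelcomeEmail::dispatch($user);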

Monitoring via Management API

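# Message and consumer counts for one queue (path: /api/queues/<vhost>/<queue>)
curl -s -u "myapp:${RABBITMQ_PASSWORD}" \
  "http://rabbitmq:15672/api/queues/myapp/myapp.emails" | \
  jq '.messages, .consumers'

# Alert: queue growing
# Grafana: rabbitmq_queue_messages > 1000 → Slack notification
# (the metric comes from the rabbitmq_prometheus plugin; per-queue values
#  require per-object metrics to be enabled)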
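
The same check can be expressed in code when alerts are driven from the Management API response rather than from Grafana. The sketch below only evaluates an already fetched queue object; `queueAlerts` is a hypothetical helper, the threshold of 1000 mirrors the rule above, and the "no consumers" rule is an added assumption about what is worth alerting on.

```typescript
// Subset of the fields returned by GET /api/queues/<vhost>/<queue>
interface QueueStats {
  name: string;
  messages: number;   // ready + unacknowledged messages
  consumers: number;  // consumers currently attached
}

// Returns human-readable alert messages; an empty array means the queue is healthy
function queueAlerts(stats: QueueStats, maxMessages = 1000): string[] {
  const alerts: string[] = [];
  if (stats.messages > maxMessages) {
    alerts.push(`${stats.name}: backlog of ${stats.messages} messages exceeds ${maxMessages}`);
  }
  if (stats.consumers === 0 && stats.messages > 0) {
    alerts.push(`${stats.name}: ${stats.messages} messages waiting but no consumers attached`);
  }
  return alerts;
}

console.log(queueAlerts({ name: 'myapp.emails', messages: 12, consumers: 2 }).length); // 0
console.log(queueAlerts({ name: 'myapp.emails', messages: 1500, consumers: 0 }).length); // 2
```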

Implementation timeline

| Task | Timeline |
|---|---|
| RabbitMQ + basic producer/consumer | 2–3 days |
| Laravel Queue integration | 1–2 days |
| Dead Letter Queue + monitoring | +1–2 days |
| HA RabbitMQ cluster (3 nodes) | 3–4 days |