Redis Pub/Sub Message Queue Setup for Web Application

Our company is engaged in the development, support and maintenance of sites of any complexity. From simple one-page sites to large-scale cluster systems built on micro services. Experience of developers is confirmed by certificates from vendors.
Development and maintenance of all types of websites:
Informational websites or web applications
Business card websites, landing pages, corporate websites, online catalogs, quizzes, promo websites, blogs, news resources, informational portals, forums, aggregators
E-commerce websites or web applications
Online stores, B2B portals, marketplaces, online exchanges, cashback websites, exchanges, dropshipping platforms, product parsers
Business process management web applications
CRM systems, ERP systems, corporate portals, production management systems, information parsers
Electronic service websites or web applications
Classified ads platforms, online schools, online cinemas, website builders, portals for electronic services, video hosting platforms, thematic portals

These are just some of the technical types of websites we work with, and each of them can have its own specific features and functionality, as well as be customized to meet the specific needs and goals of the client.

Showing 1 of 1 servicesAll 2065 services
Redis Pub/Sub Message Queue Setup for Web Application
Medium
~2-3 business days
FAQ
Our competencies:
Development stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    822
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    847
  • image_website-sbh_0.png
    Website development for SBH Partners
    999
  • image_website-_0.png
    Website development for Red Pear
    451

Redis Pub/Sub and Streams Message Queue Setup

Redis provides two mechanisms for asynchronous messaging: Pub/Sub — simple fire-and-forget without persistence, and Streams — persistent queue with consumer groups, similar to lightweight Kafka.

Redis Pub/Sub

Suitable for real-time notifications within application. Messages are not persisted — if subscriber is disconnected, message is lost.

// Laravel: publish via Redis Pub/Sub
use Illuminate\Support\Facades\Redis;

// Publisher
Redis::publish('user-notifications', json_encode([
    'user_id' => $userId,
    'type'    => 'order.shipped',
    'message' => 'Your order has been shipped',
]));

// Subscriber (console command)
class RedisSubscribeCommand extends Command
{
    protected $signature = 'redis:subscribe';

    public function handle(): void
    {
        Redis::subscribe(['user-notifications'], function (string $message) {
            $data = json_decode($message, true);
            broadcast(new UserNotificationEvent($data));  // → WebSocket
        });
    }
}

Redis Streams

Streams — the right choice for task queue on Redis. Messages are stored in the stream, consumer groups track progress, pending entries — unprocessed messages.

# Create stream and add message
XADD emails * user_id 123 email [email protected] template welcome

# Create consumer group
XGROUP CREATE emails email-workers $ MKSTREAM

# Read new messages (worker 1)
XREADGROUP GROUP email-workers worker-1 COUNT 10 BLOCK 5000 STREAMS emails >

# Acknowledge processing
XACK emails email-workers <message-id>

PHP: Redis Streams worker

use Illuminate\Support\Facades\Redis;

class RedisStreamWorker
{
    private string $stream = 'emails';
    private string $group = 'email-workers';
    private string $consumer;

    public function __construct()
    {
        $this->consumer = gethostname() . ':' . getmypid();
        $this->ensureGroup();
    }

    private function ensureGroup(): void
    {
        try {
            Redis::xgroup('CREATE', $this->stream, $this->group, '$', true);
        } catch (\Throwable) {
            // Group already exists
        }
    }

    public function run(): void
    {
        while (true) {
            // First process pending (unacknowledged from previous run)
            $pending = Redis::xreadgroup(
                $this->group, $this->consumer,
                [$this->stream => '0'],  // '0' = pending messages
                10
            );
            $this->processMessages($pending);

            // Then new messages
            $messages = Redis::xreadgroup(
                $this->group, $this->consumer,
                [$this->stream => '>'],  // '>' = only new
                10,
                5000  // block 5 seconds
            );
            $this->processMessages($messages);
        }
    }

    private function processMessages(?array $streams): void
    {
        if (!$streams) return;

        foreach ($streams[$this->stream] ?? [] as [$id, $fields]) {
            try {
                $this->handleEmail($fields);
                Redis::xack($this->stream, $this->group, $id);
            } catch (\Throwable $e) {
                Log::error('Stream message failed', ['id' => $id, 'error' => $e->getMessage()]);
                // Message remains in pending — will be reread on next run
            }
        }
    }

    private function handleEmail(array $fields): void
    {
        Mail::to($fields['email'])->send(new TemplateMail($fields['template'], $fields));
    }
}

Node.js: ioredis Streams

import Redis from 'ioredis';

const redis = new Redis({ host: 'redis', port: 6379 });
const STREAM = 'emails';
const GROUP = 'email-workers';
const CONSUMER = `worker-${process.pid}`;

async function startWorker(): Promise<void> {
  // Create group if doesn't exist
  try {
    await redis.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM');
  } catch { /* group exists */ }

  while (true) {
    const messages = await redis.xreadgroup(
      'GROUP', GROUP, CONSUMER,
      'COUNT', '10',
      'BLOCK', '5000',
      'STREAMS', STREAM, '>'
    ) as [string, [string, string[]][]][] | null;

    if (!messages) continue;

    for (const [, entries] of messages) {
      for (const [id, fields] of entries) {
        const data = Object.fromEntries(
          fields.reduce((acc, val, i) => (i % 2 === 0 ? acc.push([val, fields[i+1]]) : acc, acc), [] as [string,string][])
        );

        try {
          await sendEmail(data);
          await redis.xack(STREAM, GROUP, id);
        } catch (err) {
          console.error('Email failed:', id, err);
        }
      }
    }
  }
}

Stream trimming and cleanup

# Trim stream to last 10000 messages
XTRIM emails MAXLEN ~ 10000

# Auto-trim on add
XADD emails MAXLEN ~ 100000 * user_id 123 template welcome

Redis mechanisms comparison

Feature Pub/Sub Streams Lists (LPUSH/BRPOP)
Persistence No Yes Yes
Consumer groups No Yes No
History replay No Yes No
Complexity Minimal Medium Minimal
Use case Real-time events Task queue Simple queue

Implementation timeline

Redis Streams worker for typical PHP/Node.js application (email, notifications): 1–2 days. With pending messages monitoring and alerts: 2–3 days.