Redis Pub/Sub and Streams Message Queue Setup
Redis provides two mechanisms for asynchronous messaging: Pub/Sub — simple fire-and-forget without persistence, and Streams — persistent queue with consumer groups, similar to lightweight Kafka.
Redis Pub/Sub
Suitable for real-time notifications within application. Messages are not persisted — if subscriber is disconnected, message is lost.
// Laravel: publish via Redis Pub/Sub
use Illuminate\Support\Facades\Redis;
// Publisher
Redis::publish('user-notifications', json_encode([
'user_id' => $userId,
'type' => 'order.shipped',
'message' => 'Your order has been shipped',
]));
// Subscriber (console command)
class RedisSubscribeCommand extends Command
{
protected $signature = 'redis:subscribe';
public function handle(): void
{
Redis::subscribe(['user-notifications'], function (string $message) {
$data = json_decode($message, true);
broadcast(new UserNotificationEvent($data)); // → WebSocket
});
}
}
Redis Streams
Streams — the right choice for task queue on Redis. Messages are stored in the stream, consumer groups track progress, pending entries — unprocessed messages.
# Create stream and add message
XADD emails * user_id 123 email [email protected] template welcome
# Create consumer group
XGROUP CREATE emails email-workers $ MKSTREAM
# Read new messages (worker 1)
XREADGROUP GROUP email-workers worker-1 COUNT 10 BLOCK 5000 STREAMS emails >
# Acknowledge processing
XACK emails email-workers <message-id>
PHP: Redis Streams worker
use Illuminate\Support\Facades\Redis;
class RedisStreamWorker
{
private string $stream = 'emails';
private string $group = 'email-workers';
private string $consumer;
public function __construct()
{
$this->consumer = gethostname() . ':' . getmypid();
$this->ensureGroup();
}
private function ensureGroup(): void
{
try {
Redis::xgroup('CREATE', $this->stream, $this->group, '$', true);
} catch (\Throwable) {
// Group already exists
}
}
public function run(): void
{
while (true) {
// First process pending (unacknowledged from previous run)
$pending = Redis::xreadgroup(
$this->group, $this->consumer,
[$this->stream => '0'], // '0' = pending messages
10
);
$this->processMessages($pending);
// Then new messages
$messages = Redis::xreadgroup(
$this->group, $this->consumer,
[$this->stream => '>'], // '>' = only new
10,
5000 // block 5 seconds
);
$this->processMessages($messages);
}
}
private function processMessages(?array $streams): void
{
if (!$streams) return;
foreach ($streams[$this->stream] ?? [] as [$id, $fields]) {
try {
$this->handleEmail($fields);
Redis::xack($this->stream, $this->group, $id);
} catch (\Throwable $e) {
Log::error('Stream message failed', ['id' => $id, 'error' => $e->getMessage()]);
// Message remains in pending — will be reread on next run
}
}
}
private function handleEmail(array $fields): void
{
Mail::to($fields['email'])->send(new TemplateMail($fields['template'], $fields));
}
}
Node.js: ioredis Streams
import Redis from 'ioredis';
const redis = new Redis({ host: 'redis', port: 6379 });
const STREAM = 'emails';
const GROUP = 'email-workers';
const CONSUMER = `worker-${process.pid}`;
async function startWorker(): Promise<void> {
// Create group if doesn't exist
try {
await redis.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM');
} catch { /* group exists */ }
while (true) {
const messages = await redis.xreadgroup(
'GROUP', GROUP, CONSUMER,
'COUNT', '10',
'BLOCK', '5000',
'STREAMS', STREAM, '>'
) as [string, [string, string[]][]][] | null;
if (!messages) continue;
for (const [, entries] of messages) {
for (const [id, fields] of entries) {
const data = Object.fromEntries(
fields.reduce((acc, val, i) => (i % 2 === 0 ? acc.push([val, fields[i+1]]) : acc, acc), [] as [string,string][])
);
try {
await sendEmail(data);
await redis.xack(STREAM, GROUP, id);
} catch (err) {
console.error('Email failed:', id, err);
}
}
}
}
}
Stream trimming and cleanup
# Trim stream to last 10000 messages
XTRIM emails MAXLEN ~ 10000
# Auto-trim on add
XADD emails MAXLEN ~ 100000 * user_id 123 template welcome
Redis mechanisms comparison
| Feature | Pub/Sub | Streams | Lists (LPUSH/BRPOP) |
|---|---|---|---|
| Persistence | No | Yes | Yes |
| Consumer groups | No | Yes | No |
| History replay | No | Yes | No |
| Complexity | Minimal | Medium | Minimal |
| Use case | Real-time events | Task queue | Simple queue |
Implementation timeline
Redis Streams worker for typical PHP/Node.js application (email, notifications): 1–2 days. With pending messages monitoring and alerts: 2–3 days.







