# Apache Kafka Message Queue Setup
Kafka is a distributed event streaming platform. Unlike RabbitMQ, Kafka does not delete messages once they are consumed — they stay in the topic until the retention policy expires them (e.g., after 7 days or once a partition exceeds a size limit). Multiple consumer groups can read the same topic independently, each tracking its own offsets.
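Independent offsets are easy to see in practice. The following kafkajs sketch (hypothetical group names, assuming a local broker on `localhost:29092` as configured below) starts two consumer groups on the same topic; both receive every message:

```typescript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'offset-demo', brokers: ['localhost:29092'] });

// Each group id gets its own committed offsets, so the two groups
// consume the same topic independently of one another.
async function tail(groupId: string): Promise<void> {
  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  await consumer.subscribe({ topic: 'user-events', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log(`[${groupId}] p${partition} @${message.offset}: ${message.value?.toString()}`);
    },
  });
}

await Promise.all([tail('analytics'), tail('billing')]); // both see every message
```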
## When to use Kafka vs RabbitMQ
Kafka is preferred for event sourcing, analytics pipelines, audit logs, and stream processing with replay. RabbitMQ is the better fit for task queues (email, SMS, PDF generation) with per-message acknowledgements and flexible routing. Note that both systems provide at-least-once delivery by default; exactly-once processing requires idempotent consumers (or, in Kafka's case, transactions) in either one.
## Installation via Docker

```yaml
# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper_data:/var/lib/zookeeper/data
      - zookeeper_log:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: containers on the compose network use kafka:9092,
      # clients on the host use localhost:29092. Advertising only kafka:9092
      # would break host clients, which cannot resolve the "kafka" hostname.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168          # 7 days
      KAFKA_LOG_RETENTION_BYTES: 107374182400 # 100 GiB (applies per partition)
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
    volumes:
      - kafka_data:/var/lib/kafka/data
    ports:
      - "29092:29092"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on: [kafka]
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    ports:
      - "8080:8080"

volumes:
  zookeeper_data:
  zookeeper_log:
  kafka_data:
```
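Once the stack is up (`docker compose up -d`), a quick smoke test from the host confirms the listener setup — a minimal kafkajs sketch against the `localhost:29092` host listener above:

```typescript
import { Kafka } from 'kafkajs';

// Verify the broker is reachable from the host through PLAINTEXT_HOST.
const admin = new Kafka({ clientId: 'smoke-test', brokers: ['localhost:29092'] }).admin();

await admin.connect();
console.log('topics:', await admin.listTopics());
console.log('cluster:', await admin.describeCluster());
await admin.disconnect();
```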
## Creating topics

```bash
# Create a topic with 6 partitions. Replication factor is 1 because this dev
# stack runs a single broker; use 3 in a production cluster.
# (Run inside the kafka container: docker compose exec kafka ...)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic user-events \
  --partitions 6 \
  --replication-factor 1 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete

# Inspect
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic user-events
```
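The same topic can be created programmatically, which is handy in deployment scripts — a sketch using the kafkajs admin client with the same settings as the CLI call above:

```typescript
import { Kafka } from 'kafkajs';

const admin = new Kafka({ clientId: 'topic-setup', brokers: ['localhost:29092'] }).admin();

await admin.connect();
await admin.createTopics({
  topics: [{
    topic: 'user-events',
    numPartitions: 6,
    replicationFactor: 1, // 3 in a real cluster
    configEntries: [
      { name: 'retention.ms', value: '604800000' }, // 7 days
      { name: 'cleanup.policy', value: 'delete' },
    ],
  }],
});
await admin.disconnect();
```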
## PHP: rdkafka producer

```php
use RdKafka\Conf;
use RdKafka\Producer;
use Illuminate\Support\Facades\Log;

class KafkaProducer
{
    private Producer $producer;

    public function __construct()
    {
        $conf = new Conf();
        $conf->set('bootstrap.servers', config('kafka.brokers'));
        $conf->set('security.protocol', 'PLAINTEXT');
        $conf->set('acks', 'all');                 // wait for all in-sync replicas
        $conf->set('retries', '3');
        $conf->set('enable.idempotence', 'true');  // no duplicates on producer retries
        $conf->set('compression.type', 'snappy');

        // Delivery report callback: fires once per message, success or failure.
        $conf->setDrMsgCb(function ($kafka, $message) {
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                Log::error('Kafka delivery failed', [
                    'error' => $message->errstr(),
                    'topic' => $message->topic_name,
                ]);
            }
        });

        $this->producer = new Producer($conf);
    }

    public function publish(string $topic, string $key, array $payload): void
    {
        $rdTopic = $this->producer->newTopic($topic);
        $rdTopic->produce(
            partition: RD_KAFKA_PARTITION_UA,  // let the partitioner pick by key
            msgflags: 0,
            payload: json_encode($payload, JSON_THROW_ON_ERROR),
            key: $key,                         // same key → same partition → ordered events
        );
        $this->producer->poll(0); // serve delivery report callbacks
    }

    public function flush(): void
    {
        $result = $this->producer->flush(10000); // wait up to 10 s for outstanding messages
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Kafka flush failed: ' . rd_kafka_err2str($result));
        }
    }
}

// Usage
$producer->publish('user-events', (string) $user->id, [
    'event'     => 'user.registered',
    'user_id'   => $user->id,
    'email'     => $user->email,
    'timestamp' => now()->toIso8601String(),
]);
$producer->flush(); // e.g. at request shutdown, so buffered messages are not lost
```
## PHP: rdkafka consumer

```php
use RdKafka\Conf;
use RdKafka\KafkaConsumer;
use Illuminate\Support\Facades\Log;

class UserEventConsumer
{
    public function run(): void
    {
        $conf = new Conf();
        $conf->set('group.id', 'user-events-analytics');
        $conf->set('bootstrap.servers', config('kafka.brokers'));
        $conf->set('enable.auto.commit', 'false');   // commit manually after processing
        $conf->set('auto.offset.reset', 'earliest'); // start from the beginning on first run
        $conf->set('session.timeout.ms', '45000');
        $conf->set('max.poll.interval.ms', '300000');

        $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, array $partitions = null) {
            if ($err === RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                $kafka->assign($partitions);
                Log::info('Partitions assigned', ['count' => count($partitions)]);
            } else {
                // RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS or an error: drop the assignment
                $kafka->assign(null);
            }
        });

        $consumer = new KafkaConsumer($conf);
        $consumer->subscribe(['user-events', 'order-events']);

        while (true) {
            $message = $consumer->consume(1000);

            // consume() always returns a Message; a poll timeout arrives as an error code
            if ($message->err === RD_KAFKA_RESP_ERR__TIMED_OUT) continue;
            if ($message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) continue;
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                Log::error('Kafka consume error', ['error' => $message->errstr()]);
                continue;
            }

            try {
                $payload = json_decode($message->payload, true, 512, JSON_THROW_ON_ERROR);
                $this->handle($message->topic_name, $payload);
                $consumer->commit($message); // commit only after successful processing
            } catch (\Throwable $e) {
                Log::error('Processing failed', [
                    'topic'  => $message->topic_name,
                    'offset' => $message->offset,
                    'error'  => $e->getMessage(),
                ]);
                // No commit — the message will be redelivered (beware of poison
                // messages; see the dead-letter sketch below)
            }
        }
    }

    private function handle(string $topic, array $payload): void
    {
        match ($payload['event']) {
            'user.registered' => $this->onUserRegistered($payload),
            'user.deleted'    => $this->onUserDeleted($payload),
            default           => Log::debug('Unknown event', ['event' => $payload['event']]),
        };
    }
}
```
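Redelivering forever turns one malformed message into a poison pill that blocks its partition. A common escape hatch is a dead-letter topic: after processing fails, park the message with provenance headers and move on. A minimal kafkajs sketch, with `user-events.dlq` as a hypothetical DLQ topic name:

```typescript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'dlq-demo', brokers: ['localhost:29092'] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'user-events-analytics' });

// Stand-in for the real domain logic.
async function handle(payload: unknown): Promise<void> { /* ... */ }

await producer.connect();
await consumer.connect();
await consumer.subscribe({ topic: 'user-events' });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await handle(JSON.parse(message.value!.toString()));
    } catch (err) {
      // Park the failed message on the dead-letter topic instead of
      // blocking the partition; a separate process can inspect or replay it.
      await producer.send({
        topic: 'user-events.dlq',
        messages: [{
          key: message.key,
          value: message.value,
          headers: {
            'x-original-topic': topic,
            'x-original-partition': String(partition),
            'x-original-offset': message.offset,
            'x-error': String(err),
          },
        }],
      });
    }
  },
});
```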
## Node.js: kafkajs

```typescript
import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'myapp-api',
  brokers: process.env.KAFKA_BROKERS!.split(','), // "host1:9092,host2:9092"
  retry: {
    retries: 5,
    initialRetryTime: 300,
    factor: 0.2,
  },
});

// Producer
const producer = kafka.producer({
  allowAutoTopicCreation: false,
  idempotent: true,
  maxInFlightRequests: 1, // kafkajs allows at most 1 in-flight request when idempotent
});

await producer.connect();
await producer.send({
  topic: 'user-events',
  compression: CompressionTypes.Snappy, // needs an external codec, e.g. kafkajs-snappy
  messages: [{
    key: String(userId),
    value: JSON.stringify({ event: 'user.login', userId, ip, timestamp: Date.now() }),
    headers: { 'content-type': 'application/json' },
  }],
});

// Consumer
const consumer = kafka.consumer({ groupId: 'audit-service' });

await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: false });
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const payload = JSON.parse(message.value!.toString());
    await AuditLog.create({
      event: payload.event,
      userId: payload.userId,
      metadata: payload,
    });
    // Offsets are committed automatically after eachMessage resolves;
    // if it throws, kafkajs retries the message.
  },
});
```
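One thing the snippet above omits is shutdown: disconnecting cleanly lets the group rebalance immediately instead of waiting out `session.timeout.ms`. A small sketch, reusing the `producer` and `consumer` from the block above:

```typescript
// Leave the group and flush connections on shutdown signals.
const shutdown = async (): Promise<void> => {
  try {
    await consumer.disconnect();
    await producer.disconnect();
  } finally {
    process.exit(0);
  }
};

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
```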
## Kafka Streams for aggregation

```java
// Java: count events per key over tumbling 5-minute windows
StreamsBuilder builder = new StreamsBuilder();
KStream<String, UserEvent> events = builder.stream("user-events");

events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count()
    .toStream()                  // KTable<Windowed<String>, Long> → KStream
    .to("user-activity-counts"); // needs a serde for the windowed key
```
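Kafka Streams has no Node.js port, but the same tumbling-window count can be approximated by hand on top of kafkajs. A simplified, in-memory sketch — unlike Kafka Streams it has no fault-tolerant state stores and no exactly-once guarantees:

```typescript
import { Kafka } from 'kafkajs';

const WINDOW_MS = 5 * 60 * 1000;
const counts = new Map<string, number>(); // "key@windowStart" -> running count

const kafka = new Kafka({ clientId: 'window-count', brokers: ['localhost:29092'] });
const consumer = kafka.consumer({ groupId: 'activity-counter' });
const producer = kafka.producer();

await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: 'user-events' });

await consumer.run({
  eachMessage: async ({ message }) => {
    // Bucket each record by its 5-minute window and increment the count.
    const windowStart = Math.floor(Number(message.timestamp) / WINDOW_MS) * WINDOW_MS;
    const bucket = `${message.key?.toString()}@${windowStart}`;
    const count = (counts.get(bucket) ?? 0) + 1;
    counts.set(bucket, count);

    // Emit the running count; downstream readers keep the latest value per bucket.
    await producer.send({
      topic: 'user-activity-counts',
      messages: [{ key: bucket, value: String(count) }],
    });
  },
});
```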
## Schema Registry for data contracts

Avro schema for `user-events`:

```json
{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.myapp.events",
  "fields": [
    {"name": "event", "type": "string"},
    {"name": "user_id", "type": "long"},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
```

Note that a logical type annotates the type itself, not the field. Schema Registry checks every new schema version against the subject's compatibility policy (BACKWARD by default) and rejects changes that would break existing consumers.
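On the client side, messages are encoded against a registered schema id — a sketch using the `@kafkajs/confluent-schema-registry` package, assuming a registry at `localhost:8081`:

```typescript
import { SchemaRegistry, SchemaType } from '@kafkajs/confluent-schema-registry';

const registry = new SchemaRegistry({ host: 'http://localhost:8081' });

// Register the schema under the topic's value subject; the returned id
// is embedded in every encoded payload.
const { id } = await registry.register(
  {
    type: SchemaType.AVRO,
    schema: JSON.stringify({
      type: 'record',
      name: 'UserEvent',
      namespace: 'com.myapp.events',
      fields: [
        { name: 'event', type: 'string' },
        { name: 'user_id', type: 'long' },
        { name: 'timestamp', type: { type: 'long', logicalType: 'timestamp-millis' } },
      ],
    }),
  },
  { subject: 'user-events-value' },
);

// Producer side: encode against the registered schema.
const value = await registry.encode(id, { event: 'user.registered', user_id: 42, timestamp: Date.now() });

// Consumer side: the schema id travels inside the payload, so decode needs no id.
const decoded = await registry.decode(value);
```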
## Monitoring

```bash
# Consumer group lag: how far the group's committed offsets trail the log end
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group user-events-analytics \
  --describe

# Alerting pipeline: JMX Exporter → Prometheus → Grafana dashboard,
# e.g. alert when kafka_consumer_group_lag > 10000
```
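Lag can also be computed in code: log-end offset minus committed offset, per partition. A sketch using the kafkajs 1.x admin API (kafkajs 2.x takes a `topics` array in `fetchOffsets` and returns results grouped by topic):

```typescript
import { Kafka } from 'kafkajs';

const admin = new Kafka({ clientId: 'lag-check', brokers: ['localhost:29092'] }).admin();
await admin.connect();

// Log-end offsets per partition vs. the group's committed offsets.
const ends = await admin.fetchTopicOffsets('user-events');
const committed = await admin.fetchOffsets({ groupId: 'user-events-analytics', topic: 'user-events' });

for (const { partition, offset } of committed) {
  const end = ends.find((p) => p.partition === partition)!;
  const consumed = offset === '-1' ? 0 : Number(offset); // -1 = nothing committed yet
  console.log(`partition ${partition}: lag = ${Number(end.high) - consumed}`);
}
await admin.disconnect();
```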
## Implementation timeline
| Task | Timeline |
|---|---|
| Kafka + basic producer/consumer PHP/Node.js | 3–4 days |
| Schema Registry + Avro | +2 days |
| Kafka Streams aggregation | 3–5 days |
| 3-node Kafka cluster in Kubernetes | 4–5 days |