Apache Kafka Message Queue Setup for Web Application

Apache Kafka Message Queue Setup

Kafka is a distributed event streaming platform. Unlike RabbitMQ, messages in Kafka are not deleted after processing — they are stored in a topic according to retention policy (e.g., 7 days or 100 GB). Multiple consumer groups can independently read the same topic with different offsets.
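
A minimal in-memory sketch (plain JavaScript, not a real client) of that property: the log is append-only, and each consumer group advances its own offset independently.

```javascript
// One partition of a topic: an append-only log that consuming does not shrink.
const log = [];

function produce(record) { log.push(record); }

// Per-group committed offsets, independent of each other
const offsets = { analytics: 0, billing: 0 };

function poll(group, maxRecords) {
  const from = offsets[group];
  const batch = log.slice(from, from + maxRecords);
  offsets[group] += batch.length; // "commit"
  return batch;
}

['a', 'b', 'c'].forEach(produce);

console.log(poll('analytics', 10)); // [ 'a', 'b', 'c' ]
console.log(poll('billing', 2));    // [ 'a', 'b' ] — same records, own offset
console.log(poll('billing', 2));    // [ 'c' ] — billing catches up; log unchanged
```

Deletion happens only by retention policy, never because a group has read a record.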

When to use Kafka vs RabbitMQ

Kafka is preferred for event sourcing, analytics pipelines, audit logs, and stream processing with replay. RabbitMQ is preferred for task queues (email, SMS, PDF generation) that need per-message acknowledgement, routing, and at-least-once delivery. Note that neither broker gives exactly-once delivery to consumers by itself; in practice that requires idempotent handlers or transactions.

Installation via Docker

# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper_data:/var/lib/zookeeper/data
      - zookeeper_log:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: one for containers on the compose network, one for the host.
      # Advertising only PLAINTEXT://kafka:9092 breaks connections from the host,
      # because "kafka" does not resolve outside Docker.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168           # 7 days
      KAFKA_LOG_RETENTION_BYTES: 107374182400  # 100 GB per partition
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
    volumes:
      - kafka_data:/var/lib/kafka/data
    ports:
      - "9092:9092"
      - "29092:29092"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on: [kafka]
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    ports:
      - "8080:8080"

volumes:
  zookeeper_data:
  zookeeper_log:
  kafka_data:
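
One non-obvious detail in this config: log.retention.bytes (KAFKA_LOG_RETENTION_BYTES) is enforced per partition, not per topic, and whichever limit — time or size — is hit first triggers deletion. Quick arithmetic on the values above:

```javascript
// log.retention.bytes is enforced PER PARTITION: the compose file above caps
// each partition at 100 GiB for up to 7 days.
const retentionBytesPerPartition = 107374182400; // 100 GiB, as configured
const partitions = 6;                            // KAFKA_NUM_PARTITIONS

const maxTopicBytes = retentionBytesPerPartition * partitions;
console.log(maxTopicBytes / 2 ** 30); // 600 — worst-case GiB on disk for one topic

// The time limit in the same config: 168 hours
const retentionMs = 168 * 60 * 60 * 1000;
console.log(retentionMs); // 604800000 — the retention.ms value used when creating topics
```

Budget disk for partitions × retention.bytes per topic, not retention.bytes alone.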

Creating topics

# Create topic with 6 partitions; replication factor 1 matches the single-broker
# setup above (use 3 in a production cluster)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic user-events \
  --partitions 6 \
  --replication-factor 1 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete

# View
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic user-events
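
Keyed messages are routed as hash(key) % partitions. The exact hash differs by client (murmur2 in the Java client and kafkajs; librdkafka defaults differ), so the partitioner below is a toy sketch. The point it illustrates is real: a fixed key always maps to the same partition, and changing the partition count remaps keys, breaking per-key ordering across the change.

```javascript
// Toy partitioner: hash(key) % partitions. The hash itself is illustrative
// (real clients use murmur2 or similar); the stable-routing property is what matters.
function toyHash(key) {
  let h = 0;
  for (const ch of key) h = (h * 31 + ch.charCodeAt(0)) >>> 0;
  return h;
}
const partitionFor = (key, partitions) => toyHash(key) % partitions;

const p1 = partitionFor('user-42', 6);
const p2 = partitionFor('user-42', 6);
console.log(p1 === p2); // true — stable routing, so per-key ordering holds

// After repartitioning (6 -> 12) the same key may land elsewhere, so ordering
// guarantees do not survive a partition-count change.
```

This is why the compose file disables auto topic creation: partition counts should be a deliberate choice.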

PHP: rdkafka producer

use RdKafka\Producer;
use RdKafka\Conf;

class KafkaProducer
{
    private Producer $producer;

    public function __construct()
    {
        $conf = new Conf();
        $conf->set('bootstrap.servers', config('kafka.brokers'));
        $conf->set('security.protocol', 'PLAINTEXT');
        $conf->set('acks', 'all');              // acknowledgment from all replicas
        $conf->set('retries', '3');
        $conf->set('enable.idempotence', 'true'); // no duplicates on broker-side retries
        $conf->set('compression.type', 'snappy');

        $conf->setDrMsgCb(function ($kafka, $message) {
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                Log::error('Kafka delivery failed', [
                    'error' => $message->errstr(),
                    'topic' => $message->topic_name,
                ]);
            }
        });

        $this->producer = new Producer($conf);
    }

    public function publish(string $topic, string $key, array $payload): void
    {
        $rdTopic = $this->producer->newTopic($topic);
        $rdTopic->produce(
            partition: RD_KAFKA_PARTITION_UA,  // auto-select partition by key
            msgflags: 0,
            payload: json_encode($payload),
            key: $key,                          // same key → same partition → event order
        );
        $this->producer->poll(0);
    }

    public function flush(): void
    {
        $result = $this->producer->flush(10000);  // 10 second timeout
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Kafka flush failed: ' . rd_kafka_err2str($result));
        }
    }
}

// Usage
$producer->publish('user-events', (string) $user->id, [
    'event'     => 'user.registered',
    'user_id'   => $user->id,
    'email'     => $user->email,
    'timestamp' => now()->toIso8601String(),
]);
$producer->flush();
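
produce() above is asynchronous: it only enqueues the message, poll() serves delivery callbacks, and flush() blocks until the queue drains. A plain-JS sketch (toy names, no real client) of that contract:

```javascript
// Sketch of the produce/poll/flush contract: produce() only enqueues; delivery
// callbacks fire when poll() runs; flush() loops poll until the queue is empty.
const queue = [];
const delivered = [];

function produceAsync(msg, onDelivery) { queue.push({ msg, onDelivery }); }

function pollOnce() { // serve at most one pending delivery report
  const item = queue.shift();
  if (item) { delivered.push(item.msg); item.onDelivery(null, item.msg); }
}

function flush() { while (queue.length) pollOnce(); } // like $producer->flush()

produceAsync('a', (err, m) => console.log('delivered', m));
produceAsync('b', (err, m) => console.log('delivered', m));
flush();
console.log(delivered); // [ 'a', 'b' ] — nothing is lost if you flush before exit
```

Skipping flush() before the PHP process exits is the classic way to silently drop the tail of the queue.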

PHP: Consumer

use RdKafka\KafkaConsumer;
use RdKafka\Conf;

class UserEventConsumer
{
    public function run(): void
    {
        $conf = new Conf();
        $conf->set('group.id', 'user-events-analytics');
        $conf->set('bootstrap.servers', config('kafka.brokers'));
        $conf->set('enable.auto.commit', 'false');  // manual commit
        $conf->set('auto.offset.reset', 'earliest');
        $conf->set('session.timeout.ms', '45000');
        $conf->set('max.poll.interval.ms', '300000');

        $conf->setRebalanceCb(function ($kafka, $err, $partitions) {
            if ($err === RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                $kafka->assign($partitions);
                Log::info('Partitions assigned', ['count' => count($partitions)]);
            } else {
                $kafka->assign(null);
            }
        });

        $consumer = new KafkaConsumer($conf);
        $consumer->subscribe(['user-events', 'order-events']);

        while (true) {
            $message = $consumer->consume(timeout_ms: 1000);

            if ($message === null || $message->err === RD_KAFKA_RESP_ERR__TIMED_OUT) continue;
            if ($message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) continue;
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                Log::error('Kafka consume error', ['error' => $message->errstr()]);
                continue;
            }

            try {
                $payload = json_decode($message->payload, true);
                $this->handle($message->topic_name, $payload);
                $consumer->commit($message);  // commit only after successful processing
            } catch (\Throwable $e) {
                Log::error('Processing failed', [
                    'topic'  => $message->topic_name,
                    'offset' => $message->offset,
                    'error'  => $e->getMessage(),
                ]);
                // Don't commit — message will be reread
            }
        }
    }

    private function handle(string $topic, array $payload): void
    {
        match ($payload['event']) {
            'user.registered' => $this->onUserRegistered($payload),
            'user.deleted'    => $this->onUserDeleted($payload),
            default           => Log::debug('Unknown event', ['event' => $payload['event']]),
        };
    }
}
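
Committing only after successful processing gives at-least-once delivery: if the worker crashes between handle() and commit(), the same record is redelivered on restart. The usual remedy is an idempotent handler. A sketch (hypothetical helper, illustration only) that deduplicates on topic/partition/offset:

```javascript
// Idempotent handler sketch: remember which records were already applied, so a
// redelivery after a crash-before-commit is absorbed instead of applied twice.
const seen = new Set();
let applied = 0;

function handleIdempotent(msg) {
  const id = `${msg.topic}/${msg.partition}/${msg.offset}`;
  if (seen.has(id)) return false; // duplicate redelivery — skip
  seen.add(id);
  applied++; // real code would apply the side effect here
  return true;
}

const record = { topic: 'user-events', partition: 3, offset: 1042 };
console.log(handleIdempotent(record)); // true  — first delivery, applied
console.log(handleIdempotent(record)); // false — redelivery, skipped
console.log(applied); // 1
```

In production the "seen" set lives in the same store as the side effect (e.g. a unique key on topic/partition/offset), so the check and the write are atomic.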

Node.js: kafkajs

import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'myapp-api',
  brokers: [process.env.KAFKA_BROKERS!],
  retry: {
    retries: 5,
    initialRetryTime: 300,
    factor: 0.2,
  },
});

// Producer
const producer = kafka.producer({
  allowAutoTopicCreation: false,
  idempotent: true,
  maxInFlightRequests: 1, // kafkajs requires 1 when idempotent is enabled
});

await producer.connect();

await producer.send({
  topic: 'user-events',
  compression: CompressionTypes.Snappy,
  messages: [{
    key: String(userId),
    value: JSON.stringify({ event: 'user.login', userId, ip, timestamp: Date.now() }),
    headers: { 'content-type': 'application/json' },
  }],
});

// Consumer
const consumer = kafka.consumer({ groupId: 'audit-service' });
await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const payload = JSON.parse(message.value!.toString());

    await AuditLog.create({
      event: payload.event,
      userId: payload.userId,
      metadata: payload,
    });
  },
});

Kafka Streams for aggregation

// Java: count events per key in tumbling 5-minute windows
StreamsBuilder builder = new StreamsBuilder();

KStream<String, UserEvent> events = builder.stream("user-events");

events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count()
    .toStream()
    .to("user-activity-counts");
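
The same tumbling-window count, sketched in plain JavaScript (hypothetical helper, not Kafka Streams) to show the bucketing: each event falls into the window starting at floor(timestamp / windowSize) × windowSize.

```javascript
// Tumbling 5-minute windows keyed by user: count events per (key, window start).
const WINDOW_MS = 5 * 60 * 1000;
const counts = new Map(); // "key@windowStart" -> count

function countEvent(key, timestampMs) {
  const windowStart = Math.floor(timestampMs / WINDOW_MS) * WINDOW_MS;
  const bucket = `${key}@${windowStart}`;
  counts.set(bucket, (counts.get(bucket) ?? 0) + 1);
}

countEvent('user-1', 0);      // window [0, 300000)
countEvent('user-1', 299999); // same window
countEvent('user-1', 300000); // next window
console.log([...counts.values()]); // [ 2, 1 ]
```

Windows are non-overlapping and aligned to the epoch, which is what TimeWindows.ofSizeWithNoGrace gives you in the Java topology above.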

Schema Registry for data contract

# Avro schema for user-events
{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.myapp.events",
  "fields": [
    {"name": "event", "type": "string"},
    {"name": "user_id", "type": "long"},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}

Schema Registry prevents incompatible message format changes between producer and consumer.
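
As an illustration of what "prevents incompatible changes" means (a simplified sketch, not the Schema Registry API): under BACKWARD compatibility, a field added in a new schema version must carry a default, or consumers on the new schema cannot decode records written with the old one.

```javascript
// Simplified check for ONE backward-compatibility rule the registry enforces:
// every field added in the new schema must have a "default".
function addedFieldsAreSafe(oldSchema, newSchema) {
  const oldNames = new Set(oldSchema.fields.map(f => f.name));
  return newSchema.fields
    .filter(f => !oldNames.has(f.name))
    .every(f => 'default' in f);
}

const v1 = { fields: [{ name: 'event', type: 'string' }] };
const v2ok  = { fields: [...v1.fields, { name: 'ip', type: 'string', default: '' }] };
const v2bad = { fields: [...v1.fields, { name: 'ip', type: 'string' }] };

console.log(addedFieldsAreSafe(v1, v2ok));  // true — old records still readable
console.log(addedFieldsAreSafe(v1, v2bad)); // false — registry would reject this
```

The real registry checks many more rules (type promotion, removed fields, unions), but this is the one that bites most often.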

Monitoring

# Consumer group lag (offset from topic end)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group user-events-analytics \
  --describe

# Grafana: kafka_consumer_group_lag > 10000 → alert
# JMX Exporter → Prometheus → Grafana dashboard
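
The LAG column that --describe prints is, per partition, the log-end offset minus the group's committed offset. A small sketch (made-up snapshot data) of how a total-lag alert like the Grafana rule above is computed:

```javascript
// Consumer lag per partition: latest produced offset minus last committed offset.
// Total group lag is the sum across partitions.
function groupLag(partitions) {
  return partitions.reduce((sum, p) => sum + (p.logEndOffset - p.committedOffset), 0);
}

const snapshot = [ // hypothetical output of kafka-consumer-groups.sh --describe
  { partition: 0, logEndOffset: 15200, committedOffset: 15100 },
  { partition: 1, logEndOffset: 9800,  committedOffset: 9800  },
  { partition: 2, logEndOffset: 31000, committedOffset: 18500 },
];

console.log(groupLag(snapshot)); // 12600 — above the 10000 alert threshold
```

A steadily growing lag on one partition usually means a hot key or a stuck consumer, not uniform overload.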

Implementation timeline

Task                                            Timeline
Kafka + basic producer/consumer (PHP/Node.js)   3–4 days
Schema Registry + Avro                          +2 days
Kafka Streams aggregation                       3–5 days
3-node Kafka cluster in Kubernetes              4–5 days