Kafka producer and consumer development for web applications

Our company develops, supports, and maintains websites of any complexity, from simple one-page sites to large-scale clustered systems built on microservices. Our developers' experience is confirmed by vendor certificates.
Development and maintenance of all types of websites:

Informational websites and web applications: business card websites, landing pages, corporate websites, online catalogs, quizzes, promo websites, blogs, news resources, informational portals, forums, aggregators.
E-commerce websites and web applications: online stores, B2B portals, marketplaces, online exchanges, cashback websites, dropshipping platforms, product parsers.
Business process management web applications: CRM systems, ERP systems, corporate portals, production management systems, information parsers.
Electronic service websites and web applications: classified ads platforms, online schools, online cinemas, website builders, electronic services portals, video hosting platforms, thematic portals.

These are just some of the technical types of websites we work with; each can have its own specific features and functionality and can be customized to the client's particular needs and goals.

Latest works
  • Development of a web application for FEEDME
  • Development of an online store for the company FURNORO
  • Development of a web application for Enviok
  • CRM development for Chasseurs
  • Website development for SBH Partners
  • Website development for Red Pear

Developing Kafka producers and consumers for web applications

Kafka clients in production are not just "send message" and "receive message": they mean delivery guarantees, idempotence, rebalance handling, offset management, and isolation from broker failures.

Producer: delivery guarantees

Three reliability modes (acks):

  • acks=0 — fire and forget, data loss possible
  • acks=1 — leader confirmed, replicas may not catch up
  • acks=all (or -1) — write confirmed by all in-sync replicas; with min.insync.replicas=2, an acknowledged write survives the loss of a single broker

For financial transactions and critical events — only acks=all with enable.idempotence=true.

// Java — idempotent producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");

// Reliability
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // max with idempotence
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);

// Performance
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);          // 64KB batch
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);               // wait up to 5ms for batch fill
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67_108_864);  // 64MB buffer
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);

Sending with error handling:

public CompletableFuture<RecordMetadata> sendOrderEvent(OrderEvent event) {
    ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
        "order-events",
        event.getOrderId(),  // key — all order events in one partition
        event
    );

    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // The callback fires only after internal retries are exhausted or
            // delivery.timeout.ms expires, so fail the future in both branches
            if (exception instanceof RetriableException) {
                log.error("Retries exhausted for order {}: {}", event.getOrderId(), exception.getMessage());
            } else {
                // Non-retriable: authorization, RecordTooLarge, SerializationException
                log.error("Fatal producer error for order {}: {}", event.getOrderId(), exception.getMessage());
            }
            future.completeExceptionally(exception);
        } else {
            log.debug("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
            future.complete(metadata);
        }
    });

    return future;
}
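
A hypothetical call site in a request handler (orTimeout requires Java 9+; the respondWith* methods are illustrative, not part of any framework):

// Tie the producer acknowledgement to the web request lifecycle
sendOrderEvent(event)
    .orTimeout(5, TimeUnit.SECONDS)        // don't hold the request forever
    .whenComplete((metadata, error) -> {
        if (error != null) {
            respondWithError(error);       // hypothetical handler method
        } else {
            respondWithAccepted(metadata); // hypothetical handler method
        }
    });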

Transactional producer

When you need to atomically write to multiple topics (e.g., processing result + notification):

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-instance-1");
// transactional.id must be unique for each producer instance

producer.initTransactions();

try {
    producer.beginTransaction();

    producer.send(new ProducerRecord<>("orders-processed", orderId, processedOrder));
    producer.send(new ProducerRecord<>("order-notifications", userId, notification));

    // Commit consumer offsets within same transaction (exactly-once)
    producer.sendOffsetsToTransaction(currentOffsets, consumerGroupMetadata);

    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    producer.close(); // This instance is no longer valid
    throw e;
} catch (KafkaException e) {
    producer.abortTransaction();
    throw e;
}
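
Exactly-once only holds end to end if downstream consumers skip records from aborted transactions; the default isolation level (read_uncommitted) will surface them. The consumer-side setting:

// Read only records from committed transactions
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");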

Consumer: correct offset management

Auto-commit hides failures: an offset can be committed before the application has actually processed the message, so a crash in between means data loss.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://schema-registry:8081");
consumerProps.put("specific.avro.reader", true);

// Disable auto-commit
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Session timeout — if broker doesn't receive heartbeat within this time, considers consumer dead
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000);
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000);

// How many records to fetch per poll
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Max time between polls — if exceeded, broker considers consumer dead
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);

Poll loop with manual commit:

KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(consumerProps);

// Declared before subscribe() so the rebalance listener below can commit it
Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();

consumer.subscribe(List.of("order-events"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Before rebalance — commit offsets of records already processed
        if (!pendingOffsets.isEmpty()) {
            consumer.commitSync(pendingOffsets);
            pendingOffsets.clear();
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // After rebalance — initialize state for new partitions
        log.info("Assigned partitions: {}", partitions);
    }
});

try {
    while (!shutdown.get()) {
        ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, OrderEvent> record : records) {
            try {
                processOrder(record.value());

                pendingOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                );
            } catch (NonRetriableException e) {
                // Send to DLQ and commit
                sendToDlq(record, e);
                pendingOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                );
            }
            // On a RetriableException: don't commit; note the consumer's position has
            // already advanced, so seek() back to record.offset() to retry now,
            // otherwise the record is only redelivered after a restart or rebalance
        }

        if (!pendingOffsets.isEmpty()) {
            consumer.commitSync(pendingOffsets);
            pendingOffsets.clear();
        }
    }
} finally {
    consumer.close();
}
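
The sendToDlq helper and NonRetriableException above are application-level, not part of the Kafka API. A minimal sketch of the helper, assuming a dedicated DLQ topic and a separate producer instance (topic name and header keys are our convention):

// Publish the failed record to a dead-letter topic, preserving its origin
// (assumes java.nio.charset.StandardCharsets is imported)
void sendToDlq(ConsumerRecord<String, OrderEvent> record, Exception cause) {
    ProducerRecord<String, OrderEvent> dlqRecord = new ProducerRecord<>(
        "order-events.dlq", record.key(), record.value());
    dlqRecord.headers()
        .add("dlq.error", cause.toString().getBytes(StandardCharsets.UTF_8))
        .add("dlq.source.topic", record.topic().getBytes(StandardCharsets.UTF_8))
        .add("dlq.source.offset", String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
    dlqProducer.send(dlqRecord);  // a dedicated KafkaProducer instance
}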

Parallel processing without losing order

One thread for polling plus a worker pool, with routing that preserves per-key order:

// Preserve event order per key: a key always maps to one partition,
// and each partition is routed to exactly one worker queue
final int NUM_WORKERS = 12;

List<BlockingQueue<ConsumerRecord<String, OrderEvent>>> workerQueues = new ArrayList<>();
for (int i = 0; i < NUM_WORKERS; i++) {
    workerQueues.add(new LinkedBlockingQueue<>());
}
ExecutorService workers = Executors.newFixedThreadPool(NUM_WORKERS);

// Route records to a worker queue by partition
for (ConsumerRecord<String, OrderEvent> record : records) {
    int workerIndex = record.partition() % NUM_WORKERS;
    workerQueues.get(workerIndex).offer(record);
}

// Each worker drains its queue sequentially
// → order within a partition (and therefore within a key) is preserved
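
A sketch of the worker side (committing offsets back through the polling thread is left out for brevity):

// Each worker takes from its own queue and processes records in order
for (int i = 0; i < NUM_WORKERS; i++) {
    BlockingQueue<ConsumerRecord<String, OrderEvent>> queue = workerQueues.get(i);
    workers.submit(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ConsumerRecord<String, OrderEvent> record = queue.take();
                processOrder(record.value());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // shut down cleanly
            }
        }
    });
}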

Python client (confluent-kafka-python)

from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json

# Producer
producer = Producer({
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
    'batch.size': 65536,
    'linger.ms': 5,
    'retries': 2147483647,
    'delivery.timeout.ms': 120000,
})

def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer.produce(
    'user-events',
    key=str(user_id),
    value=json.dumps(event).encode('utf-8'),
    callback=delivery_report
)
producer.flush()  # wait for all pending messages

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    'group.id': 'web-app-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 30000,
})

consumer.subscribe(['user-events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # _PARTITION_EOF is informational and only emitted when
            # enable.partition.eof=True; anything else is a real error
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise KafkaException(msg.error())

        event = json.loads(msg.value().decode('utf-8'))
        process_event(event)
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

Timeline

Day 1 — design: define topics, message keys (for order guarantee), message format (Avro/JSON/Protobuf), consumer groups.
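
The topic layout from this step can be codified with the Admin API; a sketch (topic name, partition count, and replication factor below are examples):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
try (AdminClient admin = AdminClient.create(adminProps)) {
    // 12 partitions, replication factor 3, matching acks=all + min.insync.replicas=2
    NewTopic orderEvents = new NewTopic("order-events", 12, (short) 3)
        .configs(Map.of("min.insync.replicas", "2"));
    // .get() throws InterruptedException/ExecutionException in real code
    admin.createTopics(List.of(orderEvents)).all().get();
}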

Day 2 — develop producers: serialization, acks/idempotence setup, integration with application business logic.

Day 3 — develop consumers: manual offset management, rebalance handling, Dead Letter Queue for failed messages.

Days 4–5 — testing: unit tests with EmbeddedKafka or Testcontainers, integration tests, load test with kafka-producer-perf-test, verify exactly-once semantics.
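
With Testcontainers, integration tests run against a real broker; a minimal setup sketch (the image tag is an example):

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Throwaway single-node Kafka for the duration of the test
try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))) {
    kafka.start();
    String bootstrapServers = kafka.getBootstrapServers();
    // point producer and consumer configs at bootstrapServers, then run the scenario
}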

Day 6 — deploy, monitor consumer lag, set up alerts.