Developing Kafka producers and consumers for a web application
Kafka clients in production are not just "send message" and "receive message": they involve delivery guarantees, idempotency, rebalance handling, offset management, and isolation from broker failures.
Producer: delivery guarantees
Three reliability modes (acks):
- acks=0 — fire and forget, data loss possible
- acks=1 — leader confirmed, replicas may not have caught up
- acks=all (or -1) — all in-sync replicas confirmed; no data loss as long as min.insync.replicas=2 is set on the topic
For financial transactions and critical events — only acks=all with enable.idempotence=true.
// Java — idempotent producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
// Reliability
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // max with idempotence
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
// Performance
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB batch
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // wait up to 5ms for batch fill
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67_108_864); // 64MB buffer
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);
Sending with error handling:
public CompletableFuture<RecordMetadata> sendOrderEvent(OrderEvent event) {
    ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
        "order-events",
        event.getOrderId(), // key — all events for one order land in one partition
        event
    );
    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            if (exception instanceof RetriableException) {
                // A retriable error reaches the callback only after the producer
                // has exhausted its own retries (delivery.timeout.ms elapsed)
                log.error("Delivery failed after retries for order {}: {}",
                          event.getOrderId(), exception.getMessage());
            } else {
                // Non-retriable: authorization, RecordTooLargeException, SerializationException
                log.error("Fatal producer error for order {}: {}",
                          event.getOrderId(), exception.getMessage());
            }
            future.completeExceptionally(exception);
        } else {
            log.debug("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
            future.complete(metadata);
        }
    });
    return future;
}
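The "same key → same partition" guarantee above comes from Kafka's default partitioner, which hashes the key (murmur2) modulo the partition count. A simplified Python stand-in illustrates why this preserves per-key ordering — note it uses a stable hash from hashlib instead of Kafka's actual murmur2, so the partition numbers will not match a real cluster:

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic key -> partition mapping (stand-in for Kafka's murmur2)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All events for one order ID always map to the same partition,
# so a single consumer sees them in send order.
assert partition_for("order-42", 12) == partition_for("order-42", 12)
```

Because the mapping is a pure function of the key, every producer instance routes a given order ID to the same partition without any coordination.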
Transactional producer
When you need to atomically write to multiple topics (e.g., processing result + notification):
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-instance-1");
// transactional.id must be unique for each producer instance
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders-processed", orderId, processedOrder));
    producer.send(new ProducerRecord<>("order-notifications", userId, notification));
    // Commit consumer offsets within the same transaction (exactly-once)
    producer.sendOffsetsToTransaction(currentOffsets, consumerGroupMetadata);
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    producer.close(); // This producer instance is no longer valid
    throw e;
} catch (KafkaException e) {
    producer.abortTransaction();
    throw e;
}
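Transactional writes only appear atomic to consumers that opt in: with the default isolation.level=read_uncommitted a consumer will also see records from aborted transactions. A confluent-kafka-python config sketch (the group id here is illustrative):

```python
# Consumer config for reading transactional topics safely
read_committed_config = {
    'bootstrap.servers': 'kafka-1:9092',
    'group.id': 'order-processor',   # illustrative
    'enable.auto.commit': False,
    # Only deliver records from committed transactions;
    # the default 'read_uncommitted' also returns aborted writes
    'isolation.level': 'read_committed',
}
```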
Consumer: correct offset management
Auto-commit hides failures: offsets can be committed before the application has actually finished processing the messages, so a crash mid-batch means silent data loss.
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://schema-registry:8081");
consumerProps.put("specific.avro.reader", true);
// Disable auto-commit
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Session timeout — if broker doesn't receive heartbeat within this time, considers consumer dead
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000);
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000);
// How many records to fetch per poll
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Max time between polls — if exceeded, broker considers consumer dead
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
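These last two settings constrain each other: if processing a full poll batch takes longer than max.poll.interval.ms, the broker evicts the consumer and triggers a rebalance. A quick sanity check on the per-record time budget implied by the values above:

```python
def per_record_budget_ms(max_poll_records: int, max_poll_interval_ms: int) -> float:
    """Average processing time per record that stays under the poll interval."""
    return max_poll_interval_ms / max_poll_records

# With the config above: 300 000 ms / 500 records = 600 ms per record
budget = per_record_budget_ms(500, 300_000)
assert budget == 600.0
```

If a single record can legitimately take longer than this, lower max.poll.records rather than raising max.poll.interval.ms, since a larger interval also delays failure detection.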
Poll loop with manual commit:
KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("order-events"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Before rebalance — commit what has been processed so far
        commitCurrentOffsets();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // After rebalance — initialize state for the new partitions
        log.info("Assigned partitions: {}", partitions);
    }
});
Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();
try {
    while (!shutdown.get()) {
        ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, OrderEvent> record : records) {
            try {
                processOrder(record.value());
                pendingOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1) // next offset to read
                );
            } catch (NonRetriableException e) {
                // Send to DLQ and commit past the poison message
                sendToDlq(record, e);
                pendingOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                );
            }
            // For retriable errors: don't commit, and seek() back to the failed
            // offset — the consumer's position has already moved past this record
        }
        if (!pendingOffsets.isEmpty()) {
            consumer.commitSync(pendingOffsets);
            pendingOffsets.clear();
        }
    }
} finally {
    consumer.close();
}
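The sendToDlq helper above is left undefined; a common convention is to forward the failed record to a <topic>.dlq topic with the error attached as headers. A minimal sketch of what such a record might look like — the topic suffix and header names here are assumptions, not a Kafka standard:

```python
def build_dlq_record(topic: str, key: bytes, value: bytes, error: Exception) -> dict:
    """Describe the record to republish to the dead letter queue."""
    return {
        'topic': f'{topic}.dlq',   # assumed naming convention
        'key': key,                # same key — preserves per-key grouping in the DLQ
        'value': value,            # original payload, unmodified
        'headers': [
            ('dlq.error.class', type(error).__name__.encode()),
            ('dlq.error.message', str(error).encode()),
            ('dlq.source.topic', topic.encode()),
        ],
    }

rec = build_dlq_record('order-events', b'order-42', b'{}', ValueError('bad schema'))
assert rec['topic'] == 'order-events.dlq'
```

Keeping the payload untouched and putting diagnostics in headers lets a replay tool re-publish DLQ messages to the source topic without transformation.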
Parallel processing without losing order
One thread for polling + worker pool by key:
// Preserve event order per order ID, parallelize across orders
static final int NUM_WORKERS = 12;
List<BlockingQueue<ConsumerRecord<String, OrderEvent>>> workerQueues = new ArrayList<>();
ExecutorService workers = Executors.newFixedThreadPool(NUM_WORKERS);

// Route records to a queue by partition — records with the same key
// always land in the same partition, hence the same queue
for (ConsumerRecord<String, OrderEvent> record : records) {
    int workerIndex = record.partition() % NUM_WORKERS;
    workerQueues.get(workerIndex).offer(record);
}
// Each worker drains its queue sequentially
// → order within a key is preserved
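The same pattern can be demonstrated without a broker: route records into per-worker queues keyed by partition and let each worker drain its queue sequentially, so records from one partition (and hence one key) are always processed in order. A self-contained Python illustration, with the record shape simplified to (partition, value):

```python
import queue
import threading

NUM_WORKERS = 4
worker_queues = [queue.Queue() for _ in range(NUM_WORKERS)]
processed = [[] for _ in range(NUM_WORKERS)]  # what each worker saw, in order

def worker(idx: int) -> None:
    while True:
        record = worker_queues[idx].get()
        if record is None:  # shutdown sentinel
            break
        processed[idx].append(record)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(NUM_WORKERS)]
for t in threads:
    t.start()

# Poll-loop stand-in: 8 partitions, 3 records each, routed by partition
records = [(p, seq) for seq in range(3) for p in range(8)]
for partition, value in records:
    worker_queues[partition % NUM_WORKERS].put((partition, value))

for q in worker_queues:
    q.put(None)  # stop workers
for t in threads:
    t.join()

# Within each worker, records from one partition keep their send order
for batch in processed:
    for p in {part for part, _ in batch}:
        seqs = [v for part, v in batch if part == p]
        assert seqs == sorted(seqs)
```

Because each queue is consumed by exactly one thread, FIFO order within a partition survives the parallelism; only records from different partitions are interleaved.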
Python client (confluent-kafka-python)
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json
# Producer
producer = Producer({
'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
'acks': 'all',
'enable.idempotence': True,
'compression.type': 'lz4',
'batch.size': 65536,
'linger.ms': 5,
'retries': 2147483647,
'delivery.timeout.ms': 120000,
})
def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer.produce(
    'user-events',
    key=str(user_id),
    value=json.dumps(event).encode('utf-8'),
    callback=delivery_report
)
producer.flush()  # wait for all pending messages and serve delivery callbacks
# Consumer
consumer = Consumer({
'bootstrap.servers': 'kafka-1:9092',
'group.id': 'web-app-consumer',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'max.poll.interval.ms': 300000,
'session.timeout.ms': 30000,
})
consumer.subscribe(['user-events'])
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # _PARTITION_EOF is only reported when enable.partition.eof=True
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise KafkaException(msg.error())
        event = json.loads(msg.value().decode('utf-8'))
        process_event(event)
        consumer.commit(asynchronous=False)
finally:
    consumer.close()
Timeline
Day 1 — design: define topics, message keys (for order guarantee), message format (Avro/JSON/Protobuf), consumer groups.
Day 2 — develop producers: serialization, acks/idempotence setup, integration with application business logic.
Day 3 — develop consumers: manual offset management, rebalance handling, Dead Letter Queue for failed messages.
Days 4–5 — testing: unit tests with EmbeddedKafka or Testcontainers, integration tests, load test with kafka-producer-perf-test, verify exactly-once semantics.
Day 6 — deployment, monitor consumer lag, set up alerts.