Kafka Streams setup for data stream processing

Our company is engaged in the development, support and maintenance of sites of any complexity. From simple one-page sites to large-scale cluster systems built on micro services. Experience of developers is confirmed by certificates from vendors.
Development and maintenance of all types of websites:
Informational websites or web applications
Business card websites, landing pages, corporate websites, online catalogs, quizzes, promo websites, blogs, news resources, informational portals, forums, aggregators
E-commerce websites or web applications
Online stores, B2B portals, marketplaces, online exchanges, cashback websites, exchanges, dropshipping platforms, product parsers
Business process management web applications
CRM systems, ERP systems, corporate portals, production management systems, information parsers
Electronic service websites or web applications
Classified ads platforms, online schools, online cinemas, website builders, portals for electronic services, video hosting platforms, thematic portals

These are just some of the technical types of websites we work with, and each of them can have its own specific features and functionality, as well as be customized to meet the specific needs and goals of the client.

Our competencies:
Development stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    822
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    847
  • image_website-sbh_0.png
    Website development for SBH Partners
    999
  • image_website-_0.png
    Website development for Red Pear
    451

Setting Up Kafka Streams for Data Stream Processing

Kafka Streams is a library for stream data processing that works as part of regular Java/Kotlin application. No separate processing cluster, no YARN or Mesos — just dependency in pom.xml and regular JVM process. This is principal difference from Flink and Spark Streaming where separate infrastructure needed.

Architecture Overview

Kafka Streams reads topics, transforms data, aggregates, joins and writes result back to Kafka or external systems through Kafka Connect. State stored locally in RocksDB and replicated to changelog topics in Kafka — this provides fault tolerance without external database.

Typical website tasks: aggregating user events in real time (DAU, funnels), enriching order stream with reference data, fraud detection by behavior patterns, deriving materialized views from event-sourced data.

Basic Topology

StreamsBuilder builder = new StreamsBuilder();

KStream<String, UserEvent> events = builder.stream(
    "user-events",
    Consumed.with(Serdes.String(), userEventSerde)
);

// Filtering + transformation
KStream<String, PageView> pageViews = events
    .filter((userId, event) -> event.getType().equals("PAGE_VIEW"))
    .mapValues(event -> PageView.from(event));

// Stream branching
Map<String, KStream<String, UserEvent>> branches = events.split(Named.as("branch-"))
    .branch((k, v) -> v.getType().equals("PURCHASE"), Branched.as("purchases"))
    .branch((k, v) -> v.getType().equals("CLICK"), Branched.as("clicks"))
    .defaultBranch(Branched.as("other"));

branches.get("branch-purchases").to("purchase-events");

Aggregations with Window Functions

Task — count page views per user in sliding 5-minute window:

KTable<Windowed<String>, Long> pageViewCounts = pageViews
    .groupByKey(Grouped.with(Serdes.String(), pageViewSerde))
    .windowedBy(
        SlidingWindows.ofTimeDifferenceAndGrace(
            Duration.ofMinutes(5),
            Duration.ofSeconds(30)  // grace period for late events
        )
    )
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("page-view-counts")
        .withValueSerde(Serdes.Long())
    );

// Publish results
pageViewCounts.toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        windowedKey.key(),
        new PageViewStat(windowedKey.key(), windowedKey.window().start(), count)
    ))
    .to("page-view-stats", Produced.with(Serdes.String(), pageViewStatSerde));

Window types:

  • TumblingWindows — fixed non-overlapping intervals (00:00–00:05, 00:05–00:10)
  • HoppingWindows — fixed size, sliding step
  • SlidingWindows — move on each event
  • SessionWindows — grouping by activity gaps

KTable and Materialized Views

KTable — changelog stream where each new record with same key overwrites previous. Used for reference data:

// Enriching event stream with user data
KTable<String, UserProfile> userProfiles = builder.table(
    "user-profiles",
    Materialized.as("user-profiles-store")
);

KStream<String, EnrichedEvent> enriched = events.join(
    userProfiles,
    (event, profile) -> EnrichedEvent.builder()
        .event(event)
        .userName(profile.getName())
        .userSegment(profile.getSegment())
        .build()
);

KTable join works synchronously — record from stream joined with current state of table at processing moment.

Serialization with Avro and Schema Registry

In production always need schema. Avro + Confluent Schema Registry — standard:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>7.5.0</version>
</dependency>
Map<String, Object> serdeConfig = Map.of(
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081",
    KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true
);

KafkaAvroSerde<UserEvent> userEventSerde = new KafkaAvroSerde<>();
userEventSerde.configure(serdeConfig, false);  // false = value serde

KStream<String, UserEvent> events = builder.stream(
    "user-events",
    Consumed.with(Serdes.String(), userEventSerde)
);

Schema Registry stores schema versions and ensures compatibility on evolution. Changing schema without registration breaks consumers.

Application Configuration

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "site-analytics-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

// Performance
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10MB
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

// Error handling
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);

// RocksDB state store
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Interactive Queries — Reading State

Kafka Streams allows reading state store without round-trip through Kafka:

ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "page-view-counts",
        QueryableStoreTypes.keyValueStore()
    )
);

// GET /api/stats/user/:userId
Long count = store.get(userId);

// For windowed store
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "page-view-counts-windowed",
        QueryableStoreTypes.windowStore()
    )
);
WindowStoreIterator<Long> iterator = windowStore.fetch(
    userId,
    Instant.now().minus(Duration.ofMinutes(5)),
    Instant.now()
);

With horizontal scaling state distributed between instances. Kafka Streams provides streamsMetadataForKey to determine which host holds needed key — foundation for REST proxy implementation to distributed state.

Monitoring and Metrics

Kafka Streams exports metrics through JMX. For Prometheus — JMX Exporter:

# docker-compose.yml
services:
  streams-app:
    image: my-streams-app:latest
    environment:
      JAVA_OPTS: >
        -javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent.jar=8080:/opt/jmx-exporter/config.yml
        -Xmx2g
        -XX:+UseG1GC

Key metrics:

  • kafka.streams:type=stream-metrics,client-id=* — application metrics
  • process-rate — records per second
  • process-latency-avg — average processing latency
  • commit-latency-avg — commit latency
  • rocksdb-block-cache-hit-ratio — RocksDB cache effectiveness

Dead Letter Queue

Deserialization and processing errors should be routed, not just logged:

// Custom error handler
public class DlqProductionExceptionHandler implements ProductionExceptionHandler {
    private final Producer<byte[], byte[]> dlqProducer;

    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record,
            Exception exception) {
        dlqProducer.send(new ProducerRecord<>("dead-letter-queue", record.key(), record.value()));
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}

Timeline

Setting up Kafka + Schema Registry + basic Streams topology — 3–4 days. Aggregations with windowed operations and Interactive Queries for REST API — another 3–5 days. Full pipeline with monitoring, DLQ, tests (TopologyTestDriver) and CI/CD — 2–3 weeks. Migrating existing analytics from batch processing to Streams — estimated separately after topics and schemas audit.