Kafka Streams & Stream Processing

Kafka Streams is a library, not a separate cluster, that turns your JVM app into a stream processor. You define a topology of operators over the KStream and KTable abstractions; the runtime handles partitioning, on-disk state, changelog recovery, and exactly-once (EOS) transactions. This chapter goes from time semantics to scaling tasks across pods.

developer senior Kafka 3.x

Stream processing fundamentals

Before operators and joins, time semantics decide whether your hourly count includes late-arriving mobile events or double-counts replays. Kafka Streams defaults to event time when timestamps exist in records.

Event time vs processing time vs ingestion time

Time type | Definition | Use when
Event time | When the event occurred in the real world (CREATE_TIME in record) | Aggregations, windowing: correct despite network delay
Processing time | When the stream processor handles the record | Low-latency approximations; non-critical metrics
Ingestion time | When the broker appended the record (LOG_APPEND_TIME) | Producer clocks are untrusted; broker becomes source of truth

Kafka Streams windowing uses event time from record timestamps by default (TimestampExtractor customizable). Late records outside grace are dropped for closed windows.

Windowing overview

  • Tumbling — fixed, non-overlapping buckets (e.g. counts per hour)
  • Hopping — fixed size, overlapping advance interval (5-min window every 1 min)
  • Sliding — event-time driven; window moves with each key’s activity
  • Session — gap-based; closes after inactivity period
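To make the tumbling and hopping definitions concrete, here is a minimal plain-Java sketch of how an event timestamp maps to window start times. It uses no Kafka classes; `WindowMath` and its method names are hypothetical, and it assumes non-negative epoch-millisecond timestamps with windows aligned to the epoch.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowMath {
    private WindowMath() {}

    /** Start of the single tumbling window that contains the timestamp. */
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Starts of every hopping window [start, start + size) that contains the timestamp. */
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest epoch-aligned start whose window still covers the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 7 * minute + 30_000L; // an event at 00:07:30
        // One 5-minute tumbling bucket: prints 300000 (00:05:00)
        System.out.println(tumblingStart(ts, 5 * minute));
        // Five overlapping 5-minute windows advancing by 1 minute:
        // prints [180000, 240000, 300000, 360000, 420000]
        System.out.println(hoppingStarts(ts, 5 * minute, minute));
    }
}
```

A record lands in exactly one tumbling window but in size / advance hopping windows, which is why hopping aggregations multiply state and output volume.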

Watermarks and late data

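Kafka Streams has no explicit Flink-style watermarks. Each window instead carries a grace period: after the window end, late events are still accepted for the grace duration, then the window closes permanently. Records that arrive after that are dropped and show up in the dropped-record metrics. There is no built-in side output for them, so capturing late records means checking lateness yourself upstream of the windowed aggregation and branching them off.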
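The grace rule can be modeled in a few lines of plain Java. This is a simplified sketch, not the library's code: `GraceModel` is a hypothetical name, stream time is taken to be the highest timestamp seen so far, windows are tumbling, and the exact boundary condition (strictly greater than) is an assumption.

```java
final class GraceModel {
    private final long sizeMs;
    private final long graceMs;
    private long streamTimeMs = Long.MIN_VALUE; // highest timestamp observed so far

    GraceModel(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /** Returns true if the record still lands in its window, false if it is dropped as late. */
    boolean offer(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowEnd = timestampMs - (timestampMs % sizeMs) + sizeMs;
        // The window stays open until stream time reaches windowEnd + grace.
        return windowEnd + graceMs > streamTimeMs;
    }

    public static void main(String[] args) {
        GraceModel model = new GraceModel(60_000L, 10_000L); // 1 min windows, 10 s grace
        System.out.println(model.offer(30_000L)); // true: on time
        System.out.println(model.offer(65_000L)); // true: advances stream time to 65 s
        System.out.println(model.offer(50_000L)); // true: late, but inside the grace period
        System.out.println(model.offer(75_000L)); // true: advances stream time to 75 s
        System.out.println(model.offer(55_000L)); // false: window [0, 60 s) closed at 70 s
    }
}
```

Note that lateness is judged against stream time, not the wall clock: if no new records arrive, stream time does not advance and windows do not close.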

Stateless vs stateful operations

Type | Examples | State store?
Stateless | filter, map, flatMap, branch, groupBy (regroups only; may repartition) | No
Stateful | aggregate, count, reduce, join | Yes: RocksDB + changelog

🎯 Interview Tip

“Why event time?” — processing time breaks when consumers lag or replay history; event time keeps window boundaries stable for billing and analytics. Mention grace for straggler events.

Kafka Streams architecture

Your application is a directed graph of processors. Sources read Kafka topics; stream processors transform; sinks write output topics. The runtime compiles DSL into this graph and assigns tasks to threads.

flowchart LR
  SRC[SourceProcessor\nread topic]
  SP[StreamProcessor\nmap/filter]
  SS[StateStore\nRocksDB]
  SNK[SinkProcessor\nwrite topic]
  SRC --> SP --> SNK
  SP <--> SS

KStream — event stream

Unbounded sequence where every record is an event—inserts, updates, deletes all appear as distinct facts. A price change produces a new event; prior event remains in the log (unless compacted away). Use for: raw clicks, orders, sensor readings, CDC events.

KTable — changelog table

Represents current state—latest value per key. Under the hood, consumes a topic (often compacted) and materializes key → value. Null value = tombstone = delete key from table. Use for: dimension tables, user profiles, inventory snapshots.

GlobalKTable — replicated lookup

Entire table replicated on every stream instance—no partition alignment required for joins. Loaded from a compacted topic with full consume on startup. Memory/disk cost scales with table size × instances. Use for: small reference data (country codes, product catalog < few GB).

Stream-table duality

Same compacted topic can be viewed as KStream (every changelog record as event) or KTable (collapsed to latest per key). Aggregations emit KTable; joining stream with table enriches events with dimension data without a database round-trip.
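The duality can be illustrated without Kafka at all. The sketch below replays a list of changelog records (the stream view) into a map (the table view); `Duality`, `Change`, and the sample keys are hypothetical, and a null value is treated as a tombstone as described for KTable above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class Duality {
    /** One changelog record; a null value is a tombstone. */
    static final class Change {
        final String key;
        final String value;

        Change(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Table view: replay the stream in order, keeping only the latest value per key. */
    static Map<String, String> toTable(List<Change> stream) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Change change : stream) {
            if (change.value == null) {
                table.remove(change.key);   // tombstone deletes the key
            } else {
                table.put(change.key, change.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> stream = new ArrayList<>();
        stream.add(new Change("sku-1", "9.99"));
        stream.add(new Change("sku-2", "4.50"));
        stream.add(new Change("sku-1", "8.99")); // price change: a new event, old one stays in the stream
        stream.add(new Change("sku-2", null));   // delete
        // prints: 4 events -> {sku-1=8.99}
        System.out.println(stream.size() + " events -> " + toTable(stream));
    }
}
```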

🔬 Under the Hood

KTable is backed by a changelog state store: every update writes to local RocksDB and an internal changelog topic for recovery. On restart, the task restores from the changelog, then resumes consuming the input topic from the checkpointed offset.

⚖️ Trade-off

GlobalKTable simplifies joins but does not scale to huge tables—every pod holds a full copy. Large dimensions: use partitioned KTable + co-partitioned join instead.

Topology building

The StreamsBuilder DSL compiles to a topology of nodes and edges. Repartition topics appear automatically when you groupBy on a key different from the source partition key.

Java — complete DSL topology
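import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

// Assumes the domain types (OrderEvent, Customer, ...), their serdes, and props are defined elsewhere.
StreamsBuilder builder = new StreamsBuilder();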

// Sources
KStream<String, OrderEvent> orders = builder.stream("orders",
    Consumed.with(Serdes.String(), orderSerde));
KTable<String, Customer> customers = builder.table("customers",
    Consumed.with(Serdes.String(), customerSerde));

// Stateless
// LineItem is assumed to be the element type returned by OrderEvent.lineItems()
KStream<String, LineItem> highValue = orders
    .filter((k, v) -> v.amount() > 1000)
    .mapValues(v -> v.withValidated(true))
    .flatMapValues(v -> v.lineItems());  // 1 order → N line items

// Stateful aggregation (tumbling window)
orders
    .groupByKey(Grouped.with(Serdes.String(), orderSerde))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        OrderStats::empty,
        (key, order, agg) -> agg.add(order),
        Materialized.<String, OrderStats, WindowStore<Bytes, byte[]>>as("order-stats-store")
            .withValueSerde(orderStatsSerde))
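    .toStream()
    // Dropping the window from the key: every window of a key is written under the same key
    .map((windowedKey, stats) -> KeyValue.pair(windowedKey.key(), stats))
    .to("order-stats-5min", Produced.with(Serdes.String(), orderStatsSerde));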

// KStream-KTable join (enrichment, no window)
KStream<String, EnrichedOrder> enriched = orders.join(
    customers,
    (order, customer) -> new EnrichedOrder(order, customer),
    Joined.with(Serdes.String(), orderSerde, customerSerde));

// KStream-KStream windowed join
KStream<String, PaymentEvent> payments = builder.stream("payments",
    Consumed.with(Serdes.String(), paymentSerde));
KStream<String, MatchedPayment> matched = orders.join(
    payments,
    (order, payment) -> new MatchedPayment(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde));

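// Running count per key, queryable under the store name "order-counts"
KTable<String, Long> orderCounts = orders
    .groupByKey(Grouped.with(Serdes.String(), orderSerde))
    .count(Materialized.as("order-counts"));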

// Sink
enriched.to("enriched-orders", Produced.with(Serdes.String(), enrichedSerde));

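KafkaStreams streams = new KafkaStreams(builder.build(), props);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));  // close cleanly on JVM shutdown
streams.start();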

DSL operators reference

Operator | Input → Output | Notes
filter / filterNot | KStream → KStream | Stateless drop
map / mapValues | KStream → KStream | Transform; map may change key → repartition
flatMap / flatMapValues | KStream → KStream | 1→N records
groupBy / groupByKey | KStream → KGroupedStream | Triggers repartition if key changed
windowedBy | KGroupedStream → TimeWindowedKStream / SessionWindowedKStream | Required before a windowed aggregation (joins take JoinWindows instead)
aggregate / reduce / count | Grouped → KTable | Stateful; creates store + changelog
join | Stream-Table, Stream-Stream, Table-Table | Co-partitioning rules apply

Join types

Join | Window required? | Partition requirement
KStream-KStream | Yes (JoinWindows) | Same key, co-partitioned topics
KStream-KTable | No | Same key, co-partitioned
KTable-KTable | No | Co-partitioned
KStream-GlobalKTable | No | No co-partitioning; table on all nodes

Processor API and DSL bridge

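Low-level Processor API: implement Processor, use its ProcessorContext to schedule punctuators (wall-clock or stream-time callbacks), and access state stores directly. Bridge from DSL: transform(), process(), transformValues() for custom logic that does not fit map/filter. From Kafka 3.3 onward, transform() and transformValues() are deprecated in favor of process() and processValues().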

Java — transform with Processor API
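import java.time.Duration;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Assumes an SLF4J-style `log` field is in scope.
KStream<String, OrderEvent> stamped = orders.transformValues(
    () -> new ValueTransformerWithKey<String, OrderEvent, OrderEvent>() {
      private ProcessorContext context;

      @Override
      public void init(ProcessorContext context) {
        this.context = context;
        // Fires as stream time advances, at most once per minute of stream time
        context.schedule(Duration.ofMinutes(1), PunctuationType.STREAM_TIME,
            ts -> log.info("Stream time advanced to {}", ts));
      }

      @Override
      public OrderEvent transform(String key, OrderEvent value) {
        // Stamp the value with the timestamp of the record being processed
        return value.withProcessedAt(context.timestamp());
      }

      @Override public void close() {}
    }
    // No state store names are passed: this transformer does not access a store,
    // and naming a store that was never added fails when the topology is built.
);

⚠️ Pitfall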

Implicit repartition topics appear on groupBy key changes—internal topics multiply. Inspect topology with topology.describe() before production; set replication.factor on internal topics.

State stores

Aggregations and joins need memory or disk to remember prior keys. Kafka Streams persists state locally and mirrors changes to changelog topics so tasks can move between instances without losing counts.

Why local state

  • Aggregations — running sum/count per key
  • Joins — KTable materialization holds probe side
  • Deduplication — custom processor tracks seen IDs in store
  • Window stores — retain per-window segments until expiry

RocksDB (default)

Disk-backed LSM tree per task under state.dir. Survives JVM restarts; bounded by disk not heap. Tuned for write-heavy changelog application. Default for Materialized.as().

In-memory store

Stores.inMemoryKeyValueStore: faster, but contents are lost on restart and must be restored from the changelog. Use for small, rebuildable state or testing.

Changelog topics

Every store with changelogging enabled (the default) gets a topic named {application.id}-{store-name}-changelog. Each state update produces a changelog record; compaction retains the latest per key. On task migration, the new host replays the changelog to rebuild RocksDB.
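Because internal topic names are derived from application.id, it helps to see the naming rule spelled out. The helper below is a hypothetical illustration in plain Java, not a Kafka API; the repartition suffix follows the same {application.id}-{name}-repartition pattern.

```java
final class InternalTopics {
    private InternalTopics() {}

    /** Changelog topic that backs a state store. */
    static String changelog(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /** Repartition topic created when a key-changing operation precedes a stateful one. */
    static String repartition(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        // prints: payment-aggregator-order-counts-changelog
        System.out.println(changelog("payment-aggregator", "order-counts"));
        // A different application.id yields a different topic, so existing state is not found.
        // prints: payment-aggregator-v2-order-counts-changelog
        System.out.println(changelog("payment-aggregator-v2", "order-counts"));
    }
}
```

Explicit store names (as with Materialized.as) keep these topic names stable when the topology is edited.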

sequenceDiagram
  participant T as Stream Task
  participant R as RocksDB
  participant CL as Changelog Topic
  participant B as Broker
  T->>R: write key=K value=V
  T->>CL: replicate update
  Note over T: Task moves to new instance
  T->>CL: restore from offset
  T->>R: rebuild state

Standby replicas

num.standby.replicas=1 — standby tasks on other instances consume changelog passively. On failover, promoted standby has warm state—faster recovery than cold replay. Cost: extra CPU/network per replica.

Interactive queries

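Query a local ReadOnlyKeyValueStore via streams.store(StoreQueryParameters). For keys hosted on another instance, set application.server on each instance, discover peers with KafkaStreams.metadataForAllStreamsClients() (or queryMetadataForKey() for a single key), and make an RPC to the peer’s hostInfo for the given storeName. Build a REST facade (Spring) for “get current count for user X” without a separate database.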

Java — interactive query
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("order-counts",
        QueryableStoreTypes.keyValueStore()));

Long count = store.get("customer-42");

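// Metadata for routing to the instance that hosts the key (requires application.server to be set).
// metadataForKey() was removed in 3.0; queryMetadataForKey() is its replacement.
KeyQueryMetadata meta = streams.queryMetadataForKey("order-counts",
    "customer-42", Serdes.String().serializer());
HostInfo owner = meta.activeHost();

💡 Pro Tip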

Size state.dir disk for changelog replay headroom—RocksDB compaction plus restore spikes I/O during redeploys.

Windowing deep dive

Window choice determines whether your dashboard updates every hour on the hour, every minute with overlap, or only when a user session ends.

Window type | Size / advance | Example
Tumbling | Fixed size, advance = size (no overlap) | Hourly revenue totals
Hopping | Fixed size, advance < size (overlap) | 5-min average computed every 1 min
Sliding | Event-time, per-key record triggers | Last-30-seconds activity per click
Session | Gap-based (inactivityGap) | User session until 30 min idle
Java — window types
// Tumbling — 1 hour, 10 min grace for late events
TimeWindows tumbling = TimeWindows
    .ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(10));

// Hopping: 5 min window, 1 min grace, advance every 1 min
TimeWindows hopping = TimeWindows
    .ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
    .advanceBy(Duration.ofMinutes(1));

// Session — close after 30 min inactivity
SessionWindows session = SessionWindows
    .ofInactivityGapAndGrace(Duration.ofMinutes(30), Duration.ofMinutes(5));

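// clicks is assumed to be a KStream with String keys defined elsewhere
clicks
    .groupByKey()
    .windowedBy(tumbling)
    .count(Materialized.as("hourly-clicks"))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    // Flatten the Windowed<String> key so plain String/Long serdes can write it
    .map((windowedKey, count) ->
        KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
    .to("hourly-click-counts", Produced.with(Serdes.String(), Serdes.Long()));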
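Session windows are the least intuitive of the four types, so here is a small plain-Java model of how one key's events group into sessions. `SessionModel` is a hypothetical name; the sketch sorts timestamps first (the real library merges sessions incrementally as records arrive) and assumes that events exactly one gap apart still share a session.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SessionModel {
    private SessionModel() {}

    /** Groups one key's event timestamps into [start, end] sessions split by the inactivity gap. */
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts - last[1] <= gapMs) {
                last[1] = ts;                       // within the gap: extend the open session
            } else {
                result.add(new long[] {ts, ts});    // gap exceeded: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] events = {0, 10 * minute, 25 * minute, 70 * minute, 80 * minute};
        for (long[] s : sessions(events, 30 * minute)) {
            System.out.println(s[0] / minute + " min .. " + s[1] / minute + " min");
        }
        // prints:
        // 0 min .. 25 min
        // 70 min .. 80 min
    }
}
```

Unlike tumbling and hopping windows, session boundaries depend on the data, so a late record can merge two previously separate sessions.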

Grace period

TimeWindows.ofSizeAndGrace / SessionWindows.ofInactivityGapAndGrace: after the window end time, the grace period still accepts late records into the same window bucket. With no grace (ofSizeWithNoGrace), records for a window are dropped as soon as stream time passes the window end.

Suppress — emit only final results

suppress(Suppressed.untilWindowCloses(...)) buffers updates until window closes (including grace)—downstream receives one final count per window, not every intermediate update. Reduces output topic volume for dashboards.
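The effect of suppression can be modeled for a single key in plain Java. This is a sketch under simplifying assumptions, not the library's implementation: `SuppressModel` is a hypothetical name, windows are tumbling, stream time is the highest timestamp seen, and a window is treated as closed once stream time reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SuppressModel {
    private final long sizeMs;
    private final long graceMs;
    private final TreeMap<Long, Long> buffered = new TreeMap<>(); // window start -> running count
    private long streamTimeMs = Long.MIN_VALUE;

    SuppressModel(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /** Counts one record and returns "windowStart=count" for every window that just closed. */
    List<String> onRecord(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long start = timestampMs - (timestampMs % sizeMs);
        if (start + sizeMs + graceMs > streamTimeMs) {
            buffered.merge(start, 1L, Long::sum);   // window still open: update, emit nothing yet
        }
        List<String> finals = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = buffered.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (e.getKey() + sizeMs + graceMs <= streamTimeMs) {
                finals.add(e.getKey() + "=" + e.getValue()); // closed: emit once, then forget
                it.remove();
            }
        }
        return finals;
    }

    public static void main(String[] args) {
        SuppressModel model = new SuppressModel(60_000L, 10_000L);
        System.out.println(model.onRecord(10_000L)); // []
        System.out.println(model.onRecord(65_000L)); // []
        System.out.println(model.onRecord(50_000L)); // [] (late but within grace, still buffered)
        System.out.println(model.onRecord(70_000L)); // [0=2] (first window is final)
    }
}
```

The buffer holds one entry per open window, which is what makes its size grow with key cardinality in a real application.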

⚖️ Trade-off

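An unbounded() suppress buffer holds every open window's keys in memory until the window closes, which is a memory risk at high cardinality. Use a bounded strict buffer such as BufferConfig.maxBytes(...).shutDownWhenFull(), or drop suppress and emit incremental updates.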

Kafka Streams configuration

application.id is the identity of your app in the cluster—it names consumer groups, internal topics, and EOS transactional ids. Changing it creates a new application, not a new version.

Property | Default | Production | Impact
application.id | required | Stable per logical app (e.g. order-aggregator) | Consumer group + changelog/repartition topic prefix
processing.guarantee | at_least_once | exactly_once_v2 when needed | EOS transactions + read_committed internals
num.stream.threads | 1 | ≤ partitions on input topic | Parallel tasks per JVM instance
cache.max.bytes.buffering | 10485760 (10 MB) | 10–100 MB | Record cache dedupes agg updates before downstream
commit.interval.ms | 30000 (100 under exactly_once_v2) | 30000–60000 for at_least_once; under EOS keep it at or below the producer transaction.timeout.ms | Checkpoint state + commit offsets frequency
replication.factor | -1 (broker default) | 3 | Changelog + repartition internal topics
state.dir | /tmp/kafka-streams | Persistent volume per pod | RocksDB local storage path
num.standby.replicas | 0 | 1 for critical apps | Warm standby for failover
Properties — production Streams app
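application.id=payment-aggregator
processing.guarantee=exactly_once_v2
num.stream.threads=4
replication.factor=3
# commit.interval.ms is left unset: under exactly_once_v2 it defaults to 100 ms and must not
# exceed the producer transaction.timeout.ms (10 s by default in Streams), so 30000 would be rejected
cache.max.bytes.buffering=52428800
state.dir=/var/kafka-streams/state
num.standby.replicas=1
acceptable.recovery.lag=10000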
application.yml — Spring Kafka Streams
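spring.kafka.streams:
  application-id: payment-aggregator
  properties:
    processing.guarantee: exactly_once_v2
    num.stream.threads: 4
    replication.factor: 3
    # commit.interval.ms left unset: the exactly_once_v2 default is 100 ms and the value
    # must not exceed the producer transaction.timeout.ms (10 s by default in Streams)

⚙️ Config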

Never change application.id for a “version upgrade”—you spawn a duplicate consumer group consuming from scratch. Use rolling deploy of same id; change id only for genuinely new app lineage.

Error handling in Streams

Bad bytes, poison messages, and uncaught exceptions in processors each have a dedicated hook. The default fail-fast behavior will take down your app unless you configure recovery.

DeserializationExceptionHandler

When input bytes cannot deserialize:

  • LogAndFailExceptionHandler: default; throws, thread dies
  • LogAndContinueExceptionHandler: skip the record and continue (the skipped record never reaches downstream)
  • Custom: route raw bytes to a DLT by implementing the DeserializationExceptionHandler interface

ProductionExceptionHandler

When produce to sink/changelog fails:

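  • DefaultProductionExceptionHandler: fail
  • Custom ProductionExceptionHandler returning CONTINUE: skip the failed produce (Kafka Streams 3.x appears to ship only the default handler, so a log-and-continue variant is one you write yourself)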

StreamsUncaughtExceptionHandler

Any uncaught exception in a stream thread:

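Action | Behavior
REPLACE_THREAD | Replace the failed thread; its tasks restore state from the changelog (available since 2.8)
SHUTDOWN_CLIENT | Stop this KafkaStreams instance (the default when no handler is set)
SHUTDOWN_APPLICATION | Signal every instance that shares the application.id to shut down; the JVM itself is not exited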
Java — error handler config
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
    DefaultProductionExceptionHandler.class);

// Register before streams.start(); the handler cannot be set once the client is running
streams.setUncaughtExceptionHandler(exception -> {
  log.error("Stream thread failed", exception);
  return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

Dead letter topic pattern

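Branch poison records with split(), which replaces the branch() method deprecated in 2.8: orders.split().branch((k, v) -> isValid(v), Branched.as("valid")).defaultBranch(Branched.as("invalid")). Send the invalid branch to orders-dlt with the original headers plus an error reason. Prefer explicit validation over silent LogAndContinue for auditability.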

⚠️ Pitfall

LogAndContinue on deserialization silently drops records—lag looks healthy while data is missing. Monitor dropped-record metrics and alert on DLT volume.

Scaling Kafka Streams

Horizontal scale = more JVM instances with the same application.id. The Streams partition assignor maps tasks to threads and instances—bounded by input partition count.

Deploy multiple instances

Kubernetes: Deployment with N replicas, identical config, shared application.id. On startup, instances join consumer group backed by application id; tasks distributed across all threads on all pods.

flowchart TB
  subgraph pod1["Pod 1 — 2 threads"]
    T1[Task 0 P0]
    T2[Task 1 P1]
  end
  subgraph pod2["Pod 2 — 2 threads"]
    T3[Task 2 P2]
    T4[Task 3 P3]
  end
  TOPIC[Input topic 4 partitions]
  TOPIC --> T1
  TOPIC --> T2
  TOPIC --> T3
  TOPIC --> T4

StreamsPartitionAssignor

Kafka Streams plugs its own assignor, StreamsPartitionAssignor, into the consumer group protocol. It assigns tasks, not just partitions: each task is a unit of work for one sub-topology slice. Sticky assignment minimizes state migration on scale-out.

Task model

  • One stream task per input partition (per source sub-topology)
  • Parallelism ceiling = max partitions across source topics (for that topology)
  • Threads beyond the task count sit idle: num.stream.threads × instances helps only up to the total number of tasks
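The task arithmetic above is easy to check before resizing a deployment. The following plain-Java sketch uses hypothetical names (`TaskMath`, `taskCount`, `idleThreads`) and models a single sub-topology only; standby tasks are ignored.

```java
final class TaskMath {
    private TaskMath() {}

    /** Tasks for one sub-topology: the largest partition count among its source topics. */
    static int taskCount(int... sourcePartitionCounts) {
        int max = 0;
        for (int partitions : sourcePartitionCounts) {
            max = Math.max(max, partitions);
        }
        return max;
    }

    /** Threads that end up without an active task. */
    static int idleThreads(int tasks, int instances, int threadsPerInstance) {
        return Math.max(0, instances * threadsPerInstance - tasks);
    }

    public static void main(String[] args) {
        int tasks = taskCount(4);                      // one input topic with 4 partitions
        System.out.println(idleThreads(tasks, 2, 2));  // prints 0: 2 pods x 2 threads, all busy
        System.out.println(idleThreads(tasks, 3, 2));  // prints 2: the third pod adds no throughput
    }
}
```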

Sub-topology optimization

Topology splits into sub-topologies connected by repartition edges. Tasks within a sub-topology that do not require repartition may be merged—fewer state stores, less overhead. Inspect with topology.describe().subtopologies().

📦 Real World

LinkedIn runs thousands of Kafka Streams applications—scale by partition planning upfront and num.standby.replicas for sub-minute recovery. Confluent ksqlDB is Kafka Streams under a SQL layer—same task/partition limits apply.

💡 Pro Tip

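Before increasing pods, check that the source topics have enough partitions. Once every task already has a thread, new instances only host standby replicas consuming the changelog (or sit idle) and do not reduce processing load. Adding partitions to an existing topic changes the key-to-partition mapping that stateful apps depend on, so plan partition counts up front where possible.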