Kafka Streams & Stream Processing
Kafka Streams is a library—not a separate cluster—that turns your JVM app into a stream processor. You define a topology of operators over KStream and KTable abstractions; the runtime handles partitioning, state on disk, changelog recovery, and EOS transactions. This chapter goes from time semantics to scaling tasks across pods.
Stream processing fundamentals
Before operators and joins, time semantics decide whether your hourly count includes late-arriving mobile events or double-counts replays. Kafka Streams defaults to event time when timestamps exist in records.
Event time vs processing time vs ingestion time
| Time type | Definition | Use when |
|---|---|---|
| Event time | When the event occurred in the real world (CREATE_TIME in record) | Aggregations, windowing: correct despite network delay |
| Processing time | When the stream processor handles the record | Low-latency approximations; non-critical metrics |
| Ingestion time | When broker appended record (LOG_APPEND_TIME) | When producer clocks untrusted; broker becomes source of truth |
Kafka Streams windowing uses event time from record timestamps by default (TimestampExtractor customizable). Late records outside grace are dropped for closed windows.
Windowing overview
- Tumbling — fixed, non-overlapping buckets (e.g. counts per hour)
- Hopping — fixed size, overlapping advance interval (5-min window every 1 min)
- Sliding — event-time driven; window moves with each key’s activity
- Session — gap-based; closes after inactivity period
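To make the tumbling and hopping definitions concrete, here is a minimal plain-Java sketch of how a record timestamp maps to window start times. It does not use the Kafka Streams API; the names `WindowMath`, `tumblingStart`, and `hoppingStarts` are illustrative, and the arithmetic assumes epoch-aligned windows and non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: not the Kafka Streams implementation.
final class WindowMath {

    // A tumbling window is identified by its start: the timestamp rounded
    // down to a multiple of the window size.
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // A hopping window of sizeMs starts every advanceMs, so one timestamp
    // falls into several overlapping windows. Returns their start times.
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long earliest = Math.max(0, timestampMs - sizeMs + advanceMs);
        long start = (earliest / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // A record at 2 min 5 s belongs to the 1-minute tumbling window starting at 2 min.
        System.out.println(tumblingStart(125_000L, minute));
        // A record at minute 7 belongs to five 5-minute windows hopping every minute.
        System.out.println(hoppingStarts(7 * minute, 5 * minute, minute));
    }
}
```

With a 5-minute size and a 1-minute advance, every record is counted in up to five windows, which is why hopping windows multiply state and output volume compared with tumbling windows.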
Watermarks and late data
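Kafka Streams does not use Flink-style watermarks explicitly. Instead, each window has a grace period: after the window end, late events are still accepted for the grace duration, and then the window closes permanently. Records that arrive after grace are dropped and show up in the dropped-records metrics. The DSL has no built-in side output for them, so capturing late records requires custom logic upstream of the windowed aggregation.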
Event time: |---- Window [10:00 - 11:00) ----|.... grace 5m ....| CLOSED
Late event at 11:03 → still counted (within grace)
Late event at 11:07 → dropped
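The accept-or-drop decision in the timeline above can be modeled in a few lines of plain Java. This is a simplified sketch, not the Kafka Streams API: `GraceCheck`, `accepts`, and `atMs` are illustrative names, and "11:03" is read as the stream time (the highest record timestamp seen so far) at the moment the late record arrives.

```java
import java.time.Duration;

// Simplified model of the grace-period rule; not the Kafka Streams API.
final class GraceCheck {

    // A window stays open while stream time is below windowEnd + grace.
    // Stream time advances with record timestamps, not with the wall clock.
    static boolean accepts(long windowEndMs, long graceMs, long streamTimeMs) {
        return streamTimeMs < windowEndMs + graceMs;
    }

    // Helper: milliseconds since midnight for a given hour and minute.
    static long atMs(int hour, int minute) {
        return Duration.ofHours(hour).plusMinutes(minute).toMillis();
    }

    public static void main(String[] args) {
        long windowEnd = atMs(11, 0);                   // window [10:00, 11:00)
        long grace = Duration.ofMinutes(5).toMillis();  // 5 minute grace
        System.out.println(accepts(windowEnd, grace, atMs(11, 3))); // true
        System.out.println(accepts(windowEnd, grace, atMs(11, 7))); // false
    }
}
```

Because the check uses stream time, a quiet partition does not close windows by itself; the window closes only once a record with a later timestamp arrives.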
Stateless vs stateful operations
| Type | Examples | State store? |
|---|---|---|
| Stateless | filter, map, flatMap, branch | No |
| Stateful | aggregate, count, reduce, join (aggregations follow groupBy / groupByKey, which itself only regroups) | Yes (RocksDB + changelog) |
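The difference between the two rows can be seen in a plain-Java sketch: a stateless operator looks only at the current record, while a stateful one needs a store that outlives each record. The class and method names below are illustrative and the `HashMap` merely stands in for a state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative contrast between stateless and stateful processing.
final class StatelessVsStateful {

    // Stateless: the decision depends only on the record being processed.
    static List<Integer> filterHighValue(List<Integer> amounts, int threshold) {
        List<Integer> out = new ArrayList<>();
        for (int amount : amounts) {
            if (amount > threshold) {
                out.add(amount);
            }
        }
        return out;
    }

    // Stateful: the result for a key depends on every earlier record with
    // that key, so the running counts must be remembered between records.
    static Map<String, Long> countByKey(List<String> keys) {
        Map<String, Long> store = new HashMap<>();
        for (String key : keys) {
            store.merge(key, 1L, Long::sum);
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println(filterHighValue(List.of(500, 1500, 2000), 1000));
        System.out.println(countByKey(List.of("a", "b", "a")));
    }
}
```

If the process holding `store` dies, the counts are gone, which is the gap that local persistence plus a changelog topic closes.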
“Why event time?” — processing time breaks when consumers lag or replay history; event time keeps window boundaries stable for billing and analytics. Mention grace for straggler events.
Kafka Streams architecture
Your application is a directed graph of processors. Sources read Kafka topics; stream processors transform; sinks write output topics. The runtime compiles DSL into this graph and assigns tasks to threads.
flowchart LR
SRC[SourceProcessor\nread topic]
SP[StreamProcessor\nmap/filter]
SS[StateStore\nRocksDB]
SNK[SinkProcessor\nwrite topic]
SRC --> SP --> SNK
SP <--> SS
KStream — event stream
Unbounded sequence where every record is an event—inserts, updates, deletes all appear as distinct facts. A price change produces a new event; prior event remains in the log (unless compacted away). Use for: raw clicks, orders, sensor readings, CDC events.
KTable — changelog table
Represents current state—latest value per key. Under the hood, consumes a topic (often compacted) and materializes key → value. Null value = tombstone = delete key from table. Use for: dimension tables, user profiles, inventory snapshots.
GlobalKTable — replicated lookup
Entire table replicated on every stream instance—no partition alignment required for joins. Loaded from a compacted topic with full consume on startup. Memory/disk cost scales with table size × instances. Use for: small reference data (country codes, product catalog < few GB).
Stream-table duality
Same compacted topic can be viewed as KStream (every changelog record as event) or KTable (collapsed to latest per key). Aggregations emit KTable; joining stream with table enriches events with dimension data without a database round-trip.
Compacted topic user-profiles:
key=user1, value={tier: GOLD} → KStream: event | KTable: user1=GOLD
key=user1, value={tier: PLAT} → KStream: event | KTable: user1=PLAT (replaces)
key=user1, value=null → tombstone | KTable: user1 deleted
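The table view of that record sequence is just a replay into a map, with a null value removing the key. A minimal plain-Java sketch, assuming records are represented as `{key, value}` string pairs (the `TableView` class is illustrative, not a Kafka type):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of stream-table duality: the list is the stream view,
// the map built from it is the table view.
final class TableView {

    // Each record is {key, value}; a null value is a tombstone.
    static Map<String, String> materialize(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            String key = record[0];
            String value = record[1];
            if (value == null) {
                table.remove(key);      // tombstone deletes the key
            } else {
                table.put(key, value);  // later value replaces the earlier one
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"user1", "GOLD"});
        records.add(new String[] {"user1", "PLAT"});
        System.out.println(materialize(records)); // {user1=PLAT}
        records.add(new String[] {"user1", null});
        System.out.println(materialize(records)); // {}
    }
}
```

The stream view still contains all three records; only the table view collapses them.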
KTable is backed by a changelog state store: every update writes to local RocksDB and to an internal changelog topic for recovery. On restart, the task restores the store from the changelog and then resumes the input topic from its last committed offset.
GlobalKTable simplifies joins but does not scale to huge tables—every pod holds a full copy. Large dimensions: use partitioned KTable + co-partitioned join instead.
Topology building
The StreamsBuilder DSL compiles to a topology of nodes and edges. Repartition topics appear automatically when you groupBy on a key different from the source partition key.
StreamsBuilder builder = new StreamsBuilder();

// Sources
KStream<String, OrderEvent> orders = builder.stream("orders",
    Consumed.with(Serdes.String(), orderSerde));
KTable<String, Customer> customers = builder.table("customers",
    Consumed.with(Serdes.String(), customerSerde));

// Stateless. flatMapValues changes the value type, so the result is a stream of
// line items (LineItem is assumed to be the element type returned by lineItems()).
KStream<String, LineItem> highValueItems = orders
    .filter((k, v) -> v.amount() > 1000)
    .mapValues(v -> v.withValidated(true))
    .flatMapValues(v -> v.lineItems()); // 1 order → N line items

// Stateful aggregation (tumbling window)
orders
    .groupByKey(Grouped.with(Serdes.String(), orderSerde))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        OrderStats::empty,
        (key, order, agg) -> agg.add(order),
        Materialized.<String, OrderStats, WindowStore<Bytes, byte[]>>as("order-stats-store")
            .withValueSerde(orderStatsSerde))
    .toStream()
    // Drops the window from the key; keep it if consumers need the window bounds
    .map((windowedKey, stats) -> KeyValue.pair(windowedKey.key(), stats))
    .to("order-stats-5min", Produced.with(Serdes.String(), orderStatsSerde));

// KStream-KTable join (enrichment, no window)
KStream<String, EnrichedOrder> enriched = orders.join(
    customers,
    (order, customer) -> new EnrichedOrder(order, customer),
    Joined.with(Serdes.String(), orderSerde, customerSerde));

// KStream-KStream windowed join
KStream<String, PaymentEvent> payments = builder.stream("payments",
    Consumed.with(Serdes.String(), paymentSerde));
KStream<String, MatchedPayment> matched = orders.join(
    payments,
    (order, payment) -> new MatchedPayment(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde));

// Non-windowed count, materialized as a queryable store
KTable<String, Long> orderCounts = orders
    .groupByKey(Grouped.with(Serdes.String(), orderSerde))
    .count(Materialized.as("order-counts"));

// Sink
enriched.to("enriched-orders", Produced.with(Serdes.String(), enrichedSerde));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
DSL operators reference
| Operator | Input → Output | Notes |
|---|---|---|
| filter / filterNot | KStream → KStream | Stateless drop |
| map / mapValues | KStream → KStream | Transform; map may change the key and force a repartition downstream |
| flatMap / flatMapValues | KStream → KStream | 1→N records |
| groupBy / groupByKey | KStream → KGroupedStream | Triggers repartition if key changed |
| windowedBy | KGroupedStream → TimeWindowedKStream / SessionWindowedKStream | Required before windowed aggregations; stream-stream joins use JoinWindows instead |
| aggregate / reduce / count | Grouped → KTable | Stateful; creates store + changelog |
| join | Stream-Table, Stream-Stream, Table-Table | Co-partitioning rules apply |
Join types
| Join | Window required? | Partition requirement |
|---|---|---|
| KStream-KStream | Yes (JoinWindows) | Same key, co-partitioned topics |
| KStream-KTable | No | Same key, co-partitioned |
| KTable-KTable | No | Co-partitioned |
| KStream-GlobalKTable | No | No co-partitioning—table on all nodes |
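Why co-partitioning matters can be shown with a small plain-Java sketch: a join task only sees matching keys if both topics place a key on the same partition number. The partitioner below is a stand-in (Kafka's default hashes the serialized key with murmur2; `String.hashCode()` is used here only to keep the sketch dependency-free), and the class and method names are illustrative.

```java
// Illustrative model of co-partitioning; the hash function is a stand-in.
final class CoPartitioning {

    // Maps a key to a partition number for a topic with numPartitions partitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True when the same key lands on the same partition number in both topics,
    // which is what a partition-local join relies on.
    static boolean aligned(String key, int leftPartitions, int rightPartitions) {
        return partitionFor(key, leftPartitions) == partitionFor(key, rightPartitions);
    }

    public static void main(String[] args) {
        // Same partition count on both sides: the key always lines up.
        System.out.println(aligned("d", 6, 6)); // true
        // Different counts: "d" hashes to 100, so partition 0 of 4 but partition 4 of 6.
        System.out.println(aligned("d", 4, 6)); // false
    }
}
```

Equal partition counts alone are not sufficient in practice: both topics also need the same keying and the same partitioning strategy on the producer side.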
Processor API and DSL bridge
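Low-level Processor API: implement Processor, use the ProcessorContext passed to init(), schedule punctuators (wall-clock or stream-time callbacks), and access state stores directly. Bridge from DSL: transform(), process(), transformValues() for custom logic that does not fit map/filter. Newer releases (3.3 onward) deprecate transform() and transformValues() in favor of process() and processValues(), so check which variant your version supports.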
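orders.transformValues(
    () -> new ValueTransformerWithKey<String, OrderEvent, OrderEvent>() {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            // Fires each time stream time (driven by record timestamps) advances by 1 minute
            context.schedule(Duration.ofMinutes(1), PunctuationType.STREAM_TIME,
                ts -> log.info("Stream time advanced to {}", ts));
        }

        @Override
        public OrderEvent transform(String key, OrderEvent value) {
            // Stamp the value with the timestamp of the record being processed
            return value.withProcessedAt(context.timestamp());
        }

        @Override
        public void close() {}
    }
    // No state store names are passed because this transformer uses none.
    // Any name listed here must match a store already added to the topology.
);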
Implicit repartition topics appear on groupBy key changes—internal topics multiply. Inspect topology with topology.describe() before production; set replication.factor on internal topics.
State stores
Aggregations and joins need memory or disk to remember prior keys. Kafka Streams persists state locally and mirrors changes to changelog topics so tasks can move between instances without losing counts.
Why local state
- Aggregations — running sum/count per key
- Joins — KTable materialization holds probe side
- Deduplication — custom processor tracks seen IDs in store
- Window stores — retain per-window segments until expiry
RocksDB (default)
Disk-backed LSM tree per task under state.dir. Survives JVM restarts; bounded by disk not heap. Tuned for write-heavy changelog application. Default for Materialized.as().
In-memory store
Stores.inMemoryKeyValueStore: faster, but nothing is kept on local disk, so after every restart the state is rebuilt from the changelog. Use for small, rebuildable state or testing.
Changelog topics
Every store with changelogging enabled (the default, including in-memory stores) gets {application.id}-{store-name}-changelog. Each state update produces a changelog record; compaction retains the latest value per key. On task migration, the new host replays the changelog to rebuild its local store.
sequenceDiagram
participant T as Stream Task
participant R as RocksDB
participant CL as Changelog Topic
participant B as Broker
T->>R: write key=K value=V
T->>CL: replicate update
Note over T: Task moves to new instance
T->>CL: restore from offset
T->>R: rebuild state
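The write-then-restore cycle in the diagram can be modeled without Kafka at all. In this plain-Java sketch the `ArrayList` stands in for the changelog topic and the `HashMap` for the local RocksDB store; `ChangelogModel` and its methods are illustrative names, and only the topic naming pattern comes from the text above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of local state mirrored to a changelog.
final class ChangelogModel {

    // Naming pattern for a store's changelog topic.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Stand-in for the changelog topic: an append-only list of {key, value} updates.
    private final List<String[]> changelog = new ArrayList<>();
    // Stand-in for the local store on the instance currently running the task.
    private Map<String, String> localStore = new HashMap<>();

    // Every state update goes to the local store and to the changelog.
    void put(String key, String value) {
        localStore.put(key, value);
        changelog.add(new String[] {key, value});
    }

    // Simulates the task moving to a fresh instance: local state starts empty
    // and is rebuilt by replaying the changelog from the beginning.
    void migrateAndRestore() {
        localStore = new HashMap<>();
        for (String[] update : changelog) {
            localStore.put(update[0], update[1]);
        }
    }

    String get(String key) {
        return localStore.get(key);
    }

    public static void main(String[] args) {
        System.out.println(changelogTopic("payment-aggregator", "order-counts"));
        ChangelogModel task = new ChangelogModel();
        task.put("customer-42", "3");
        task.put("customer-42", "4");
        task.migrateAndRestore();
        System.out.println(task.get("customer-42")); // 4
    }
}
```

Replay cost grows with the number of retained changelog records, which is why compaction and warm standbys both shorten recovery.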
Standby replicas
num.standby.replicas=1 — standby tasks on other instances consume changelog passively. On failover, promoted standby has warm state—faster recovery than cold replay. Cost: extra CPU/network per replica.
Interactive queries
Query the local ReadOnlyKeyValueStore via streams.store(StoreQueryParameters). Cross-instance: call KafkaStreams.metadataForAllStreamsClients() (or queryMetadataForKey for a single key) and make an RPC to the peer's HostInfo, which each instance advertises through application.server. Build a REST facade (Spring) for “get current count for user X” without a separate database.
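ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("order-counts",
        QueryableStoreTypes.keyValueStore()));
Long count = store.get("customer-42"); // null if the key is absent from this instance's store

// Metadata for routing to the instance that hosts the key
// (queryMetadataForKey replaces the older metadataForKey)
KeyQueryMetadata meta = streams.queryMetadataForKey("order-counts",
    "customer-42", Serdes.String().serializer());
HostInfo owner = meta.activeHost();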
Size state.dir disk for changelog replay headroom—RocksDB compaction plus restore spikes I/O during redeploys.
Windowing deep dive
Window choice determines whether your dashboard updates every hour on the hour, every minute with overlap, or only when a user session ends.
| Window type | Size / advance | Example |
|---|---|---|
| Tumbling | Fixed size, advance = size (no overlap) | Hourly revenue totals |
| Hopping | Fixed size, advance < size (overlap) | 5-min average computed every 1 min |
| Sliding | Event-time, per-key record triggers | Last-30-seconds activity per click |
| Session | Gap-based inactivityGap | User session until 30 min idle |
// Tumbling: 1 hour, 10 min grace for late events
TimeWindows tumbling = TimeWindows
    .ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(10));

// Hopping: 5 min window (1 min grace), advance every 1 min
TimeWindows hopping = TimeWindows
    .ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
    .advanceBy(Duration.ofMinutes(1));

// Session: close after 30 min inactivity, 5 min grace
SessionWindows session = SessionWindows
    .ofInactivityGapAndGrace(Duration.ofMinutes(30), Duration.ofMinutes(5));

clicks
    .groupByKey()
    .windowedBy(tumbling)
    .count(Materialized.as("hourly-clicks"))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    // Keys are Windowed<String> at this point (assuming String click keys),
    // so the sink needs a windowed key serde rather than the default
    .to("hourly-click-counts", Produced.with(
        WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofHours(1).toMillis()),
        Serdes.Long()));
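Session windows are the least intuitive of the four types because their boundaries come from the data. The following plain-Java sketch groups timestamps into sessions by inactivity gap. It is a simplified model, not the Kafka Streams implementation: it assumes timestamps for a single key arrive in ascending order (the real runtime also merges sessions when out-of-order records bridge a gap), and `SessionModel` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified single-key model of gap-based session windows.
final class SessionModel {

    // Groups ascending timestamps into sessions. A new session starts when the
    // distance to the previous record exceeds the inactivity gap.
    // Each result entry is {sessionStartMs, sessionEndMs}.
    static List<long[]> sessions(long[] sortedTimestampsMs, long inactivityGapMs) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sortedTimestampsMs) {
            if (current != null && ts - current[1] <= inactivityGapMs) {
                current[1] = ts;                 // extend the open session
            } else {
                current = new long[] {ts, ts};   // start a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] clicks = {0, 10 * minute, 25 * minute, 70 * minute, 80 * minute};
        for (long[] s : sessions(clicks, 30 * minute)) {
            System.out.println((s[0] / minute) + " min .. " + (s[1] / minute) + " min");
        }
    }
}
```

With a 30-minute gap, the clicks at minutes 0, 10, and 25 form one session and the clicks at minutes 70 and 80 form a second one, because 45 idle minutes separate them.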
Grace period
TimeWindows.ofSizeAndGrace: after the window end time, the grace period keeps accepting late records into the same window bucket. With no grace (ofSizeWithNoGrace), late events are dropped as soon as stream time passes the window end.
Suppress — emit only final results
suppress(Suppressed.untilWindowCloses(...)) buffers updates until window closes (including grace)—downstream receives one final count per window, not every intermediate update. Reduces output topic volume for dashboards.
An unbounded() suppress buffer holds every key until its window closes, which is a memory risk at high cardinality. Bound it with BufferConfig.maxBytes(...).shutDownWhenFull(), or drop suppress and emit incremental updates.
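The buffering behavior behind "emit only the final result" can be sketched in plain Java. This is a simplified single-key model under stated assumptions, not the Kafka Streams implementation: windows are tumbling and epoch-aligned, stream time is the highest timestamp seen, and `SuppressModel` with its `onRecord` method is an illustrative name.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model: count per tumbling window, emit once when the window closes.
final class SuppressModel {
    private final long sizeMs;
    private final long graceMs;
    // Buffer of open windows: windowStart -> latest count. It grows with the
    // number of open windows (and, in a real app, with the number of keys).
    private final Map<Long, Long> buffered = new LinkedHashMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    SuppressModel(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Processes one record timestamp and returns the windows finalized by it,
    // each as {windowStartMs, finalCount}.
    List<long[]> onRecord(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % sizeMs);
        if (streamTimeMs < windowStart + sizeMs + graceMs) {
            buffered.merge(windowStart, 1L, Long::sum); // window still open
        }                                               // otherwise: late record, dropped
        List<long[]> finalResults = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = buffered.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (streamTimeMs >= entry.getKey() + sizeMs + graceMs) {
                finalResults.add(new long[] {entry.getKey(), entry.getValue()});
                it.remove();
            }
        }
        return finalResults;
    }

    public static void main(String[] args) {
        SuppressModel model = new SuppressModel(60_000L, 10_000L);
        long[] timestamps = {5_000L, 20_000L, 65_000L, 30_000L, 75_000L, 40_000L};
        for (long ts : timestamps) {
            for (long[] r : model.onRecord(ts)) {
                System.out.println("window " + r[0] + " final count " + r[1]);
            }
        }
    }
}
```

In the sample run, the record at 30 000 ms arrives late but within grace and is counted, the first window is emitted once with a count of 3 when stream time reaches 75 000 ms, and the record at 40 000 ms arrives after the window has closed and is dropped.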
Kafka Streams configuration
application.id is the identity of your app in the cluster—it names consumer groups, internal topics, and EOS transactional ids. Changing it creates a new application, not a new version.
| Property | Default | Production | Impact |
|---|---|---|---|
| application.id | required | Stable per logical app (e.g. order-aggregator) | Consumer group + changelog/repartition topic prefix |
| processing.guarantee | at_least_once | exactly_once_v2 when needed | EOS transactions + read_committed internals |
| num.stream.threads | 1 | ≤ partitions on input topic | Parallel tasks per JVM instance |
| cache.max.bytes.buffering | 10485760 (10 MB) | 10–100 MB | Record cache dedupes agg updates before downstream |
| commit.interval.ms | 30000 (100 under exactly_once_v2) | 30000–60000 | Checkpoint state + commit offsets frequency |
| replication.factor | -1 (broker default) | 3 | Changelog + repartition internal topics |
| state.dir | /tmp/kafka-streams | Persistent volume per pod | RocksDB local storage path |
| num.standby.replicas | 0 | 1 for critical apps | Warm standby for failover |
application.id=payment-aggregator
processing.guarantee=exactly_once_v2
num.stream.threads=4
replication.factor=3
commit.interval.ms=30000
cache.max.bytes.buffering=52428800
state.dir=/var/kafka-streams/state
num.standby.replicas=1
acceptable.recovery.lag=10000
spring.kafka.streams:
  application-id: payment-aggregator
  properties:
    processing.guarantee: exactly_once_v2
    num.stream.threads: 4
    replication.factor: 3
    commit.interval.ms: 30000
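For applications that build the configuration in code, the same settings can be assembled with `java.util.Properties`. The sketch below writes the property names as plain strings so it needs no Kafka dependency (in a real application the `StreamsConfig` constants would normally be used). `StreamsProps` is an illustrative name, and `bootstrap.servers` with the value `broker:9092` is an assumption added here because a Streams app cannot start without it.

```java
import java.util.Properties;

// Illustrative helper that assembles the production settings listed above.
final class StreamsProps {

    static Properties production(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);          // keep stable across deploys
        props.put("bootstrap.servers", bootstrapServers);    // assumption: not shown in the text above
        props.put("processing.guarantee", "exactly_once_v2");
        props.put("num.stream.threads", "4");
        props.put("replication.factor", "3");
        props.put("commit.interval.ms", "30000");
        props.put("num.standby.replicas", "1");
        props.put("state.dir", "/var/kafka-streams/state");
        return props;
    }

    public static void main(String[] args) {
        Properties props = production("payment-aggregator", "broker:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Keeping the application id as a parameter rather than a literal makes it harder to change by accident during a refactor.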
Never change application.id for a “version upgrade”—you spawn a duplicate consumer group consuming from scratch. Use rolling deploy of same id; change id only for genuinely new app lineage.
Error handling in Streams
Bad bytes, poison messages, and uncaught exceptions in processors each have dedicated hooks. The default fail-fast behavior will take down your app unless you configure recovery.
DeserializationExceptionHandler
When input bytes cannot deserialize:
- LogAndFailExceptionHandler — default; throws, thread dies
- LogAndContinueExceptionHandler — skip record, continue (at-least-once gap)
- Custom — route raw bytes to DLT topic via DeserializationExceptionHandler interface
ProductionExceptionHandler
When produce to sink/changelog fails:
- DefaultProductionExceptionHandler: the default; fails on any produce error
- Custom ProductionExceptionHandler: return CONTINUE to skip a failed produce (for example, a record that is too large)
StreamsUncaughtExceptionHandler
Any uncaught exception in a stream thread:
| Action | Behavior |
|---|---|
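| REPLACE_THREAD | Replace the failed thread; its tasks are reassigned and state is restored from changelog if needed (handler API available since 2.8) |
| SHUTDOWN_CLIENT | Stop this KafkaStreams instance |
| SHUTDOWN_APPLICATION | Signal every instance sharing this application.id to shut down; the JVM itself is not exited |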
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
    DefaultProductionExceptionHandler.class);

streams.setUncaughtExceptionHandler(exception -> {
    log.error("Stream thread failed", exception);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
Dead letter topic pattern
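Branch poison records: orders.branch((k,v) -> isValid(v), (k,v) -> !isValid(v)), then send the invalid branch to orders-dlt with the original headers plus an error reason. On 2.8 and later, split() with Branched is the non-deprecated equivalent of branch(). This only covers records that deserialized successfully; undecodable bytes still go through the DeserializationExceptionHandler. Prefer explicit validation over silent LogAndContinue for auditability.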
LogAndContinue on deserialization silently drops records—lag looks healthy while data is missing. Monitor dropped-record metrics and alert on DLT volume.
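The routing decision behind a dead letter topic is independent of Kafka and can be sketched in plain Java. Everything here is hypothetical illustration: the validation rule (payload must be a non-negative integer amount), the `DltRouting` class, and the way the error reason is appended to the payload stand in for whatever validation and header handling a real application uses.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of "valid records continue, invalid records go to the DLT".
final class DltRouting {
    final List<String> valid = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    // Hypothetical rule: returns null when the payload is valid,
    // otherwise a short reason that travels with the dead-lettered record.
    static String validationError(String payload) {
        if (payload == null || payload.isEmpty()) {
            return "empty payload";
        }
        try {
            return Long.parseLong(payload) < 0 ? "negative amount" : null;
        } catch (NumberFormatException e) {
            return "not a number";
        }
    }

    // Routes one record, keeping the original payload next to the error reason.
    void route(String payload) {
        String error = validationError(payload);
        if (error == null) {
            valid.add(payload);
        } else {
            deadLetters.add(payload + " [" + error + "]");
        }
    }

    public static void main(String[] args) {
        DltRouting routing = new DltRouting();
        for (String payload : new String[] {"100", "abc", "-5"}) {
            routing.route(payload);
        }
        System.out.println("valid: " + routing.valid);
        System.out.println("dlt:   " + routing.deadLetters);
    }
}
```

Recording the reason alongside the untouched payload is what makes the dead letter topic auditable and replayable later.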
Scaling Kafka Streams
Horizontal scale = more JVM instances with the same application.id. The Streams partition assignor maps tasks to threads and instances—bounded by input partition count.
Deploy multiple instances
Kubernetes: Deployment with N replicas, identical config, shared application.id. On startup, instances join consumer group backed by application id; tasks distributed across all threads on all pods.
flowchart TB
subgraph pod1["Pod 1 — 2 threads"]
T1[Task 0 P0]
T2[Task 1 P1]
end
subgraph pod2["Pod 2 — 2 threads"]
T3[Task 2 P2]
T4[Task 3 P3]
end
TOPIC[Input topic 4 partitions]
TOPIC --> T1
TOPIC --> T2
TOPIC --> T3
TOPIC --> T4
StreamsPartitionAssignor
Custom assignor (default StreamsPartitionAssignor) assigns tasks not just partitions—each task is a unit of work for one sub-topology slice. Sticky assignment minimizes state migration on scale-out.
Task model
- One stream task per input partition (per source sub-topology)
- Parallelism ceiling = max partitions across source topics (for that topology)
- Total threads (num.stream.threads × instances) are useful only up to the total task count; threads beyond that sit idle
Sub-topology optimization
Topology splits into sub-topologies connected by repartition edges. Operators with no repartition between them stay in the same sub-topology and run inside the same task, so avoiding unnecessary key changes means fewer internal topics, fewer tasks, and less overhead. Inspect with topology.describe().subtopologies().
Max useful instances ≈ total_input_partitions / num.stream.threads
Example: 12 partitions, 4 threads/pod → max ~3 pods fully utilized
4th pod → standby tasks only (or idle)
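The sizing rule above is simple integer arithmetic. A small plain-Java sketch, assuming a single sub-topology so that the task count equals the input partition count (`Capacity` and its methods are illustrative names):

```java
// Illustrative capacity arithmetic for Kafka Streams scaling.
final class Capacity {

    // Smallest number of instances whose threads cover every task:
    // ceil(tasks / threadsPerInstance).
    static int maxUsefulInstances(int tasks, int threadsPerInstance) {
        return (tasks + threadsPerInstance - 1) / threadsPerInstance;
    }

    // Threads left without an active task for a given deployment size.
    static int idleThreads(int tasks, int instances, int threadsPerInstance) {
        return Math.max(0, instances * threadsPerInstance - tasks);
    }

    public static void main(String[] args) {
        System.out.println(maxUsefulInstances(12, 4)); // 3
        System.out.println(idleThreads(12, 3, 4));     // 0
        System.out.println(idleThreads(12, 4, 4));     // 4
    }
}
```

For 12 partitions and 4 threads per pod, three pods use every thread; a fourth pod adds four threads that can only host standby tasks or sit idle.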
LinkedIn runs thousands of Kafka Streams applications—scale by partition planning upfront and num.standby.replicas for sub-minute recovery. Confluent ksqlDB is Kafka Streams under a SQL layer—same task/partition limits apply.
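Before increasing pods, increase source topic partitions and allow a rebalance; otherwise new instances only host standby replicas consuming changelog without reducing processing load. Be careful with stateful topologies: adding partitions changes which partition a key maps to, so existing state and changelog partitions no longer line up. Plan partition counts upfront where possible.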