Consumer Internals & Consumer Groups
Consumer groups are the most misunderstood part of Kafka. The bug is rarely “Kafka lost my message”—it is poll() not called often enough, offsets committed before processing, or a rebalance storm during a K8s rollout. This chapter explains what the consumer client actually does on every loop iteration and how the group coordinator assigns partitions.
Consumer architecture
The Kafka consumer is an event loop built around poll(). Your application thread drives network I/O, group membership, and offset commits—not a background daemon you can ignore.
poll() loop: the fundamental pattern
Every consumer application follows the same skeleton: subscribe, then loop forever calling consumer.poll(Duration), process returned records, optionally commit offsets, repeat. If you stop polling, you stop fetching new data and you stop telling the broker you are alive.
consumer.subscribe(List.of("orders"));
try {
    while (running) {
        ConsumerRecords<String, OrderEvent> records =
                consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, OrderEvent> record : records) {
            process(record); // keep this fast: poll again before max.poll.interval.ms
        }
    }
} finally {
    consumer.close();
}
flowchart TD
POLL[poll called]
HB[Send heartbeat to coordinator]
RB{Rebalance pending?}
FETCH[Send Fetch requests to leaders]
DEC[Deserialize batches]
RET[Return ConsumerRecords]
POLL --> HB --> RB
RB -->|yes| REVOKE[onPartitionsRevoked / cooperative revoke]
RB -->|no| FETCH
REVOKE --> FETCH
FETCH --> DEC --> RET
Why poll() is not just “get messages”
Each poll() invocation performs critical side work:
- Heartbeat: a background heartbeat thread exists (Kafka 0.10.1+), but poll() still carries the consumer’s liveness contract; a gap between polls longer than max.poll.interval.ms triggers a rebalance
- Rebalance detection: handles partition revocation and assignment callbacks from the group coordinator
- Offset commit side effects: if enable.auto.commit=true, poll() commits the offsets returned by earlier polls once auto.commit.interval.ms has elapsed
- Fetch pipeline: sends fetch requests for assigned partitions, decompresses batches, delivers records
Heavy processing inside the poll loop without increasing max.poll.interval.ms causes the coordinator to assume the consumer died—triggering rebalance and duplicate processing as partitions move to another member.
Fetch request tuning
The consumer does not receive records instantly on poll—it issues Fetch requests subject to flow control:
| Property | Role in fetch | Throughput vs latency |
|---|---|---|
| fetch.min.bytes | Broker waits until at least N bytes are available per fetch (default 1) | Higher = larger batches, higher latency |
| fetch.max.wait.ms | Max wait if fetch.min.bytes is not met (default 500 ms) | Caps how long poll blocks waiting for data |
| max.poll.records | Max records returned per poll (default 500) | Lower = smaller processing bursts per loop |
| max.partition.fetch.bytes | Max bytes per partition per fetch (default 1 MB) | Prevents one huge message from starving others |
poll(timeout) blocks up to the timeout or until records arrive (subject to fetch.max.wait.ms). An empty ConsumerRecords return is normal—keep polling.
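The interplay of fetch.min.bytes and fetch.max.wait.ms can be sketched as a small model. This is not broker code: the class name, the method, and the constant arrival rate are assumptions made only to show when a fetch response would be sent.

```java
/** Simplified model of when a broker answers a Fetch request. */
final class FetchWaitModel {

    /**
     * Milliseconds until the broker responds, assuming data for the requested
     * partitions arrives at a constant rate (a deliberate simplification).
     */
    static long responseDelayMs(long bytesAlreadyAvailable, double arrivalBytesPerMs,
                                long fetchMinBytes, long fetchMaxWaitMs) {
        if (bytesAlreadyAvailable >= fetchMinBytes) {
            return 0; // enough data is already there: respond immediately
        }
        if (arrivalBytesPerMs <= 0) {
            return fetchMaxWaitMs; // nothing arriving: the wait cap decides
        }
        long missing = fetchMinBytes - bytesAlreadyAvailable;
        long msToFill = (long) Math.ceil(missing / arrivalBytesPerMs);
        return Math.min(msToFill, fetchMaxWaitMs);
    }

    public static void main(String[] args) {
        // Default fetch.min.bytes=1: any available byte triggers a response.
        System.out.println(responseDelayMs(10, 0.0, 1, 500));
        // fetch.min.bytes=100000 at 50 bytes/ms would need 2000 ms; the cap wins.
        System.out.println(responseDelayMs(0, 50.0, 100_000, 500));
        // Same setting at 1000 bytes/ms fills the minimum before the cap.
        System.out.println(responseDelayMs(0, 1000.0, 100_000, 500));
    }
}
```

On a low-traffic topic, raising fetch.min.bytes mostly adds latency up to the cap; on a busy topic it mostly increases batch size.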
ConsumerRecord anatomy
Each record in ConsumerRecords exposes:
| Field | Meaning |
|---|---|
| topic() | Source topic name |
| partition() | Partition within topic |
| offset() | Position in partition log (next commit points to offset+1) |
| key() / value() | Deserialized payload |
| headers() | Metadata (trace ID, content-type, retry count) |
| timestamp() | CREATE_TIME (producer) or LOG_APPEND_TIME (broker) |
| timestampType() | Which timestamp semantics apply |
| leaderEpoch() | Leader epoch for offset validation after failover |
for (ConsumerRecord<String, OrderEvent> record : records) {
    log.info("topic={} part={} offset={} ts={} key={}",
            record.topic(),
            record.partition(),
            record.offset(),
            record.timestamp(),
            record.key());
    Header retry = record.headers().lastHeader("retry-count");
    if (retry != null) {
        // getInt() reads 4 bytes big-endian, so the producer must have written the count that way
        int count = ByteBuffer.wrap(retry.value()).getInt();
    }
}
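Header values are raw bytes, so the decoding above only works if the producer used the same layout. The sketch below shows one defensive way to encode and decode such a counter. The class name and the text fallback are assumptions for illustration, not part of the Kafka API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RetryHeaderCodec {

    /** Encodes the count as a 4-byte big-endian int, the layout getInt() expects. */
    static byte[] encode(int retryCount) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(retryCount).array();
    }

    /**
     * Decodes a retry count. A 4-byte value is read as a binary int; anything
     * else is tried as decimal text, because a producer in another language may
     * have written the header as a string. Unknown encodings fall back to 0.
     * Caveat: a 4-character text value would be misread as a binary int.
     */
    static int decode(byte[] headerValue) {
        if (headerValue == null) {
            return 0; // header absent: first delivery
        }
        if (headerValue.length == Integer.BYTES) {
            return ByteBuffer.wrap(headerValue).getInt();
        }
        try {
            return Integer.parseInt(new String(headerValue, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return 0; // unreadable value: treat as no retries recorded
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(3)));
        System.out.println(decode("7".getBytes(StandardCharsets.UTF_8)));
        System.out.println(decode(null));
    }
}
```

Agreeing on one encoding per header across all producers is the cleaner fix; the fallback only limits the damage when that agreement is missing.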
Fetch responses use the same zero-copy path as replication where possible—records arrive as compressed batches. The consumer decompresses in the JVM, deserializes per your configured Deserializer, and builds ConsumerRecord objects for the poll return.
Consumer groups
A consumer group is a set of consumers sharing a group.id that jointly consume one or more topics. Kafka treats them as competing workers on a queue—but with a log underneath, so each group tracks its own progress independently.
Consumer group definition
All consumers with the same group.id coordinate via the group coordinator. Each partition of a subscribed topic is assigned to exactly one consumer in the group at a time. Different groups reading the same topic are fully independent—fan-out pattern.
Topic: orders (3 partitions)
├── Consumer Group "fulfillment" → C1:P0 C2:P1 C3:P2
└── Consumer Group "analytics" → A1:P0,P1,P2 (one consumer, all partitions)
Same messages, separate offset progress per group.
Partition assignment
On join/rebalance, the group leader runs a partition assignor plugin (range, roundrobin, cooperative-sticky) and produces a map: TopicPartition → consumer member id. The coordinator distributes this assignment; consumers start fetching only their partitions.
Parallelism ceiling
Maximum useful consumers in a group = total partitions across subscribed topics (per-topic math applies when multiple topics). If consumers > partitions, extras sit idle—they join the group, get zero partitions, still heartbeat.
Before scaling consumers from 3 → 12, verify partition count ≥ 12. Otherwise you add JVM heap and rebalance surface area with zero throughput gain.
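The assignment map and the parallelism ceiling can be illustrated with a toy assignor. This is a minimal sketch that deals partitions out in turn; it is not the algorithm of any built-in Kafka assignor, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AssignmentSketch {

    /** Deals partitions 0..partitionCount-1 to members in turn (round-robin style). */
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return assignment;
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(members.get(p % members.size())).add(p);
        }
        return assignment;
    }

    /** Members that joined the group but own no partition. */
    static long idleMembers(Map<String, List<Integer>> assignment) {
        return assignment.values().stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // Five consumers on a three-partition topic: two of them get nothing.
        Map<String, List<Integer>> a = assign(List.of("C1", "C2", "C3", "C4", "C5"), 3);
        System.out.println(a);
        System.out.println("idle = " + idleMembers(a));
    }
}
```

Running the same check with the planned consumer count and the real partition count before scaling shows whether the extra instances would do any work.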
Group coordinator
Each group.id maps to one broker, its group coordinator. The mapping is hash(group.id) mod offsets.topic.num.partitions (default 50), which selects a partition of the internal offsets topic; the broker that leads that partition is the coordinator for the group. All join, sync, heartbeat, and offset-commit traffic for the group goes to this broker.
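The lookup can be sketched in a few lines. The code below follows the hash-mod rule described above using String.hashCode(); treat it as an approximation of the broker's logic rather than a copy of it, and the class and method names as hypothetical.

```java
final class CoordinatorLookup {

    /** Index of the __consumer_offsets partition that stores this group's state. */
    static int offsetsPartitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so guard that one case.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partition = offsetsPartitionFor("payment-processor", 50);
        // The leader of this partition acts as coordinator for the group.
        System.out.println("__consumer_offsets-" + partition);
    }
}
```

Because the mapping depends only on the group id and the partition count, many busy groups can land on the same coordinator broker by chance.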
__consumer_offsets internal topic
Compacted topic (default 50 partitions, replication factor 3) storing committed offsets: key = (group.id, topic, partition), value = offset + optional metadata. Never delete this topic in production: it is Kafka’s source of truth for where each group left off.
sequenceDiagram
participant C as Consumer
participant CO as Group Coordinator Broker
participant OT as __consumer_offsets
participant L as Partition Leader
C->>CO: JoinGroup / SyncGroup
CO-->>C: Assignment: orders-0, orders-1
C->>L: Fetch orders-0
C->>CO: OffsetCommit orders-0 offset=1042
CO->>OT: compacted write
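A compacted topic keyed by (group.id, topic, partition) behaves, from the reader's point of view, like a map in which the latest write per key wins. The in-memory model below is only an analogy under that assumption; the class and its methods are hypothetical and say nothing about the on-disk record format.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OffsetStoreModel {

    // Compaction keeps only the latest value per key, which a map models directly.
    private final Map<List<Object>, Long> committed = new HashMap<>();

    private static List<Object> key(String groupId, String topic, int partition) {
        return List.<Object>of(groupId, topic, partition);
    }

    void commit(String groupId, String topic, int partition, long offset) {
        committed.put(key(groupId, topic, partition), offset);
    }

    /** Returns the committed offset, or -1 when the group has none for the partition. */
    long fetch(String groupId, String topic, int partition) {
        return committed.getOrDefault(key(groupId, topic, partition), -1L);
    }

    public static void main(String[] args) {
        OffsetStoreModel store = new OffsetStoreModel();
        store.commit("fulfillment", "orders", 0, 1000);
        store.commit("fulfillment", "orders", 0, 1042); // supersedes 1000
        store.commit("analytics", "orders", 0, 17);     // independent group
        System.out.println(store.fetch("fulfillment", "orders", 0));
        System.out.println(store.fetch("analytics", "orders", 0));
        System.out.println(store.fetch("analytics", "orders", 1));
    }
}
```

The missing entry in the last lookup corresponds to the case where auto.offset.reset decides the starting position.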
“What happens if two consumers use the same group.id?” — they share partitions; each partition goes to one member. Different group.id = duplicate consumption with independent offsets. “What if no group.id?” — standalone consumer with manual partition assignment only; no automatic balancing.
Offset management
Offsets are bookmarks in the partition log—not message IDs. How and when you commit them defines your delivery semantics more than any broker setting.
What an offset is
An offset is a record’s position in its partition log. The committed offset is the durable bookmark stored in __consumer_offsets, and it names the next record to read: committing n means “everything before n is processed; resume at n.” Log end offset (LEO) is the offset the next produced record will receive, so lag = LEO − committed.
enable.auto.commit (default true)
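Automatically commits offsets every auto.commit.interval.ms (default 5000 ms), covering the positions returned by earlier polls rather than individual records. In the classic Java client the commit runs inside poll() (and close()), so a loop that finishes each batch before polling again still gets at-least-once behavior. It becomes dangerous when processing is decoupled from polling: poll returns a batch → records are handed to another thread, or a failure is swallowed → a later poll auto-commits → crash → offsets already advanced → records skipped (at-most-once behavior).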
enable.auto.commit=false
# Required for manual control in production services
Manual commit: commitSync() vs commitAsync()
| Method | Behavior | When to use |
|---|---|---|
| commitSync() | Blocks until broker ack; retries on retriable errors | Shutdown hooks, after critical batch, simplicity |
| commitAsync() | Non-blocking; no automatic retry on failure | Hot path; use callback to log/handle commit failures |
| commitSync(Map) | Commit specific offsets per partition with metadata | Partial batch success, transactional boundaries |
At-least-once: process THEN commit
Correct pattern for manual commit: fully process record(s), then commit offset of last successfully processed record + 1. On crash before commit, records replay—handler must be idempotent.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partRecords = records.records(partition);
    long lastOffset = -1;
    for (ConsumerRecord<String, String> record : partRecords) {
        process(record); // may throw: offset not committed yet
        lastOffset = record.offset();
    }
    if (lastOffset >= 0) {
        consumer.commitSync(Map.of(
                partition,
                new OffsetAndMetadata(lastOffset + 1)
        ));
    }
}
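Since a crash before the commit replays records, the handler has to tolerate seeing the same record twice. One common approach is to remember which records were already applied. The sketch below uses an in-memory set keyed by (topic, partition, offset); that storage choice and all names are assumptions for illustration, and a real service would need a durable store updated together with the side effect.

```java
import java.util.HashSet;
import java.util.Set;

final class IdempotentHandler {

    // Assumption: in production this lives in the same database transaction
    // as the side effect; an in-memory set is only for illustration.
    private final Set<String> processed = new HashSet<>();
    private int sideEffects = 0;

    /** Applies the side effect at most once per (topic, partition, offset). */
    boolean handle(String topic, int partition, long offset) {
        String id = topic + "-" + partition + "@" + offset;
        if (!processed.add(id)) {
            return false; // replayed after a crash or rebalance: skip
        }
        sideEffects++; // stand-in for the real work (charge card, update stock)
        return true;
    }

    int sideEffects() {
        return sideEffects;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        handler.handle("orders", 0, 41);
        handler.handle("orders", 0, 42);
        handler.handle("orders", 0, 42); // redelivery of the same record
        System.out.println(handler.sideEffects());
    }
}
```

A business key carried in the record (an order ID, for example) is often a better deduplication key than the offset, because it also survives a topic being re-created or mirrored.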
At-most-once: commit THEN process (almost never right)
Committing before processing means a crash after commit loses records silently. Only acceptable for best-effort metrics where loss is tolerable—never for billing, inventory, or audit.
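The difference between the two orderings is easiest to see by simulating a crash in the middle of a batch. The model below is a simplified sketch, not client code: it assumes one commit per batch, a single crash, and a restart that resumes from the committed offset. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitOrderSimulation {

    /**
     * Processes offsets [0, batchSize), crashes when it reaches crashAt, then
     * restarts from the committed offset and finishes without crashing again.
     * Returns every offset whose processing completed, in order.
     */
    static List<Long> run(int batchSize, long crashAt, boolean commitBeforeProcessing) {
        List<Long> completed = new ArrayList<>();
        // Commit-then-process moves the bookmark past the whole batch up front.
        long committed = commitBeforeProcessing ? batchSize : 0;
        boolean crashed = false;
        for (long offset = 0; offset < batchSize; offset++) {
            if (offset == crashAt) {
                crashed = true; // crash before this record is processed
                break;
            }
            completed.add(offset);
        }
        if (!crashed) {
            return completed; // process-then-commit would commit here
        }
        for (long offset = committed; offset < batchSize; offset++) {
            completed.add(offset); // second run resumes at the committed offset
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println("commit then process: " + run(5, 2, true));
        System.out.println("process then commit: " + run(5, 2, false));
    }
}
```

With commit-then-process the records from the crash point onward never complete; with process-then-commit every record completes, and the ones handled before the crash complete twice.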
Seek operations
- consumer.seek(tp, offset) — jump to absolute offset; next poll reads from there
- seekToBeginning(Collection<TopicPartition>) — replay from start of retention
- seekToEnd(Collection<TopicPartition>) — skip to latest (only new records)
- offsetsForTimes(Map<TopicPartition, Long>) — find offset for timestamp per partition (replay from incident time)
TopicPartition tp = new TopicPartition("orders", 0);
long incidentMs = 1717200000000L;
Map<TopicPartition, Long> query = Map.of(tp, incidentMs);
Map<TopicPartition, OffsetAndTimestamp> offsets =
        consumer.offsetsForTimes(query);
OffsetAndTimestamp oat = offsets.get(tp);
if (oat != null) {
    consumer.seek(tp, oat.offset());
    log.info("Rewound {} to offset {} at ts {}", tp, oat.offset(), oat.timestamp());
}
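offsetsForTimes returns, per partition, the earliest offset whose timestamp is at or after the requested time, or null when no such record exists (hence the null check above). The lookup can be modeled as a lower-bound search. The sketch assumes timestamps that never decrease within the partition, which holds for LOG_APPEND_TIME but not necessarily for CREATE_TIME; the names are hypothetical.

```java
final class TimestampLookupModel {

    /**
     * timestamps[i] is the timestamp of the record at offset baseOffset + i,
     * assumed non-decreasing. Returns the earliest offset whose timestamp is
     * at or after targetMs, or -1 when every record is older than the target.
     */
    static long earliestOffsetAtOrAfter(long[] timestamps, long baseOffset, long targetMs) {
        int lo = 0;
        int hi = timestamps.length; // search window is [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < targetMs) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo == timestamps.length ? -1 : baseOffset + lo;
    }

    public static void main(String[] args) {
        long[] ts = {100, 200, 200, 300}; // records at offsets 40..43
        System.out.println(earliestOffsetAtOrAfter(ts, 40, 200));
        System.out.println(earliestOffsetAtOrAfter(ts, 40, 250));
        System.out.println(earliestOffsetAtOrAfter(ts, 40, 301));
    }
}
```

When replaying from an incident time, subtracting a safety margin from the timestamp is a cheap way to avoid missing records that were produced slightly earlier than the alert fired.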
spring.kafka.consumer:
  enable-auto-commit: false
  auto-offset-reset: earliest
  properties:
    isolation.level: read_committed
commitSync after every record = safest, slowest (RTT per record). Batch commits per poll partition = higher throughput, larger replay window on failure. Size batches to your idempotency budget.
Rebalancing — the most important topic
Rebalance is a stop-the-world (or incremental) redistribution of partitions across group members. Most production incidents involving “duplicate processing” or “lag spike at deploy” trace back to it.
What triggers rebalance
| Trigger | Mechanism | Mitigation |
|---|---|---|
| Consumer joins group | New member needs partition share | Scale consumers gradually; use cooperative assignor |
| Consumer leaves / crashes | session.timeout.ms exceeded without heartbeat | Tune timeouts; graceful close() |
| max.poll.interval.ms exceeded | poll() not called in time (slow processing) | Increase interval or offload processing to worker pool |
| Partition count change | Topic partitions increased | Expected; ensure assignor handles new partitions |
| Subscription change | subscribe() different topics | Plan topic changes as deploy events |
| Static member mismatch | group.instance.id conflict | Unique instance IDs per pod |
Stop-the-world rebalance (eager)
Legacy protocol (pre-cooperative): all consumers revoke all partitions, the group re-joins, a fresh assignment is issued, and consumers resume from the committed offset. Processing halts for the entire group during the rebalance, for seconds to minutes at scale.
Cooperative incremental rebalance (Kafka 2.4+)
CooperativeStickyAssignor: two-phase protocol. Round 1: only partitions that move are revoked; consumers keep processing unmoved partitions. Round 2: revoked partitions reassigned to new owners. Dramatically reduces rebalance pause during rolling deploys.
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
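The practical difference between the two protocols is which partitions stop being processed. The sketch below compares them for one before/after assignment pair. It models only the revocation sets, not the join and sync rounds, and the names and sample assignments are made up for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class RevocationSketch {

    /** Eager protocol: every partition is given up before the new assignment arrives. */
    static Set<Integer> revokedEager(Map<Integer, String> before) {
        return new TreeSet<>(before.keySet());
    }

    /** Cooperative protocol: only partitions whose owner changes are revoked. */
    static Set<Integer> revokedCooperative(Map<Integer, String> before,
                                           Map<Integer, String> after) {
        Set<Integer> moved = new TreeSet<>();
        for (Map.Entry<Integer, String> entry : before.entrySet()) {
            if (!entry.getValue().equals(after.get(entry.getKey()))) {
                moved.add(entry.getKey());
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // partition -> owner, before and after consumer C3 joins the group
        Map<Integer, String> before = Map.of(0, "C1", 1, "C1", 2, "C2", 3, "C2");
        Map<Integer, String> after = Map.of(0, "C1", 1, "C1", 2, "C2", 3, "C3");
        System.out.println("eager revokes       " + revokedEager(before));
        System.out.println("cooperative revokes " + revokedCooperative(before, after));
    }
}
```

In this example three of the four partitions keep flowing through the rebalance under the cooperative protocol, while the eager protocol pauses all four.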
group.instance.id — static membership
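A stable instance identifier (e.g. pod name) lets a consumer restart within session.timeout.ms and rejoin as the same member. The coordinator holds that member’s partitions instead of rebalancing on a brief restart, which is critical for Kubernetes rolling updates where a pod dies and its replacement joins with the same ID. This requires pod names that survive restarts, as with a StatefulSet; Deployment pods get a new name each time.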
spring.kafka.consumer:
  group-id: order-processor
  properties:
    group.instance.id: ${HOSTNAME}
    partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
    session.timeout.ms: 45000
    heartbeat.interval.ms: 15000
ConsumerRebalanceListener
Hook into partition lifecycle—mandatory for correct offset handling on revoke:
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // CRITICAL: commit offsets for partitions being taken away
        try {
            consumer.commitSync();
        } catch (Exception e) {
            log.error("Commit on revoke failed", e);
        }
        // Close any partition-scoped resources (files, locks)
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Optional: seek to committed offset or custom position
        // consumer.seek(...) if not using auto reset
        log.info("Assigned partitions: {}", partitions);
    }
});
Rebalancing storms — autoscaler adds/removes consumers rapidly, or health checks kill pods before close(), causing continuous rebalance. Lag spikes, duplicate processing, and max.poll.interval violations compound. Stabilize replica count; use static membership + cooperative assignor.
LinkedIn pioneered cooperative rebalancing to cut deploy-time consume pause. Uber mandates group.instance.id on stateful stream consumers in Kubernetes to survive pod restarts without full group rebalance.
Consumer configuration deep dive
Consumer tuning is balancing liveness (heartbeats, poll frequency), throughput (fetch batching), and correctness (offsets, isolation).
| Property | Default | Production | Impact |
|---|---|---|---|
| group.id | none (required) | Stable per app role | Offset namespace; changing it reprocesses from auto.offset.reset |
| max.poll.interval.ms | 300000 (5 min) | 300000–900000 for slow handlers | Max gap between poll() calls before declared dead |
| session.timeout.ms | 45000 (Kafka 3.x) | 45000–60000 | Heartbeat failure → rebalance |
| heartbeat.interval.ms | 3000 | session.timeout.ms / 3 or lower | Must be below session.timeout.ms; keep it ≤ one third of it |
| fetch.min.bytes | 1 | 1 (latency) or 100000 (throughput) | Broker batches fetch response size |
| fetch.max.wait.ms | 500 | 500 (latency) or 500+ (throughput) | Max block waiting for min bytes |
| max.partition.fetch.bytes | 1048576 (1 MB) | 1–4 MB | Per-partition fetch ceiling |
| max.poll.records | 500 | 100–1000 | Records per poll; bounds processing burst |
| auto.offset.reset | latest | earliest (backfill) / latest (live only) | Behavior when no committed offset exists |
| isolation.level | read_uncommitted | read_committed (EOS) | Filter aborted transactional records |
| enable.auto.commit | true | false for services | Automatic offset commits on interval |
| auto.commit.interval.ms | 5000 | N/A if auto commit disabled | Commit frequency for auto mode |
max.poll.interval.ms — the slow handler knob
If processing 500 records takes 6 minutes and you never call poll() in between, the consumer is kicked from the group—even though heartbeat thread may still run in modern clients, the poll contract is separate. Fix: increase max.poll.interval.ms, reduce max.poll.records, or delegate processing to workers and poll frequently.
auto.offset.reset semantics
- earliest — start at beginning of retention when no offset exists
- latest — skip to end; only new records (default—surprises teams on first deploy)
- none — throw exception if no offset—forces explicit positioning
spring.kafka.consumer:
  group-id: payment-processor
  enable-auto-commit: false
  auto-offset-reset: earliest
  max-poll-records: 250
  properties:
    max.poll.interval.ms: 600000
    session.timeout.ms: 45000
    heartbeat.interval.ms: 15000
    partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
    group.instance.id: ${HOSTNAME}
    isolation.level: read_committed
    fetch.max.wait.ms: 500
    fetch.min.bytes: 1
Heartbeat rule: heartbeat.interval.ms ≤ session.timeout.ms / 3. Poll rule: time to process max.poll.records << max.poll.interval.ms.
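Both rules are simple arithmetic and can be checked before a deploy. The helper below is a hypothetical sketch, not a Kafka API: it reads “much less than” as “a full batch fits in half of max.poll.interval.ms,” which is an arbitrary safety margin chosen for the example, and it expects a worst-case per-record processing time measured by you.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsumerTimingCheck {

    /** Returns human-readable violations of the two rules; an empty list means OK. */
    static List<String> violations(long heartbeatIntervalMs, long sessionTimeoutMs,
                                   int maxPollRecords, long worstCaseMsPerRecord,
                                   long maxPollIntervalMs) {
        List<String> problems = new ArrayList<>();
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            problems.add("heartbeat.interval.ms should be <= session.timeout.ms / 3");
        }
        long worstCaseBatchMs = maxPollRecords * worstCaseMsPerRecord;
        // Assumption: require a full batch to fit in half the poll interval.
        if (worstCaseBatchMs * 2 > maxPollIntervalMs) {
            problems.add("a full batch may take " + worstCaseBatchMs
                    + " ms; reduce max.poll.records or raise max.poll.interval.ms");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Values from the configuration above, assuming 1 s worst case per record.
        System.out.println(violations(15_000, 45_000, 250, 1_000, 600_000));
        // Defaults with a handler that needs 720 ms per record (6 minutes per batch).
        System.out.println(violations(3_000, 45_000, 500, 720, 300_000));
    }
}
```

The second call reproduces the six-minute batch against the default five-minute interval and reports it as a violation.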
Consumer lag
Lag is the distance between what producers wrote and what your group has durably committed as processed. It is the single most important consumer health metric—more actionable than CPU or heap.
Definition
Per partition: lag = log end offset − committed offset (sometimes measured against consumer position instead of committed—tools vary; align with your dashboard). Sum across partitions for group lag. Lag of 0 means fully caught up on that partition.
Partition log:     [0][1][2][3][4][5][6][7]   LEO = 8
Committed offset:                  ^
                                   5
Consumer lag = 8 - 5 = 3 records behind
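The same arithmetic, per partition and summed for the group, looks like this in code. It is a minimal sketch with hypothetical names; treating a partition without a committed offset as starting from 0 is an assumption, and monitoring tools differ on that point.

```java
import java.util.Map;

final class LagMath {

    /** Per-partition lag; clamped at zero in case the two offsets are read at different moments. */
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    /** Group lag: sum over partitions. A partition with no commit counts from offset 0 here. */
    static long groupLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            total += lag(entry.getValue(), committed);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(lag(8, 5)); // the partition from the diagram
        System.out.println(groupLag(Map.of(0, 8L, 1, 20L), Map.of(0, 5L, 1, 20L)));
    }
}
```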
Why lag matters most
- Growing lag → consumer cannot keep pace → SLA breach for real-time pipelines
- Per-partition lag skew → hot partition or slow consumer instance
- Lag spike → often rebalance or deploy, not producer surge
- Stuck lag on one partition → poison message or single-thread bottleneck
Monitoring tools
| Tool | What it shows |
|---|---|
| kafka-consumer-groups.sh --describe | Per-partition CURRENT-OFFSET, LOG-END-OFFSET, LAG, consumer id |
| Burrow (LinkedIn) | Lag evaluation with status OK/WARN/ERR; SLA-oriented alerting |
| Cruise Control | Broker load metrics and partition rebalancing across brokers; complements lag tools rather than measuring lag |
| Prometheus + Grafana | kafka_consumer_fetch_manager_records_lag_max JMX → dashboard |
| Confluent Control Center / Cloud | UI lag charts per group |
kafka-consumer-groups.sh \
--bootstrap-server broker:9092 \
--group payment-processor \
--describe
# Output columns: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
Lag growing steadily
Producer rate > consumer processing rate. Remedies: add consumers (if partitions allow), optimize handler, increase parallelism via partition count (long-term), fix downstream DB/API latency, scale processing tier.
Lag jump (sudden spike)
- Rebalance — partitions revoked mid-batch; temporary duplication risk; lag redistributes
- Consumer restart — catch-up from committed offset while producers continued
- GC pause / JVM STW — missed polls; may trigger rebalance cascade
- Broker leader election — brief fetch delay, not true lag—distinguish via metrics
Alert on records-lag-max and rate of change, not absolute lag alone—a steady 10k lag may be fine if bounded; 10k → 50k in 5 minutes is not.
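An alert that combines rate of change with a hard ceiling can be sketched as follows. The thresholds, names, and two-sample approach are assumptions for illustration; a production rule would normally live in the monitoring system and smooth over several samples.

```java
final class LagAlert {

    /** Lag growth in records per minute between two samples. */
    static double growthPerMinute(long lagBefore, long lagNow, long elapsedMs) {
        return (lagNow - lagBefore) * 60_000.0 / elapsedMs;
    }

    /** Fires on fast growth or on breaching a hard ceiling; both thresholds are hypothetical. */
    static boolean shouldAlert(long lagBefore, long lagNow, long elapsedMs,
                               double maxGrowthPerMinute, long maxLag) {
        return lagNow > maxLag
                || growthPerMinute(lagBefore, lagNow, elapsedMs) > maxGrowthPerMinute;
    }

    public static void main(String[] args) {
        long fiveMinutesMs = 5 * 60_000L;
        // Steady 10k lag: bounded, no alert.
        System.out.println(shouldAlert(10_000, 10_000, fiveMinutesMs, 1_000, 100_000));
        // 10k -> 50k in five minutes: growing by 8000 records per minute.
        System.out.println(shouldAlert(10_000, 50_000, fiveMinutesMs, 1_000, 100_000));
    }
}
```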
Consumer lag model
A simple model of how lag accumulates or drains: lag += (produce_rate − consume_rate) per tick. A rebalance adds a one-time spike (duplicate processing plus a catch-up pause), which is what you see after rolling deploys.
Real lag uses broker log end offset vs committed offset in __consumer_offsets. The model applies the same math: when produce > consume, lag grows linearly; when consume > produce, lag drains until zero.
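The tick arithmetic can be reproduced in a few lines. In this sketch a rebalance is modeled as a window of ticks during which nothing is consumed; that is one possible reading of the “one-time spike,” and the names and parameters are hypothetical.

```java
final class LagSimulation {

    /** One tick: lag changes by produce minus consume and never drops below zero. */
    static long tick(long lag, long producePerTick, long consumePerTick) {
        return Math.max(0, lag + producePerTick - consumePerTick);
    }

    /**
     * Runs the model for a number of ticks. During the pause window
     * [pauseStartTick, pauseStartTick + pauseTicks) the group consumes nothing,
     * which stands in for a rebalance. Pass pauseTicks = 0 for no rebalance.
     */
    static long simulate(long startLag, long producePerTick, long consumePerTick,
                         int ticks, int pauseStartTick, int pauseTicks) {
        long lag = startLag;
        for (int t = 0; t < ticks; t++) {
            boolean paused = t >= pauseStartTick && t < pauseStartTick + pauseTicks;
            lag = tick(lag, producePerTick, paused ? 0 : consumePerTick);
        }
        return lag;
    }

    public static void main(String[] args) {
        // Consumers slower than producers: lag grows by 20 per tick.
        System.out.println(simulate(0, 100, 80, 10, 0, 0));
        // Consumers faster, but a 3-tick pause leaves a backlog that drains slowly.
        System.out.println(simulate(0, 100, 120, 10, 2, 3));
    }
}
```

The second run shows why a short pause hurts more than its length suggests: the backlog drains only at the difference between consume and produce rates.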
Multi-threading with consumers
KafkaConsumer is not thread-safe. Only one thread may call poll(), commit, seek, or pause at a time; wakeup() is the one method meant to be called from another thread. Unsynchronized access from a second thread fails with ConcurrentModificationException.
Pattern 1: one consumer per thread
Simplest and most common: N threads, N KafkaConsumer instances, same group.id. Group coordinator assigns partitions across them automatically. Scale N ≤ total partitions.
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
    Properties props = baseConsumerProps();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(List.of("orders"));
    executor.submit(() -> {
        try {
            while (running) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
                    process(r);
                }
            }
        } finally {
            consumer.close();
        }
    });
}
Pattern 2: consumer thread + worker pool
One dedicated thread polls; hands records to a fixed thread pool for processing. Risk: workers fall behind → must pause() partitions with deep queues and resume() when caught up—otherwise unbounded memory and max.poll.interval violations if poll thread blocks on queue.
BlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>(1000);
Set<TopicPartition> paused = new HashSet<>();
// Worker threads drain the queue and process records.
// Poll thread:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition tp : records.partitions()) {
    for (ConsumerRecord<String, String> r : records.records(tp)) {
        if (!queue.offer(r)) {
            // Queue full: stop fetching this partition and rewind to the first
            // record that was not enqueued, so it is fetched again after resume().
            consumer.pause(Set.of(tp));
            consumer.seek(tp, r.offset());
            paused.add(tp);
            break; // skip the rest of this partition only
        }
    }
}
// When the queue drains: consumer.resume(paused); paused.clear();
Pattern 3: virtual threads (Java 21+)
Spawn one virtual thread per record (or per small batch) inside the poll loop—cheap concurrency for I/O-bound handlers. Poll thread must still call poll() frequently; use ExecutorService.newVirtualThreadPerTaskExecutor() and either small max.poll.records or periodic poll in a separate scheduler thread (advanced).
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    while (running) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(200));
        for (ConsumerRecord<String, String> record : records) {
            executor.submit(() -> process(record));
        }
        // Keep max.poll.records low; monitor max.poll.interval
    }
}
| Pattern | Pros | Cons |
|---|---|---|
| One consumer / thread | Simple, safe, scales with partitions | N JVM consumers = N connections |
| Poll + worker pool | Parallel processing within partition* | *Breaks per-partition order; pause/resume complexity |
| Virtual threads | High concurrency for I/O-bound work | Easy to violate poll interval if misconfigured |
Sharing one KafkaConsumer across @Async Spring beans without synchronization corrupts internal state. Use ConcurrentKafkaListenerContainerFactory with concurrency instead—it manages one consumer per thread correctly.
“How do you parallelize consumption?” — first answer: more partitions + more consumers in the group (preserves order per key). Parallel processing within a partition sacrifices order—only if business allows. Mention pause/resume for backpressure.