Consumer Internals & Consumer Groups

Consumer groups are the most misunderstood part of Kafka. The bug is rarely “Kafka lost my message”—it is poll() not called often enough, offsets committed before processing, or a rebalance storm during a K8s rollout. This chapter explains what the consumer client actually does on every loop iteration and how the group coordinator assigns partitions.

developer senior platform Kafka 3.x

Consumer architecture

The Kafka consumer is an event loop built around poll(). Your application thread drives network I/O, group membership, and offset commits—not a background daemon you can ignore.

poll() loop: the fundamental pattern

Every consumer application follows the same skeleton: subscribe, then loop forever calling consumer.poll(Duration), process returned records, optionally commit offsets, repeat. If you stop polling, you stop fetching new data and you stop telling the broker you are alive.

Java — canonical poll loop
consumer.subscribe(List.of("orders"));

try {
  while (running) {
    ConsumerRecords<String, OrderEvent> records =
        consumer.poll(Duration.ofMillis(500));

    for (ConsumerRecord<String, OrderEvent> record : records) {
      process(record);  // keep this fast — poll again before max.poll.interval.ms
    }
  }
} finally {
  consumer.close();
}
flowchart TD
  POLL[poll called]
  HB[Send heartbeat to coordinator]
  RB{Rebalance pending?}
  FETCH[Send Fetch requests to leaders]
  DEC[Deserialize batches]
  RET[Return ConsumerRecords]
  POLL --> HB --> RB
  RB -->|yes| REVOKE[onPartitionsRevoked / cooperative revoke]
  RB -->|no| FETCH
  REVOKE --> FETCH
  FETCH --> DEC --> RET

Why poll() is not just “get messages”

Each poll() invocation performs critical side work:

  • Heartbeat and liveness: a background thread sends heartbeats (Kafka 0.10.1+), but only poll() resets the max.poll.interval.ms timer; a gap longer than that makes the consumer leave the group and triggers a rebalance
  • Rebalance detection: handles partition revocation and assignment callbacks from the group coordinator
  • Offset commit side effects: if enable.auto.commit=true, commits are issued from inside poll() once auto.commit.interval.ms has elapsed
  • Fetch pipeline: sends fetch requests for assigned partitions, decompresses batches, delivers records
⚠️ Pitfall

Heavy processing inside the poll loop without increasing max.poll.interval.ms causes the coordinator to assume the consumer died—triggering rebalance and duplicate processing as partitions move to another member.

Fetch request tuning

The consumer does not receive records instantly on poll—it issues Fetch requests subject to flow control:

| Property | Role in fetch | Throughput vs latency |
| --- | --- | --- |
| fetch.min.bytes | Broker waits until at least N bytes are available per fetch (default 1) | Higher = larger batches, higher latency |
| fetch.max.wait.ms | Max time the broker holds a fetch if fetch.min.bytes is not met (default 500 ms) | Upper bound on the latency added by fetch.min.bytes |
| max.poll.records | Max records returned per poll (default 500) | Lower = smaller processing bursts per loop |
| max.partition.fetch.bytes | Max bytes per partition per fetch (default 1 MB) | Stops one busy partition from crowding out the others |

poll(timeout) blocks up to the timeout or until records arrive (subject to fetch.max.wait.ms). An empty ConsumerRecords return is normal—keep polling.
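
A minimal sketch of the two ends of that trade-off, built with plain java.util.Properties so it runs without a Kafka dependency. The class and method names (FetchProfiles, lowLatency, highThroughput) are illustrative, and the throughput values are assumptions picked for the example rather than measured settings.

```java
import java.util.Properties;

final class FetchProfiles {
  // Latency-first: the broker answers as soon as any data is available.
  static Properties lowLatency() {
    Properties p = new Properties();
    p.setProperty("fetch.min.bytes", "1");
    p.setProperty("fetch.max.wait.ms", "500");
    p.setProperty("max.poll.records", "500");
    p.setProperty("max.partition.fetch.bytes", "1048576");
    return p;
  }

  // Throughput-first (assumed values): let the broker accumulate about
  // 100 KB per fetch and allow up to 4 MB per partition.
  static Properties highThroughput() {
    Properties p = lowLatency();
    p.setProperty("fetch.min.bytes", "100000");
    p.setProperty("max.partition.fetch.bytes", "4194304");
    return p;
  }

  public static void main(String[] args) {
    System.out.println(highThroughput());
  }
}
```

Either Properties object can be merged into the rest of the consumer configuration before constructing the client.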

ConsumerRecord anatomy

Each record in ConsumerRecords exposes:

| Field | Meaning |
| --- | --- |
| topic() | Source topic name |
| partition() | Partition within topic |
| offset() | Position in partition log (next commit points to offset+1) |
| key() / value() | Deserialized payload |
| headers() | Metadata (trace ID, content-type, retry count) |
| timestamp() | CREATE_TIME (producer) or LOG_APPEND_TIME (broker) |
| timestampType() | Which timestamp semantics apply |
| leaderEpoch() | Leader epoch for offset validation after failover |
Java — record metadata usage
for (ConsumerRecord<String, OrderEvent> record : records) {
  log.info("topic={} part={} offset={} ts={} key={}",
      record.topic(),
      record.partition(),
      record.offset(),
      record.timestamp(),
      record.key());

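  Header retry = record.headers().lastHeader("retry-count");
  if (retry != null) {
    // Assumes the producer wrote the count as a 4-byte big-endian int
    int count = ByteBuffer.wrap(retry.value()).getInt();
    log.info("retry-count={}", count);
  }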
}
🔬 Under the Hood

Fetch responses use the same zero-copy path as replication where possible—records arrive as compressed batches. The consumer decompresses in the JVM, deserializes per your configured Deserializer, and builds ConsumerRecord objects for the poll return.

Consumer groups

A consumer group is a set of consumers sharing a group.id that jointly consume one or more topics. Kafka treats them as competing workers on a queue—but with a log underneath, so each group tracks its own progress independently.

Consumer group definition

All consumers with the same group.id coordinate via the group coordinator. Each partition of a subscribed topic is assigned to exactly one consumer in the group at a time. Different groups reading the same topic are fully independent—fan-out pattern.

Partition assignment

On join/rebalance, the group leader runs a partition assignor plugin (range, roundrobin, cooperative-sticky) and produces a map: TopicPartition → consumer member id. The coordinator distributes this assignment; consumers start fetching only their partitions.
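
To make the assignment map concrete, here is a simplified model of range-style assignment for a single topic. It is a sketch of the idea, not the client's assignor code: RangeAssignSketch and its method are hypothetical names, members are plain strings, and partitions are just integers 0..P-1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RangeAssignSketch {
  // Sort members, give each floor(P/M) consecutive partitions,
  // and give the first P % M members one extra.
  static Map<String, List<Integer>> assign(List<String> memberIds, int partitionCount) {
    Map<String, List<Integer>> result = new TreeMap<>();
    if (memberIds.isEmpty()) {
      return result;
    }
    List<String> sorted = new ArrayList<>(memberIds);
    Collections.sort(sorted);
    int base = partitionCount / sorted.size();
    int extra = partitionCount % sorted.size();
    int next = 0;
    for (int i = 0; i < sorted.size(); i++) {
      int count = base + (i < extra ? 1 : 0);
      List<Integer> owned = new ArrayList<>();
      for (int j = 0; j < count; j++) {
        owned.add(next++);
      }
      result.put(sorted.get(i), owned);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(assign(List.of("c1", "c2", "c3"), 7));
    // Four members, three partitions: the last member gets an empty list.
    System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
  }
}
```

The second call shows a member that joins the group but receives no partitions, which is what happens whenever the group has more members than the topic has partitions.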

Parallelism ceiling

Maximum useful consumers in a group = total partitions across subscribed topics (per-topic math applies when multiple topics). If consumers > partitions, extras sit idle—they join the group, get zero partitions, still heartbeat.

💡 Pro Tip

Before scaling consumers from 3 → 12, verify partition count ≥ 12. Otherwise you add JVM heap and rebalance surface area with zero throughput gain.

Group coordinator

Each group.id maps to one broker that acts as its group coordinator. The hash of the group id modulo offsets.topic.num.partitions (default 50) selects a partition of __consumer_offsets, and the broker leading that partition is the coordinator for the group. All join/sync/heartbeat/offset commit traffic for the group goes to this broker.
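
The mapping from group id to offsets partition can be sketched in a few lines. This assumes the broker hashes the group id with Java's String.hashCode and takes a non-negative value before the modulo, which is my understanding of the current implementation; CoordinatorLookup is a hypothetical name, and finding the leader of the resulting partition is left out.

```java
final class CoordinatorLookup {
  // Returns the __consumer_offsets partition that stores this group's offsets.
  // The broker leading that partition acts as the group coordinator.
  static int offsetsPartitionFor(String groupId, int offsetsTopicPartitions) {
    int hash = groupId.hashCode();
    // Math.abs(Integer.MIN_VALUE) is still negative, so map it to 0 explicitly.
    int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
    return nonNegative % offsetsTopicPartitions;
  }

  public static void main(String[] args) {
    System.out.println(offsetsPartitionFor("payment-processor", 50));
  }
}
```

Because the result depends only on the group id and the partition count, every client and broker arrives at the same coordinator without any extra lookup state.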

__consumer_offsets internal topic

Compacted topic (default 50 partitions, RF=3) storing committed offsets: key = (group.id, topic, partition), value = offset + optional metadata. Never delete this topic in production: it is Kafka's source of truth for where each group left off.

sequenceDiagram
  participant C as Consumer
  participant CO as Group Coordinator Broker
  participant OT as __consumer_offsets
  participant L as Partition Leader
  C->>CO: JoinGroup / SyncGroup
  CO-->>C: Assignment: orders-0, orders-1
  C->>L: Fetch orders-0
  C->>CO: OffsetCommit orders-0 offset=1042
  CO->>OT: compacted write
🎯 Interview Tip

“What happens if two consumers use the same group.id?” — they share partitions; each partition goes to one member. Different group.id = duplicate consumption with independent offsets. “What if no group.id?” — standalone consumer with manual partition assignment only; no automatic balancing.

Offset management

Offsets are bookmarks in the partition log—not message IDs. How and when you commit them defines your delivery semantics more than any broker setting.

What an offset is

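A record's offset n is its position in the partition log. A committed offset n means “everything before n is processed; resume at n,” which is why you commit the last processed offset + 1. The committed offset is the durable bookmark stored in __consumer_offsets. Log end offset (LEO) is the offset the next produced record will receive, so lag = LEO − committed.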

enable.auto.commit (default true)

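With auto-commit on, the consumer commits the offsets returned by the previous poll once auto.commit.interval.ms (default 5000 ms) has elapsed. The commit is issued from inside poll() and close(), not by a background timer, and it covers whole batches, not individual records. In a single-threaded loop that finishes processing before polling again, this gives at-least-once delivery with up to one interval of replay after a crash. It becomes dangerous when processing is handed off or buffered: poll returns batch → records go to a worker queue → next poll auto-commits them → crash before the workers finish → offsets already advanced → records skipped (at-most-once behavior).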

Properties — disable auto commit
enable.auto.commit=false
# Required for manual control in production services

Manual commit: commitSync() vs commitAsync()

| Method | Behavior | When to use |
| --- | --- | --- |
| commitSync() | Blocks until broker ack; retries on retriable errors | Shutdown hooks, after critical batch, simplicity |
| commitAsync() | Non-blocking; no automatic retry on failure | Hot path; use callback to log/handle commit failures |
| commitSync(Map) | Commit specific offsets per partition with metadata | Partial batch success, transactional boundaries |

At-least-once: process THEN commit

Correct pattern for manual commit: fully process record(s), then commit offset of last successfully processed record + 1. On crash before commit, records replay—handler must be idempotent.

Java — at-least-once manual commit
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));

for (TopicPartition partition : records.partitions()) {
  List<ConsumerRecord<String, String>> partRecords = records.records(partition);
  long lastOffset = -1;

  for (ConsumerRecord<String, String> record : partRecords) {
    process(record);  // may throw — offset not committed yet
    lastOffset = record.offset();
  }

  if (lastOffset >= 0) {
    consumer.commitSync(Map.of(
        partition,
        new OffsetAndMetadata(lastOffset + 1)
    ));
  }
}
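
Since at-least-once means replays, the handler has to tolerate seeing the same record twice. A minimal sketch of one way to do that, deduplicating on (topic, partition, offset). IdempotentHandler is a hypothetical class, and the in-memory set stands in for whatever durable store (for example a unique key in a database) a real service would use.

```java
import java.util.HashSet;
import java.util.Set;

final class IdempotentHandler {
  // Hypothetical dedup store; in-memory only, so it does not survive restarts.
  private final Set<String> seen = new HashSet<>();
  private int applied = 0;

  // Returns true if the side effect ran, false if the record was a replay.
  boolean handle(String topic, int partition, long offset) {
    String id = topic + "-" + partition + "@" + offset;
    if (!seen.add(id)) {
      return false;  // already processed: skip the side effect
    }
    applied++;       // stand-in for the real side effect
    return true;
  }

  int appliedCount() {
    return applied;
  }

  public static void main(String[] args) {
    IdempotentHandler h = new IdempotentHandler();
    System.out.println(h.handle("orders", 0, 42L));
    System.out.println(h.handle("orders", 0, 42L));  // replay after a crash
    System.out.println(h.appliedCount());
  }
}
```

For real durability the dedup record and the side effect should be written in the same transaction, otherwise a crash between the two reopens the duplicate window.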

At-most-once: commit THEN process (almost never right)

Committing before processing means a crash after commit loses records silently. Only acceptable for best-effort metrics where loss is tolerable—never for billing, inventory, or audit.

Seek operations

  • consumer.seek(tp, offset) — jump to absolute offset; next poll reads from there
  • seekToBeginning(Collection<TopicPartition>) — replay from start of retention
  • seekToEnd(Collection<TopicPartition>) — skip to latest (only new records)
  • offsetsForTimes(Map<TopicPartition, Long>) — find offset for timestamp per partition (replay from incident time)
Java — replay from timestamp
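// tp must already be assigned to this consumer (assign() or a completed
// subscribe() rebalance), otherwise seek() throws IllegalStateException
TopicPartition tp = new TopicPartition("orders", 0);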
long incidentMs = 1717200000000L;

Map<TopicPartition, Long> query = Map.of(tp, incidentMs);
Map<TopicPartition, OffsetAndTimestamp> offsets =
    consumer.offsetsForTimes(query);

OffsetAndTimestamp oat = offsets.get(tp);
if (oat != null) {
  consumer.seek(tp, oat.offset());
  log.info("Rewound {} to offset {} at ts {}", tp, oat.offset(), oat.timestamp());
}
application.yml — manual offset control
spring.kafka.consumer:
  enable-auto-commit: false
  auto-offset-reset: earliest
  properties:
    isolation.level: read_committed
⚖️ Trade-off

commitSync after every record = safest, slowest (RTT per record). Batch commits per poll partition = higher throughput, larger replay window on failure. Size batches to your idempotency budget.

Rebalancing — the most important topic

Rebalance is a stop-the-world (or incremental) redistribution of partitions across group members. Many production incidents involving “duplicate processing” or “lag spike at deploy” trace back to it.

What triggers rebalance

| Trigger | Mechanism | Mitigation |
| --- | --- | --- |
| Consumer joins group | New member needs partition share | Scale consumers gradually; use cooperative assignor |
| Consumer leaves / crashes | session.timeout.ms exceeded without heartbeat | Tune timeouts; graceful close() |
| max.poll.interval.ms exceeded | poll() not called in time (slow processing) | Increase interval or offload processing to worker pool |
| Partition count change | Topic partitions increased | Expected; ensure assignor handles new partitions |
| Subscription change | subscribe() different topics | Plan topic changes as deploy events |
| Static member mismatch | group.instance.id conflict | Unique instance IDs per pod |

Stop-the-world rebalance (eager)

Legacy protocol (pre-cooperative): all consumers revoke all partitions, the group re-joins, a fresh assignment is issued, and consumers resume from their committed offsets. Processing halts for the whole group during the rebalance, for seconds to minutes at scale.

Cooperative incremental rebalance (Kafka 2.4+)

CooperativeStickyAssignor: two-phase protocol. Round 1: only partitions that move are revoked; consumers keep processing unmoved partitions. Round 2: revoked partitions reassigned to new owners. Dramatically reduces rebalance pause during rolling deploys.

Properties — cooperative sticky assignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
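
The difference between the two protocols comes down to which partitions get revoked. A rough model, assuming assignments are plain maps from member id to partition names (RevocationDiff and its methods are illustrative, not part of the Kafka client):

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class RevocationDiff {
  // Eager: every member gives up everything it owns before reassignment.
  static Set<String> eagerRevoked(Map<String, Set<String>> before) {
    Set<String> revoked = new TreeSet<>();
    for (Set<String> owned : before.values()) {
      revoked.addAll(owned);
    }
    return revoked;
  }

  // Cooperative: a partition is revoked only if its current owner loses it.
  static Set<String> cooperativeRevoked(Map<String, Set<String>> before,
                                        Map<String, Set<String>> after) {
    Set<String> revoked = new TreeSet<>();
    for (Map.Entry<String, Set<String>> e : before.entrySet()) {
      Set<String> stillOwned = after.getOrDefault(e.getKey(), Set.of());
      for (String partition : e.getValue()) {
        if (!stillOwned.contains(partition)) {
          revoked.add(partition);
        }
      }
    }
    return revoked;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> before = Map.of(
        "c1", Set.of("orders-0", "orders-1", "orders-2"),
        "c2", Set.of("orders-3", "orders-4", "orders-5"));
    Map<String, Set<String>> after = Map.of(
        "c1", Set.of("orders-0", "orders-1"),
        "c2", Set.of("orders-3", "orders-4"),
        "c3", Set.of("orders-2", "orders-5"));
    System.out.println("eager:       " + eagerRevoked(before));
    System.out.println("cooperative: " + cooperativeRevoked(before, after));
  }
}
```

When a third consumer joins a six-partition group, the eager model pauses all six partitions while the cooperative model pauses only the two that change owner.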

group.instance.id — static membership

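A stable instance identifier (e.g. pod name) lets a consumer restart and rejoin within session.timeout.ms without triggering a rebalance: the coordinator keeps the member's assignment reserved during the gap. This is critical for Kubernetes rolling updates, provided the replacement pod really does join with the same ID. StatefulSet pods keep their names across restarts; Deployment pods get a new name on every rollout, so ${HOSTNAME} only works as group.instance.id when pod names are stable.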

application.yml — K8s static membership
spring.kafka.consumer:
  group-id: order-processor
  properties:
    group.instance.id: ${HOSTNAME}
    partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
    session.timeout.ms: 45000
    heartbeat.interval.ms: 15000

ConsumerRebalanceListener

Hook into partition lifecycle—mandatory for correct offset handling on revoke:

Java — rebalance listener
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // CRITICAL: commit offsets for partitions being taken away
    try {
      consumer.commitSync();
    } catch (Exception e) {
      log.error("Commit on revoke failed", e);
    }
    // Close any partition-scoped resources (files, locks)
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // Optional: seek to committed offset or custom position
    // consumer.seek(...) if not using auto reset
    log.info("Assigned partitions: {}", partitions);
  }
});
⚠️ Pitfall

Rebalancing storms — autoscaler adds/removes consumers rapidly, or health checks kill pods before close(), causing continuous rebalance. Lag spikes, duplicate processing, and max.poll.interval violations compound. Stabilize replica count; use static membership + cooperative assignor.

📦 Real World

Cooperative rebalancing (KIP-429) was introduced to cut the deploy-time consumption pause. Static membership (KIP-345) targets stateful consumers that restart often, such as pods in Kubernetes, where group.instance.id lets a pod restart without a full group rebalance.

Consumer configuration deep dive

Consumer tuning is balancing liveness (heartbeats, poll frequency), throughput (fetch batching), and correctness (offsets, isolation).

| Property | Default | Production | Impact |
| --- | --- | --- | --- |
| group.id | none (required) | Stable per app role | Offset namespace; changing it restarts consumption from the auto.offset.reset position |
| max.poll.interval.ms | 300000 (5 min) | 300000–900000 for slow handlers | Max gap between poll() calls before declared dead |
| session.timeout.ms | 45000 (Kafka 3.x) | 45000–60000 | Heartbeat failure → rebalance |
| heartbeat.interval.ms | 3000 | session.timeout.ms / 3 or lower | Should be ≤ session.timeout.ms / 3 |
| fetch.min.bytes | 1 | 1 (latency) or 100000 (throughput) | Broker batches fetch response size |
| fetch.max.wait.ms | 500 | 500 (latency) or 500+ (throughput) | Max block waiting for min bytes |
| max.partition.fetch.bytes | 1048576 (1 MB) | 1–4 MB | Per-partition fetch ceiling |
| max.poll.records | 500 | 100–1000 | Records per poll; bounds processing burst |
| auto.offset.reset | latest | earliest (backfill) / latest (live only) | Behavior when no committed offset exists |
| isolation.level | read_uncommitted | read_committed (EOS) | Filter aborted transactional records |
| enable.auto.commit | true | false for services | Automatic offset commits on interval |
| auto.commit.interval.ms | 5000 | N/A if auto commit disabled | Commit frequency for auto mode |

max.poll.interval.ms — the slow handler knob

If processing 500 records takes 6 minutes and you never call poll() in between, the consumer is kicked from the group—even though heartbeat thread may still run in modern clients, the poll contract is separate. Fix: increase max.poll.interval.ms, reduce max.poll.records, or delegate processing to workers and poll frequently.
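
The "reduce max.poll.records" fix is plain arithmetic: divide the time budget by the worst-case cost per record. A small sketch, where PollBudget and the safety factor are my own illustrative choices rather than anything Kafka defines:

```java
final class PollBudget {
  // Largest max.poll.records for which a full batch should finish within
  // safetyFactor * max.poll.interval.ms, given a worst-case per-record cost.
  static int maxSafeRecords(long maxPollIntervalMs, long worstCaseMsPerRecord,
                            double safetyFactor) {
    if (worstCaseMsPerRecord <= 0) {
      throw new IllegalArgumentException("worstCaseMsPerRecord must be positive");
    }
    long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
    return (int) Math.max(1, budgetMs / worstCaseMsPerRecord);
  }

  public static void main(String[] args) {
    // 500 records in 6 minutes is 720 ms per record.
    long msPerRecord = 6 * 60_000L / 500;
    System.out.println(maxSafeRecords(300_000L, msPerRecord, 0.5));
  }
}
```

With the 6-minute example above and half of the default interval as budget, the batch size drops from 500 to 208 records (150000 ms / 720 ms per record).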

auto.offset.reset semantics

  • earliest — start at beginning of retention when no offset exists
  • latest — skip to end; only new records (default—surprises teams on first deploy)
  • none — throw exception if no offset—forces explicit positioning
application.yml — production consumer
spring.kafka.consumer:
  group-id: payment-processor
  enable-auto-commit: false
  auto-offset-reset: earliest
  max-poll-records: 250
  properties:
    max.poll.interval.ms: 600000
    session.timeout.ms: 45000
    heartbeat.interval.ms: 15000
    partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
    group.instance.id: ${HOSTNAME}
    isolation.level: read_committed
    fetch.max.wait.ms: 500
    fetch.min.bytes: 1
⚙️ Config

Heartbeat rule: heartbeat.interval.ms ≤ session.timeout.ms / 3. Poll rule: time to process max.poll.records << max.poll.interval.ms.
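
Both rules are easy to check at startup. A sketch of such a check, assuming the heartbeat rule is the usual "no more than one third of the session timeout" and reading "<<" as "at most half", which is an arbitrary threshold chosen for the example. LivenessRules is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

final class LivenessRules {
  // Returns a human-readable message for each rule the settings break.
  static List<String> violations(long heartbeatIntervalMs, long sessionTimeoutMs,
                                 long maxPollIntervalMs, long estimatedBatchMs) {
    List<String> problems = new ArrayList<>();
    if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
      problems.add("heartbeat.interval.ms should be <= session.timeout.ms / 3");
    }
    // Assumed margin: a full batch should take at most half the poll interval.
    if (estimatedBatchMs * 2 > maxPollIntervalMs) {
      problems.add("batch processing time should be well below max.poll.interval.ms");
    }
    return problems;
  }

  public static void main(String[] args) {
    // Values from the production config above, with a 60 s batch estimate.
    System.out.println(violations(15_000, 45_000, 600_000, 60_000));
    System.out.println(violations(20_000, 45_000, 300_000, 360_000));
  }
}
```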

Consumer lag

Lag is the distance between what producers wrote and what your group has durably committed as processed. It is the single most important consumer health metric—more actionable than CPU or heap.

Definition

Per partition: lag = log end offset − committed offset (sometimes measured against consumer position instead of committed—tools vary; align with your dashboard). Sum across partitions for group lag. Lag of 0 means fully caught up on that partition.
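
The definition translates directly into code. A minimal sketch with offsets supplied as plain maps keyed by "topic-partition" strings; in practice those numbers would come from the broker and __consumer_offsets, and LagCalculator is an illustrative name.

```java
import java.util.Map;

final class LagCalculator {
  // lag = log end offset - committed offset, never negative.
  static long partitionLag(long logEndOffset, long committedOffset) {
    return Math.max(0, logEndOffset - committedOffset);
  }

  // Sums lag over partitions that have a committed offset; partitions
  // without one are skipped because their lag is undefined.
  static long groupLag(Map<String, Long> logEndOffsets,
                       Map<String, Long> committedOffsets) {
    long total = 0;
    for (Map.Entry<String, Long> e : logEndOffsets.entrySet()) {
      Long committed = committedOffsets.get(e.getKey());
      if (committed != null) {
        total += partitionLag(e.getValue(), committed);
      }
    }
    return total;
  }

  public static void main(String[] args) {
    Map<String, Long> logEnd = Map.of("orders-0", 1100L, "orders-1", 500L);
    Map<String, Long> committed = Map.of("orders-0", 1042L, "orders-1", 500L);
    System.out.println(groupLag(logEnd, committed));
  }
}
```

Here orders-0 is 58 records behind and orders-1 is fully caught up, so the group lag is 58.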

Why lag matters most

  • Growing lag → consumer cannot keep pace → SLA breach for real-time pipelines
  • Per-partition lag skew → hot partition or slow consumer instance
  • Lag spike → often rebalance or deploy, not producer surge
  • Stuck lag on one partition → poison message or single-thread bottleneck

Monitoring tools

| Tool | What it shows |
| --- | --- |
| kafka-consumer-groups.sh --describe | Per-partition CURRENT-OFFSET, LOG-END-OFFSET, LAG, consumer id |
| Burrow (LinkedIn) | Lag evaluation with status OK/WARN/ERR; SLA-oriented alerting |
| Cruise Control | Broker load metrics and partition rebalancing across brokers (cluster context for lag, not a lag monitor itself) |
| Prometheus + Grafana | kafka_consumer_fetch_manager_records_lag_max JMX → dashboard |
| Confluent Control Center / Cloud | UI lag charts per group |
Shell — describe consumer group lag
kafka-consumer-groups.sh \
  --bootstrap-server broker:9092 \
  --group payment-processor \
  --describe

# Output columns: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID

Lag growing steadily

Producer rate > consumer processing rate. Remedies: add consumers (if partitions allow), optimize handler, increase parallelism via partition count (long-term), fix downstream DB/API latency, scale processing tier.

Lag jump (sudden spike)

  • Rebalance — partitions revoked mid-batch; temporary duplication risk; lag redistributes
  • Consumer restart — catch-up from committed offset while producers continued
  • GC pause / JVM STW — missed polls; may trigger rebalance cascade
  • Broker leader election — brief fetch delay, not true lag—distinguish via metrics
💡 Pro Tip

Alert on records-lag-max and rate of change, not absolute lag alone—a steady 10k lag may be fine if bounded; 10k → 50k in 5 minutes is not.
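
A rate-of-change alert needs only two lag samples and the time between them. A sketch under the assumption that samples arrive as plain numbers from whatever metrics pipeline is in use; LagTrendAlert and the threshold are illustrative.

```java
final class LagTrendAlert {
  // Lag growth in records per minute between two samples.
  static double growthPerMinute(long lagBefore, long lagAfter, long elapsedMs) {
    if (elapsedMs <= 0) {
      throw new IllegalArgumentException("elapsedMs must be positive");
    }
    return (lagAfter - lagBefore) * 60_000.0 / elapsedMs;
  }

  // Fires when lag grows faster than the allowed rate, regardless of its absolute size.
  static boolean shouldAlert(long lagBefore, long lagAfter, long elapsedMs,
                             double maxGrowthPerMinute) {
    return growthPerMinute(lagBefore, lagAfter, elapsedMs) > maxGrowthPerMinute;
  }

  public static void main(String[] args) {
    long fiveMinutes = 5 * 60_000L;
    System.out.println(shouldAlert(10_000, 10_000, fiveMinutes, 1_000));  // steady lag
    System.out.println(shouldAlert(10_000, 50_000, fiveMinutes, 1_000));  // 10k to 50k
  }
}
```

The steady 10k case grows at 0 records per minute and stays quiet; the 10k → 50k case grows at 8000 records per minute and fires.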

Lag model

The arithmetic behind lag is simple enough to model by hand. Per tick, lag += (produce_rate − consume_rate), floored at zero. A rebalance adds a one-time spike on top (duplicate processing + catch-up pause), which is what you see after rolling deploys.

🔬 Under the Hood

Real lag uses broker log end offset vs committed offset in __consumer_offsets. The model is the same math: when produce > consume, lag grows linearly; when consume > produce, lag drains until zero.
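
The per-tick arithmetic, lag += (produce_rate − consume_rate), can be run as a tiny simulation. This is a toy model with made-up rates; a rebalance is approximated as a window of ticks during which consumption stops, which is an assumption of this sketch and not how the broker computes anything.

```java
final class LagModel {
  // Returns lag after each tick. Consumption is paused for pauseTicks ticks
  // starting at pauseFrom (use pauseTicks = 0 for no rebalance).
  static long[] simulate(long initialLag, long produceRate, long consumeRate,
                         int ticks, int pauseFrom, int pauseTicks) {
    long[] lag = new long[ticks];
    long current = initialLag;
    for (int t = 0; t < ticks; t++) {
      boolean paused = t >= pauseFrom && t < pauseFrom + pauseTicks;
      long consumed = paused ? 0 : consumeRate;
      current = Math.max(0, current + produceRate - consumed);  // lag never goes negative
      lag[t] = current;
    }
    return lag;
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(simulate(0, 100, 80, 3, 0, 0)));
    System.out.println(java.util.Arrays.toString(simulate(100, 50, 80, 5, 0, 0)));
    System.out.println(java.util.Arrays.toString(simulate(0, 100, 100, 4, 1, 2)));
  }
}
```

The third run shows the deploy pattern: rates are balanced, yet a two-tick pause leaves 200 records of lag that never drains because the consumer has no spare capacity.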

Multi-threading with consumers

KafkaConsumer is not thread-safe. Only one thread may call poll(), commit, seek, or pause at a time; wakeup() is the one method that is safe to call from another thread. The client detects concurrent access and throws ConcurrentModificationException.

Pattern 1: one consumer per thread

Simplest and most common: N threads, N KafkaConsumer instances, same group.id. Group coordinator assigns partitions across them automatically. Scale N ≤ total partitions.

Java — executor per consumer thread
ExecutorService executor = Executors.newFixedThreadPool(3);

for (int i = 0; i < 3; i++) {
  Properties props = baseConsumerProps();
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(List.of("orders"));

  executor.submit(() -> {
    try {
      while (running) {
        for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
          process(r);
        }
      }
    } finally {
      consumer.close();
    }
  });
}

Pattern 2: consumer thread + worker pool

One dedicated thread polls; hands records to a fixed thread pool for processing. Risk: workers fall behind → must pause() partitions with deep queues and resume() when caught up—otherwise unbounded memory and max.poll.interval violations if poll thread blocks on queue.

Java — pause/resume backpressure
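BlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>(1000);
Set<TopicPartition> paused = new HashSet<>();

// Worker threads drain the queue and process records.
// Poll thread (body of the poll loop):
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition tp : records.partitions()) {
  for (ConsumerRecord<String, String> r : records.records(tp)) {
    if (!queue.offer(r)) {
      // Queue full: rewind to the first record that was not enqueued so it is
      // fetched again after resume(), then stop fetching this partition.
      consumer.seek(tp, r.offset());
      consumer.pause(Set.of(tp));
      paused.add(tp);
      break;  // skips the rest of this partition only
    }
  }
}
// When the queue drains (still on the poll thread):
if (!paused.isEmpty() && queue.isEmpty()) {
  consumer.resume(paused);
  paused.clear();
}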
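
With a worker pool, records finish out of order, so the poll thread cannot simply commit the highest offset it has seen. One common approach is to track the lowest offset below which everything is done. A sketch of that idea for a single partition: CommitTracker is a hypothetical class, and it assumes the offsets handed to workers are consecutive. Compacted topics and transactional control records leave gaps, so a real tracker would have to follow the offsets actually dispatched.

```java
import java.util.TreeSet;

final class CommitTracker {
  private long nextToCommit;                  // every offset below this is done
  private final TreeSet<Long> done = new TreeSet<>();

  CommitTracker(long firstOffset) {
    this.nextToCommit = firstOffset;
  }

  // Called by a worker when it finishes the record at this offset.
  synchronized void markDone(long offset) {
    done.add(offset);
    // Advance over the contiguous run of completed offsets.
    while (done.remove(nextToCommit)) {
      nextToCommit++;
    }
  }

  // Offset that is safe to commit: last contiguous completed offset + 1.
  synchronized long committableOffset() {
    return nextToCommit;
  }

  public static void main(String[] args) {
    CommitTracker tracker = new CommitTracker(100);
    tracker.markDone(101);                           // 100 still in flight
    System.out.println(tracker.committableOffset());
    tracker.markDone(100);                           // gap closed
    System.out.println(tracker.committableOffset());
  }
}
```

The poll thread would read committableOffset() for each partition and pass it to commitSync(Map), so a crash replays only the records that had not finished.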

Pattern 3: virtual threads (Java 21+)

Spawn one virtual thread per record (or per small batch) inside the poll loop for cheap concurrency on I/O-bound handlers. The poll thread must still call poll() frequently; use Executors.newVirtualThreadPerTaskExecutor() and either a small max.poll.records or periodic poll in a separate scheduler thread (advanced).

Java 21 — virtual thread per record
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  while (running) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(200));

    for (ConsumerRecord<String, String> record : records) {
      executor.submit(() -> process(record));
    }
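    // Keep max.poll.records low; monitor max.poll.interval.
    // Tasks finish in any order, so per-partition ordering is lost, and offsets
    // must not be committed until the tasks for those records have completed.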
  }
}
| Pattern | Pros | Cons |
| --- | --- | --- |
| One consumer / thread | Simple, safe, scales with partitions | N JVM consumers = N connections |
| Poll + worker pool | Parallel processing within partition | Breaks per-partition order; pause/resume complexity |
| Virtual threads | High concurrency for I/O-bound work | Easy to violate poll interval if misconfigured |
⚠️ Pitfall

Sharing one KafkaConsumer across @Async Spring beans breaks the single-thread rule: the client throws ConcurrentModificationException when it detects concurrent access. Use ConcurrentKafkaListenerContainerFactory with concurrency instead; it manages one consumer per thread correctly.

🎯 Interview Tip

“How do you parallelize consumption?” — first answer: more partitions + more consumers in the group (preserves order per key). Parallel processing within a partition sacrifices order—only if business allows. Mention pause/resume for backpressure.