Producer Internals & Configuration

producer.send() looks like one line in your app; inside the client it is a pipeline of serialization, partitioning, batching, background I/O, and retry logic that determines whether you get duplicates, ordering violations, or silent data loss. This chapter traces every hop from ProducerRecord to broker ack.

Audience: senior developers. Versions: Kafka 3.x, Spring Kafka 3.x.

Producer architecture

The Kafka producer is split across the thread that calls send() and a background Sender I/O thread. Your thread never talks to the broker directly: it enqueues work and returns, blocking only while topic metadata is being fetched or the buffer is full (both bounded by max.block.ms).

flowchart LR
  PR[ProducerRecord]
  SER[Serializer\nkey + value]
  PAR[Partitioner]
  ACC[RecordAccumulator\nper-partition batches]
  SND[Sender thread\nnetwork I/O]
  BRK[Broker leader]
  PR --> SER --> PAR --> ACC --> SND --> BRK
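
To make the two-thread split concrete, here is a toy model in plain Java. It is a sketch only: MiniProducer, its bounded queue, and the delivered list are hypothetical stand-ins for the accumulator, the Sender thread, and the broker log, and none of it is the real client's code.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/** Toy model of the caller-thread / Sender-thread split. Not the real client. */
final class MiniProducer {
    // Unique object used as a shutdown marker (compared by identity, never by content).
    private static final String POISON = new String("__close__");

    private final BlockingQueue<String> buffer;                           // stands in for the accumulator
    private final List<String> delivered = new CopyOnWriteArrayList<>();  // stands in for the broker log
    private final Thread sender;

    MiniProducer(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.sender = new Thread(this::drain, "mini-sender");
        this.sender.setDaemon(true);
        this.sender.start();
    }

    /** Enqueue and return; waits at most maxBlockMs if the buffer is full. */
    boolean send(String record, long maxBlockMs) {
        try {
            return buffer.offer(record, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Background loop: the only code that "talks to the broker". */
    private void drain() {
        try {
            for (String r = buffer.take(); r != POISON; r = buffer.take()) {
                delivered.add(r);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> delivered() { return delivered; }

    /** The marker queues behind pending records, so they drain before the thread exits. */
    void close() {
        try {
            buffer.put(POISON);
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The caller only ever touches the queue; a full queue is the one place where send() can stall, which mirrors the buffer.memory / max.block.ms behaviour described above.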

ProducerRecord

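The unit you create: topic, optional partition, key, value, headers, and an optional timestamp. Whether the broker keeps your timestamp or overwrites it with its own append time is decided by the topic's message.timestamp.type (CreateTime or LogAppendTime). If partition is set explicitly, the partitioner is skipped. Headers carry metadata (trace IDs, content-type) without polluting the value payload.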

Java
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
    "orders",                    // topic
    order.getCustomerId(),       // key → partition affinity
    order                        // value
);
record.headers().add("correlation-id", uuid.getBytes(StandardCharsets.UTF_8));
producer.send(record, (metadata, ex) -> {
    // On failure the metadata carries no usable offset, so log what identifies the record instead.
    if (ex != null) log.error("Send failed topic={} key={}", record.topic(), record.key(), ex);
});

Serializer

Serializer<K> and Serializer<V> convert objects to bytes before batching. Configure via key.serializer / value.serializer (class name or Spring property). With Schema Registry: KafkaAvroSerializer prepends magic byte + schema ID.
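
The "magic byte + schema ID" framing can be sketched in a few lines. This is an illustration under the assumption that the frame is one zero byte, then a 4-byte big-endian schema ID, then the encoded payload; WireFormatSketch and its method names are hypothetical and this is not the serializer's source.

```java
import java.nio.ByteBuffer;

/** Sketch of the framing: [magic byte][4-byte schema id][payload]. */
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;

    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(1 + 4 + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)   // ByteBuffer writes big-endian by default
                .put(payload)
                .array();
    }

    static int schemaIdOf(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf.getInt();
    }
}
```

A consumer that reads these bytes with a plain deserializer will see the five-byte prefix as garbage, which is why both sides need to agree on the serializer family.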

Partitioner

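Chooses the partition when none is set on the record. By default, records with a key go to the partition given by the murmur2 hash of the serialized key, and records without a key use sticky partitioning (Kafka 2.4+). Runs on the calling thread before the record is appended to the accumulator.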

RecordAccumulator

In-memory buffer that keeps a deque of batches per topic-partition; the Sender later groups ready batches by the broker node that leads each partition. Each batch accumulates records (compressed if compression.type is set) until it reaches batch.size or linger.ms expires. Total memory is capped by buffer.memory; when it is full, send() blocks for up to max.block.ms and then fails with a TimeoutException.
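
A simplified model of that structure, assuming one deque of batches per partition and a readiness rule of "full, followed by a newer batch, or lingered long enough". AccumulatorSketch is a hypothetical class; it ignores compression, memory pooling, and flush/close.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified per-partition batching model. Not the client's RecordAccumulator. */
final class AccumulatorSketch {
    private static final class Batch {
        final long createdMs;
        int bytes;
        Batch(long createdMs) { this.createdMs = createdMs; }
    }

    private final int batchSize;   // analogue of batch.size
    private final long lingerMs;   // analogue of linger.ms
    private final Map<Integer, Deque<Batch>> batches = new HashMap<>();

    AccumulatorSketch(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    /** Append to the partition's newest batch, opening a new one if it would overflow. */
    void append(int partition, int recordBytes, long nowMs) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = dq.peekLast();
        if (last == null || last.bytes + recordBytes > batchSize) {
            last = new Batch(nowMs);
            dq.addLast(last);
        }
        last.bytes += recordBytes;
    }

    /** Partitions whose oldest batch is full, has a newer batch behind it, or has lingered. */
    List<Integer> readyPartitions(long nowMs) {
        List<Integer> ready = new ArrayList<>();
        for (Map.Entry<Integer, Deque<Batch>> e : batches.entrySet()) {
            Deque<Batch> dq = e.getValue();
            Batch head = dq.peekFirst();
            if (head == null) continue;
            boolean full = dq.size() > 1 || head.bytes >= batchSize;
            boolean lingered = nowMs - head.createdMs >= lingerMs;
            if (full || lingered) ready.add(e.getKey());
        }
        return ready;
    }
}
```

Time is passed in explicitly so the model stays deterministic; the real Sender polls on its own clock.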

Sender thread

Background thread polls the accumulator for ready batches (full size, linger expired, or flush/close). Groups batches by broker node, sends Produce requests, handles responses, retries retriable errors, and invokes callbacks. There is one Sender per KafkaProducer instance. The producer is thread-safe, and sharing one instance across threads is generally faster than creating a producer per thread.

In-flight requests

max.in.flight.requests.per.connection (default 5) allows multiple unacknowledged produce requests per broker connection. Higher values improve throughput but, with retries > 0 and idempotence disabled, can reorder records within a partition: if batch 1 fails and batch 2 succeeds, the retried batch 1 lands after batch 2.

🔬 Under the Hood

The Sender tracks in-flight requests per broker connection and matches each response to its request by correlation ID. With enable.idempotence=true, the broker tracks producer ID (PID) plus per-partition sequence numbers and rejects out-of-order or duplicate batches, which keeps ordering intact with up to 5 in-flight requests.
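
The broker-side check can be pictured as follows. This is a deliberately reduced sketch: it uses one sequence number per batch and remembers only the last one, whereas the real broker works with per-record sequence ranges and keeps metadata for several recent batches. SequenceCheckSketch and its Result enum are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Reduced model of per-partition sequence validation for an idempotent producer. */
final class SequenceCheckSketch {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // Last appended sequence per "producerId:partition"; absent means nothing seen yet.
    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result tryAppend(long producerId, int partition, int sequence) {
        String key = producerId + ":" + partition;
        int last = lastSeq.getOrDefault(key, -1);
        if (sequence <= last) return Result.DUPLICATE;         // retry of a batch already in the log
        if (sequence != last + 1) return Result.OUT_OF_ORDER;  // gap: an earlier batch is missing
        lastSeq.put(key, sequence);
        return Result.APPENDED;
    }
}
```

With this rule, batch 2 arriving before a retried batch 1 is rejected rather than appended, so the producer can resend both in order.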

⚠️ Pitfall

Calling send().get() in a tight loop on the hot path blocks your thread until every batch acks—destroying batching benefits and collapsing throughput to single-record latency.

Partitioning

Partition choice determines ordering scope and load distribution. A bad key strategy creates hot partitions that no amount of consumer scaling fixes.

Default partitioner: sticky (Kafka 2.4+)

Legacy behavior (pre-2.4): round-robin across partitions when key is null—tiny batches spread everywhere. Sticky partitioner (current default): null-key records stick to one partition until a batch is ready to send, then rotate to the next—improving batch fill ratio without sacrificing long-term fairness.
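
The difference is easy to see in a small simulation. The sketch below assumes a batch is "ready" after a fixed number of records and rotates to the next partition in order; the real partitioner reacts to actual batch completion and picks the next partition differently. StickySketch is a hypothetical class.

```java
/** Toy comparison of sticky vs round-robin assignment for null-key records. */
final class StickySketch {
    private final int numPartitions;
    private final int recordsPerBatch;  // stand-in for "the batch is ready to send"
    private int current = 0;
    private int inBatch = 0;

    StickySketch(int numPartitions, int recordsPerBatch) {
        this.numPartitions = numPartitions;
        this.recordsPerBatch = recordsPerBatch;
    }

    /** Stay on the current partition until its batch fills, then move on. */
    int nextPartition() {
        if (inBatch == recordsPerBatch) {
            current = (current + 1) % numPartitions;
            inBatch = 0;
        }
        inBatch++;
        return current;
    }

    /** Legacy behaviour: every record goes to the next partition. */
    static int roundRobin(int recordIndex, int numPartitions) {
        return recordIndex % numPartitions;
    }
}
```

Six records over three partitions produce three batches of two with the sticky rule, versus six single-record appends spread over all partitions with round-robin.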

Key-based partitioning

When key is non-null: partition = (murmur2(keyBytes) & 0x7fffffff) % numPartitions. Same key → same partition → per-key ordering. Key bytes come from the serialized key, not hashCode() on the Java object unless your serializer preserves that.
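
For readers who want to reason about key placement offline, the formula can be reproduced in plain Java. The murmur2 routine below is written to mirror the 32-bit variant used by the Java client as best understood here; treat it as an illustrative sketch and verify against the client before relying on exact partition numbers. KeyPartitionSketch and partitionFor are hypothetical names, and the key is assumed to be serialized as UTF-8.

```java
import java.nio.charset.StandardCharsets;

/** Sketch of key-based partition selection: (murmur2(keyBytes) & 0x7fffffff) % numPartitions. */
final class KeyPartitionSketch {
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length;   // seed mixed with length

        // Mix four bytes at a time, little-endian.
        for (int i = 0; i + 4 <= length; i += 4) {
            int k = (data[i] & 0xff)
                    | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16)
                    | ((data[i + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Fold in the remaining one to three bytes.
        int tail = length & ~3;
        switch (length % 4) {
            case 3: h ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2: h ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1: h ^= data[tail] & 0xff;
                    h *= m;
                    break;
            default: break;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);  // assumes a UTF-8 string serializer
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Note that the result depends on numPartitions, so adding partitions to a topic moves keys to different partitions for all records written afterwards.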

Null key behavior

No ordering guarantee across records. Sticky partitioner batches null-key records to one partition at a time, then switches—good for throughput on audit/log firehose topics where order is irrelevant.

Custom partitioner

Implement org.apache.kafka.clients.producer.Partitioner for domain routing—e.g. route EU customers to partitions 0–3, US to 4–7.

Java — custom partitioner
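import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Assumes keys shaped "<region>:<customerId>", e.g. "EU:cust-42" (illustrative format).
public class RegionPartitioner implements Partitioner {
  @Override
  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    String k = key != null ? key.toString() : "";
    String region = k.contains(":") ? k.substring(0, k.indexOf(':')) : "default";
    int numPartitions = cluster.partitionCountForTopic(topic);
    // Hash the whole key: hashing only the region would pin each region to one partition.
    int hash = Utils.toPositive(Utils.murmur2(k.getBytes(StandardCharsets.UTF_8)));
    int half = numPartitions / 2;
    if (half == 0) return 0;  // single-partition topic: nothing to split
    return switch (region) {
      case "EU" -> hash % half;                           // lower half
      case "US" -> half + hash % (numPartitions - half);  // upper half
      default -> hash % numPartitions;
    };
  }
  @Override public void close() {}
  @Override public void configure(Map<String, ?> configs) {}
}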
application.yml
spring.kafka.producer.properties:
  partitioner.class: com.example.kafka.RegionPartitioner

Partition skew scenarios

Key choice | What happens | Fix
countryCode = "US" | 70% traffic → one partition | Salting: userId + salt for partition, carry real key in value
null on ordered topic | Order not preserved across records | Always set a meaningful key
tenantId with 2 huge tenants | Two partitions dominate | Sub-partition by entity ID within tenant
Explicit partition in record | Bypasses partitioner; caller owns balance | Round-robin partition in app (rare, expert-only)
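
Salting from the table can be as small as the helper below. It is a sketch under the assumption that a secondary identifier (here a hypothetical entityId such as a user ID) is available to pick the bucket; SaltedKeySketch and the "#" separator are illustrative choices.

```java
/** Sketch: spread one hot key over a fixed number of sub-keys. */
final class SaltedKeySketch {
    /**
     * Returns e.g. "US#3". Records for the same entityId always get the same
     * salted key, so ordering is preserved per entity, not per hot key.
     */
    static String saltedKey(String hotKey, String entityId, int buckets) {
        int bucket = Math.floorMod(entityId.hashCode(), buckets);
        return hotKey + "#" + bucket;
    }
}
```

The trade-off is explicit in the return value: ordering now holds only within one (key, bucket) pair, and consumers that need the real key must read it from the value or a header.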
🎯 Interview Tip

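“How do you fix a hot partition?” Diagnose by comparing per-partition log growth (log end offset or log size per partition; BytesInPerSec is reported per topic and per broker, not per partition). Fix the key design or add partitions, keeping in mind that adding partitions changes the key-to-partition mapping for new records, breaks per-key ordering across the change, and does not move existing data. Mention salting and accepting weaker ordering for that entity.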

📦 Real World

Uber partitions trip events by cityId for local ordering. Netflix uses high-cardinality keys (user/device IDs) on viewing events to avoid cross-tenant hot spots on shared topics.

Delivery semantics

Producer configuration alone defines what guarantees are possible; consumers and idempotent handlers complete the story. Know which knob sets which semantic before you promise “exactly-once” in a design review.

At-most-once

acks=0 — producer does not wait for broker acknowledgment. Fire-and-forget. Records can be lost if the broker fails after accepting socket bytes but before durable write, or if the batch never arrives. Acceptable for: metrics, sampling, non-critical telemetry where loss is statistically tolerable.

At-least-once (default production pattern)

acks=1 or acks=all with retries > 0 (the default is Integer.MAX_VALUE, bounded in practice by delivery.timeout.ms). If the producer does not receive an ack (timeout, broker restart), it retries, so the same record may be written twice. Consumers must deduplicate (idempotent handler, natural key in DB) unless you add idempotence or transactions.
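
An idempotent handler on the consumer side can be sketched as below. The in-memory set is a placeholder for whatever durable uniqueness check the application has (a database unique key, for example), and eventId is assumed to be a stable identifier carried in the record; DedupingHandler is a hypothetical class.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Sketch of an idempotent handler: apply each event id at most once. */
final class DedupingHandler {
    private final Set<String> seen = new HashSet<>();  // placeholder for a durable store
    private final Consumer<String> sideEffect;

    DedupingHandler(Consumer<String> sideEffect) {
        this.sideEffect = sideEffect;
    }

    /** Returns true if the payload was applied, false if it was a duplicate delivery. */
    boolean handle(String eventId, String payload) {
        if (!seen.add(eventId)) {
            return false;
        }
        sideEffect.accept(payload);
        return true;
    }
}
```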

Exactly-once: idempotent producer

enable.idempotence=true (the default since Kafka 3.0, together with acks=all). The broker assigns a Producer ID (PID), and the producer stamps each batch with a per-partition, monotonically increasing sequence number. The broker deduplicates retries within the same producer session, so no duplicate batches land in the log.

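  • Requires acks=all, retries > 0, and max.in.flight.requests.per.connection ≤ 5 (safe with sequences); these are the 3.x defaults, and explicitly enabling idempotence with a conflicting value fails with a ConfigException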
  • Limitation: PID changes on producer restart—duplicates possible across sessions without transactional.id
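
The requirements for an explicitly enabled idempotent producer can be checked before deployment with a few lines of plain Java. This sketch only mirrors the rules as described in this chapter; IdempotenceConfigCheck is a hypothetical helper, not part of the client, and it assumes absent properties take the 3.x defaults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Sketch: flag settings that conflict with an explicitly enabled idempotent producer. */
final class IdempotenceConfigCheck {
    static List<String> violations(Properties p) {
        List<String> out = new ArrayList<>();
        // Only validate when idempotence is switched on explicitly.
        if (!"true".equalsIgnoreCase(p.getProperty("enable.idempotence"))) {
            return out;
        }
        String acks = p.getProperty("acks", "all");
        if (!acks.equals("all") && !acks.equals("-1")) {
            out.add("acks must be all, got " + acks);
        }
        if (Integer.parseInt(p.getProperty("retries", "2147483647")) <= 0) {
            out.add("retries must be > 0");
        }
        if (Integer.parseInt(p.getProperty("max.in.flight.requests.per.connection", "5")) > 5) {
            out.add("max.in.flight.requests.per.connection must be <= 5");
        }
        return out;
    }
}
```

Running a check like this in a unit test catches the common mistake of combining acks=1 with enable.idempotence=true in a copied profile.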

Exactly-once: transactional API

For atomic write across multiple partitions/topics and consume-transform-produce:

  1. initTransactions() — register with transaction coordinator, fence zombie producers with same transactional.id
  2. beginTransaction()
  3. send() — records tagged with PID, epoch, sequence
  4. sendOffsetsToTransaction() — atomic offset commit with output records (stream processing)
  5. commitTransaction() or abortTransaction()

Transaction state lives in internal topic __transaction_state (50 partitions default). Consumers with isolation.level=read_committed skip aborted transactions and uncommitted data.
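
What read_committed means for a reader can be modelled on a single-partition log. The sketch below is an assumption-laden simplification: entries are DATA or a COMMIT/ABORT marker per producer ID, and the reader stops at the first offset that still belongs to an open transaction (the last stable offset). ReadCommittedSketch, Entry, and Kind are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of what a read_committed consumer sees in one partition. */
final class ReadCommittedSketch {
    enum Kind { DATA, COMMIT, ABORT }

    record Entry(Kind kind, long pid, String value) {}

    static List<String> visible(List<Entry> log) {
        Map<Long, List<Integer>> open = new HashMap<>();  // pid -> offsets of its open transaction
        boolean[] committed = new boolean[log.size()];

        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            switch (e.kind()) {
                case DATA -> open.computeIfAbsent(e.pid(), k -> new ArrayList<>()).add(i);
                case COMMIT -> {
                    List<Integer> offsets = open.remove(e.pid());
                    if (offsets != null) for (int off : offsets) committed[off] = true;
                }
                case ABORT -> open.remove(e.pid());
            }
        }

        // Last stable offset: the first offset that still belongs to an open transaction.
        int lso = log.size();
        for (List<Integer> offsets : open.values()) {
            lso = Math.min(lso, offsets.get(0));
        }

        List<String> out = new ArrayList<>();
        for (int i = 0; i < lso; i++) {
            if (committed[i]) out.add(log.get(i).value());
        }
        return out;
    }
}
```

One consequence the model makes visible: a single long-running open transaction holds back committed records from other producers that sit behind it in the same partition.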

sequenceDiagram
  participant P as Transactional Producer
  participant TC as Transaction Coordinator
  participant B as Partition Leaders
  participant C as read_committed Consumer
  P->>TC: initTransactions (beginTransaction is local)
  P->>B: produce batches (transactional, PID + epoch)
  P->>TC: commitTransaction
  TC->>B: write COMMIT marker
  C->>B: fetch (sees only committed)
Java — transactional producer
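import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-1");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();  // registers with the coordinator and fences older instances

try {
  producer.beginTransaction();
  producer.send(new ProducerRecord<>("orders", "42", "placed"));
  producer.send(new ProducerRecord<>("inventory", "sku-9", "reserved"));
  producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
  // Fatal for this producer instance: it cannot abort, so close it.
  producer.close();
  throw e;
} catch (KafkaException e) {
  // Any other error: abort so read_committed consumers never see partial output.
  producer.abortTransaction();
  throw e;
}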
⚖️ Trade-off

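Transactions cost throughput and add latency per commit. The overhead is often quoted as a few percent (roughly 3–5%), but it grows as transactions get shorter, so measure it with your own commit interval. Use transactions for consume-transform-produce and multi-topic atomic writes, not for every fire-and-forget metric event.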

⚙️ Config

Idempotent only: enable.idempotence=true. Full EOS produce: add stable transactional.id + consumer isolation.level=read_committed. See Delivery Guarantees for end-to-end patterns.

Producer configuration deep dive

These properties are the production levers. Defaults optimize for generic throughput; financial and ordering-sensitive workloads need explicit overrides.

Property | Default | Production | Impact
acks | all (Kafka 3.0+; 1 before 3.0) | all for durability; 1 for latency-sensitive non-critical | 0=no wait; 1=leader ack; all=acked by all in-sync replicas
retries | 2147483647 (Integer.MAX_VALUE) | Keep the default and bound retrying with delivery.timeout.ms; without idempotence, retries can duplicate | At-least-once on transient failures
retry.backoff.ms | 100 | 100–500 | Pause between retries; reduces broker hammering
max.in.flight.requests.per.connection | 5 | 1 for strict per-partition order without idempotence; up to 5 with idempotence | Parallel unacked requests per broker connection
linger.ms | 0 | 5–20 throughput; 0 latency | Artificial wait to fill batches
batch.size | 16384 (16 KB) | 65536–262144 (64–256 KB) | Upper bound per partition batch in bytes
buffer.memory | 33554432 (32 MB) | 67108864–134217728 (64–128 MB) | Total accumulator memory; when exhausted, send() blocks up to max.block.ms
compression.type | none | lz4 or zstd | CPU vs network: lz4 is fast, zstd has the better ratio
max.block.ms | 60000 | 5000–60000 | How long send() blocks when the buffer is full or metadata is unavailable
delivery.timeout.ms | 120000 | 120000–300000 | Total cap including retries; the record fails after this
enable.idempotence | true (Kafka 3.0+) | true | Deduplicates retries within a PID session
transactional.id | null | Stable per app instance | EOS; fences zombie producers on restart

acks: 0, 1, all — when to use each

  • acks=0: metrics, lossy telemetry, max throughput, no durability promise
  • acks=1: the leader has written to its local log; if the leader fails before followers replicate the record, it is lost
  • acks=all (-1): all in-sync replicas have acked; pair with min.insync.replicas ≥ 2 (broker or topic) so that "all" cannot shrink to the leader alone

linger.ms and batch.size interaction

A batch sends when either condition is met: compressed size ≥ batch.size OR linger.ms elapsed since first record added. High linger.ms + large batch.size = maximum throughput; both low = minimum latency, poor batch fill.
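
A back-of-the-envelope estimate shows which of the two conditions wins for a given traffic level. The sketch assumes a steady per-partition record rate and ignores compression and per-record overhead; BatchFillEstimate is a hypothetical helper.

```java
/** Rough estimate of how full a batch gets before it is sent. */
final class BatchFillEstimate {
    /**
     * Bytes that accumulate in one partition's batch during linger.ms,
     * capped at batch.size and never less than one record.
     */
    static long estimatedBatchBytes(double recordsPerSecPerPartition, int recordBytes,
                                    long lingerMs, int batchSize) {
        long duringLinger = Math.round(recordsPerSecPerPartition * recordBytes * lingerMs / 1000.0);
        return Math.min(batchSize, Math.max(recordBytes, duringLinger));
    }

    /** True when batches fill up before linger.ms expires. */
    static boolean sizeBound(double recordsPerSecPerPartition, int recordBytes,
                             long lingerMs, int batchSize) {
        return estimatedBatchBytes(recordsPerSecPerPartition, recordBytes, lingerMs, batchSize) >= batchSize;
    }
}
```

For example, 1,000 records/s per partition at 1,024 bytes with linger.ms=10 accumulates about 10 KB per batch, so a 128 KB batch.size is never reached and linger.ms is the setting that matters.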

compression.type trade-offs

Codec | CPU | Ratio | Use when
none | None | - | Ultra-low latency, already compressed payloads
lz4 | Low | Good | Default production choice
snappy | Low | Good | Legacy clusters; similar to lz4
zstd | Medium | Best | Bandwidth-constrained cross-AZ replication
gzip | High | High | Rare; CPU expensive on brokers if misconfigured
Properties — durable production producer
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
enable.idempotence=true
retries=2147483647
max.in.flight.requests.per.connection=5
compression.type=lz4
linger.ms=10
batch.size=131072
buffer.memory=67108864
delivery.timeout.ms=120000
application.yml — Spring Kafka producer
spring:
  kafka:
    bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      properties:
        enable.idempotence: true
        compression.type: lz4
        linger.ms: 10
        batch.size: 131072
        delivery.timeout.ms: 120000
        max.in.flight.requests.per.connection: 5
💡 Pro Tip

Monitor batch-size-avg via producer metrics—if it stays far below batch.size, increase linger.ms or investigate partition skew starving batch fill.

Error handling

Producer errors split into retriable (network, not-enough-replicas) and fatal (serialization, record too large, authorization). Your callback and exception strategy must treat them differently.

Retriable vs non-retriable errors

Exception / error | Retriable? | Action
NotEnoughReplicasException | Yes (until timeout) | Check ISR / min.insync.replicas; alert ops
NetworkException / timeout | Yes | Client retries automatically; watch record-retry-rate
RecordTooLargeException | No | Raise the limit that rejected it (producer max.request.size, topic max.message.bytes, broker message.max.bytes) or shrink the payload
SerializationException | No | Fix schema/code; retrying is pointless
ProducerFencedException | No | A newer producer with the same transactional.id has started; close this instance
AuthorizationException | No | Fix ACLs via kafka-acls.sh

Callback API

send(ProducerRecord, Callback) fires on the Sender thread when the batch containing your record is acknowledged or has failed, so keep the callback fast and non-blocking. onCompletion(RecordMetadata metadata, Exception exception): if exception != null, the send was not acknowledged (retries exhausted or a non-retriable error).

Java — callback with metadata
producer.send(record, (metadata, ex) -> {
  if (ex != null) {
    metrics.increment("producer.errors");
    deadLetterPublisher.publish(record, ex);
    return;
  }
  log.debug("topic={} partition={} offset={}",
      metadata.topic(), metadata.partition(), metadata.offset());
});

Dead letter topic pattern

Records that cannot be delivered (serialization failure caught pre-send, or non-retriable post-send) go to {original-topic}.DLT with headers: kafka_dlt-exception-message, original topic/partition/offset, timestamp. Enables replay after fix without blocking the main pipeline.
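
A minimal sketch of building the dead-letter destination and headers, independent of any framework. Only kafka_dlt-exception-message comes from the text above; the other header names are assumed to follow the same prefix, the timestamp is encoded as text for readability, and DeadLetterSketch is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: derive the DLT topic name and diagnostic headers for a failed record. */
final class DeadLetterSketch {
    static String dltTopic(String originalTopic) {
        return originalTopic + ".DLT";
    }

    static Map<String, byte[]> dltHeaders(String originalTopic, Throwable cause, long timestampMs) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        // Header names other than the exception message are assumptions for illustration.
        headers.put("kafka_dlt-original-topic", utf8(originalTopic));
        headers.put("kafka_dlt-original-timestamp", utf8(Long.toString(timestampMs)));
        headers.put("kafka_dlt-exception-message", utf8(String.valueOf(cause.getMessage())));
        return headers;
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

For a record that failed on the producer side there is no original partition or offset yet, so only the topic, timestamp, and exception details are available to carry along.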

⚠️ Pitfall

ProducerFencedException during rolling deploys with transactional producers—two instances briefly share the same transactional.id. Use unique IDs per instance (hostname suffix) or ensure old pod terminates before new one starts.

🔬 Under the Hood

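After delivery.timeout.ms, the producer fails batches that are still unacknowledged and invokes their callbacks with TimeoutException, even if a request already on the wire eventually lands on the broker. Treat a timeout as "outcome unknown": re-sending from the application can create a duplicate that idempotence does not catch, because the re-send is a new record with a new sequence number.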
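
Two small calculations help when choosing delivery.timeout.ms. The first mirrors the client's requirement that an explicitly set delivery.timeout.ms be at least linger.ms + request.timeout.ms; the second is only a rough worst-case bound that assumes every attempt runs into request.timeout.ms. DeliveryTimeoutCheck is a hypothetical helper.

```java
/** Sketch: sanity checks around delivery.timeout.ms. */
final class DeliveryTimeoutCheck {
    /** delivery.timeout.ms must cover at least one full linger plus one request timeout. */
    static boolean isValid(long deliveryTimeoutMs, long lingerMs, long requestTimeoutMs) {
        return deliveryTimeoutMs >= lingerMs + requestTimeoutMs;
    }

    /** Rough number of attempts that fit if each one times out and is followed by a backoff. */
    static long worstCaseAttempts(long deliveryTimeoutMs, long lingerMs,
                                  long requestTimeoutMs, long retryBackoffMs) {
        long budget = deliveryTimeoutMs - lingerMs;
        return Math.max(1, budget / (requestTimeoutMs + retryBackoffMs));
    }
}
```

With the defaults used in this chapter (120000 ms delivery timeout, 100 ms backoff) and a 30000 ms request timeout, only about three timed-out attempts fit in the budget, regardless of how large retries is.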

Producer tuning for throughput

Maximize bytes per request and records per second without blocking the calling thread. Benchmark before and after with kafka-producer-perf-test.sh.

Baseline benchmark

Shell
kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 5000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props bootstrap.servers=broker:9092 \
    acks=1 compression.type=lz4 linger.ms=20 batch.size=131072

Recommended throughput profile

Property | Throughput value | Why
linger.ms | 20 | Wait for fuller batches
batch.size | 131072 (128 KB) | Larger network frames
compression.type | lz4 | Cut bytes on wire with low CPU
buffer.memory | 134217728 (128 MB) | Absorb broker back-pressure
acks | 1 (non-critical) or all (durable) | acks=1 saves replication wait
max.in.flight.requests.per.connection | 5 | Pipeline requests (with idempotence)

Application patterns

  • Async send: fire send() with a callback; never call get() per record in a loop
  • Flush on shutdown: producer.flush() then close() with a timeout in @PreDestroy
  • Partition count: more partitions = more parallel broker writes (ceiling: broker metadata/file handles)
  • Binary payloads: Avro, Protobuf, or compressed JSON beats verbose JSON strings
application.yml — throughput profile
spring.kafka.producer:
  acks: 1
  properties:
    linger.ms: 20
    batch.size: 131072
    compression.type: lz4
    buffer.memory: 134217728
    enable.idempotence: false   # idempotence requires acks=all; for durable topics use acks: all and true
💡 Pro Tip

Track batch fill ratio through producer metrics: target batch-size-avg ≥ 50% of batch.size. If it stays low, traffic may be too sparse; increase linger.ms or consolidate micro-records.

⚖️ Trade-off

acks=1 + high throughput sacrifices durability on leader failure before replication. For money movement or audit logs, keep acks=all and tune batching instead.

Producer tuning for low latency

Sub-10ms p99 requires sacrificing batch efficiency—send every record immediately, minimize in-flight pipeline depth, and skip compression CPU. Measure end-to-end, not just producer ack time.

Low-latency profile

Property | Latency value | Why
linger.ms | 0 | Send immediately; no batch wait
batch.size | 16384 (default) | Small batches flush fast
acks | 1 | Skip waiting for full ISR (if acceptable)
compression.type | none | Remove CPU compress step
max.in.flight.requests.per.connection | 1 | Strict pipeline; no reorder risk, lower queue depth
buffer.memory | Default or lower | Faster back-pressure signal
Properties — low-latency producer
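acks=1
linger.ms=0
batch.size=16384
compression.type=none
max.in.flight.requests.per.connection=1
# Idempotence requires acks=all, so it is off in this acks=1 profile;
# max.in.flight=1 still prevents reordering on retry.
enable.idempotence=false
delivery.timeout.ms=30000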
application.yml — low-latency Spring producer
spring.kafka.producer:
  acks: 1
  properties:
    linger.ms: 0
    batch.size: 16384
    compression.type: none
    max.in.flight.requests.per.connection: 1
    enable.idempotence: false   # idempotence requires acks=all

What to avoid on the latency path

  • Large linger.ms — directly adds artificial delay
  • Compression — CPU time per batch; use only if network is the proven bottleneck
  • acks=all — waits for slowest ISR replica; necessary for durability, costly for latency
  • Synchronous send().get() — blocks app thread on broker RTT
  • TLS without hardware acceleration on CPU-bound hosts — profile SSL handshake and encrypt cost
🎯 Interview Tip

“How do you get low latency from Kafka?” — linger.ms=0, small batches, acks=1, no compression, co-locate producer and broker in same AZ, sufficient partition leaders spread. Clarify durability trade-offs—you cannot have all three of max throughput, min latency, and max durability without compromise.

📦 Real World

Confluent documents p99 produce latency under 5ms on co-located clients with linger.ms=0 and acks=1. Cross-region producers add RTT that no client tuning eliminates—architecture must place writers near leaders.