Producer Internals & Configuration
producer.send() looks like one line in your app; inside the client it is a pipeline of serialization, partitioning, batching, background I/O, and retry logic that determines whether you get duplicates, ordering violations, or silent data loss. This chapter traces every hop from ProducerRecord to broker ack.
Producer architecture
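The Kafka producer splits work between the thread that calls send() and a background Sender I/O thread. Your thread never talks to the broker directly: it serializes, partitions, and enqueues the record, then returns. It blocks only while waiting for topic metadata or for free buffer space, and for at most max.block.ms.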
flowchart LR
    PR[ProducerRecord]
    SER[Serializer\nkey + value]
    PAR[Partitioner]
    ACC[RecordAccumulator\nper-partition batches]
    SND[Sender thread\nnetwork I/O]
    BRK[Broker leader]
    PR --> SER --> PAR --> ACC --> SND --> BRK
ProducerRecord
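This is the unit you create. It holds a topic, an optional partition, a key, a value, headers, and an optional timestamp. The timestamp defaults to the current time and is stored as CreateTime; if the topic uses LogAppendTime, the broker overwrites it. If the partition is set explicitly, the partitioner is skipped. Headers carry metadata (trace IDs, content-type) without polluting the value payload.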
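import java.nio.charset.StandardCharsets;
import java.util.UUID;

ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
    "orders",                 // topic
    order.getCustomerId(),    // key → partition affinity (same customer, same partition)
    order                     // value
);
// Header values are raw bytes; here each record gets a fresh correlation ID
record.headers().add("correlation-id",
    UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
producer.send(record, (metadata, ex) -> {
    // On failure, metadata is null (older clients) or has offset -1, so log the record itself
    if (ex != null) log.error("Send failed topic={} key={}", record.topic(), record.key(), ex);
});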
Serializer
Serializer<K> and Serializer<V> convert keys and values to bytes before batching. Configure them with key.serializer / value.serializer, using either a class name or the equivalent Spring Boot property. With Schema Registry, KafkaAvroSerializer prepends a magic byte (0) and a 4-byte schema ID to the Avro-encoded payload.
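The framing is easiest to see in code. This is a minimal sketch that assumes the Confluent wire format: one magic byte (0), then a 4-byte big-endian schema ID, then the encoded payload. `WireFormat` and its methods are hypothetical names used only for illustration. They are not part of Kafka or the Confluent serializers, and the payload is treated as opaque bytes rather than real Avro.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper sketching Schema Registry framing; not a real Kafka/Confluent API.
final class WireFormat {
    private static final byte MAGIC_BYTE = 0x0;

    private WireFormat() {}

    /** Prepends the magic byte and a 4-byte big-endian schema ID to an already-encoded payload. */
    static byte[] frame(int schemaId, byte[] encodedPayload) {
        return ByteBuffer.allocate(1 + 4 + encodedPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)            // ByteBuffer writes big-endian by default
                .put(encodedPayload)
                .array();
    }

    /** Reads the schema ID back, rejecting input that lacks the expected magic byte. */
    static int schemaId(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf.getInt();
    }

    /** Returns the encoded payload that follows the 5-byte header. */
    static byte[] payload(byte[] framed) {
        return Arrays.copyOfRange(framed, 5, framed.length);
    }
}
```

A deserializer does the reverse. It reads the ID, looks up the schema in the registry, and decodes the remaining bytes with that schema.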
Partitioner
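The partitioner chooses the partition when the record does not set one explicitly. By default, records with a key are placed by a murmur2 hash of the serialized key, and records with a null key use the sticky partitioner (Kafka 2.4+). Partitioning runs on the calling thread, before the record is appended to the accumulator.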
RecordAccumulator
The accumulator is an in-memory buffer that holds a deque of batches for each topic-partition. Batches are grouped by the partition's leader broker only later, when the Sender drains them. Each batch accumulates (compressed) records until it reaches batch.size or linger.ms expires. Total memory is capped by buffer.memory. When that memory is exhausted, send() blocks for up to max.block.ms and then fails.
RecordAccumulator (buffer.memory = 64 MB)
├── TopicPartition orders-0 → leader Broker 1   [Batch: 12 KB] [Batch: 4 KB filling…]
├── TopicPartition orders-1 → leader Broker 2   [Batch: 48 KB ready → Sender drains]
├── TopicPartition orders-2 → leader Broker 3   [no open batch; linger timer starts with the first record]
└── …
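The structure above can be modeled in a few lines. This is a toy sketch, not the client's RecordAccumulator. `ToyAccumulator` is hypothetical, sizes are plain byte counts with no compression estimate, and nothing is ever drained. Where the real client blocks for up to max.block.ms waiting for memory, the sketch throws immediately.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model only: one deque of batches per partition, total allocation capped by bufferMemory.
final class ToyAccumulator {
    private static final class Batch {
        int bytes;
        Batch(int bytes) { this.bytes = bytes; }
    }

    private final Map<Integer, Deque<Batch>> batches = new HashMap<>();
    private final int batchSize;
    private final long bufferMemory;
    private long allocated;

    ToyAccumulator(int batchSize, long bufferMemory) {
        this.batchSize = batchSize;
        this.bufferMemory = bufferMemory;
    }

    /** Appends a record to the partition's newest batch; returns true if a new batch was opened. */
    boolean append(int partition, int recordBytes) {
        Deque<Batch> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = deque.peekLast();
        if (last != null && last.bytes + recordBytes <= batchSize) {
            last.bytes += recordBytes;
            return false;
        }
        // Reserve max(batch.size, record size) for a new batch
        int reserve = Math.max(batchSize, recordBytes);
        if (allocated + reserve > bufferMemory) {
            // The real send() would block up to max.block.ms here, then fail
            throw new IllegalStateException("buffer.memory exhausted");
        }
        allocated += reserve;
        deque.addLast(new Batch(recordBytes));
        return true;
    }

    int batchCount(int partition) {
        Deque<Batch> deque = batches.get(partition);
        return deque == null ? 0 : deque.size();
    }
}
```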
Sender thread
The Sender is a background thread. It polls the accumulator for ready batches (full, linger expired, or flush/close requested) and groups them by leader broker. It then sends one Produce request per broker, handles the responses, retries retriable errors, and invokes callbacks. Each KafkaProducer instance has exactly one Sender. KafkaProducer is thread-safe, and its Javadoc notes that sharing one instance across threads is generally faster than creating one producer per thread.
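The Sender's drain step groups ready batches by leader so that each broker receives one Produce request. It can be sketched as follows. `DrainSketch` and `ReadyBatch` are hypothetical names. The real drain also caps each request at max.request.size and takes only the head batch of each partition per pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of grouping ready batches into one request per leader broker.
final class DrainSketch {
    /** A ready batch for a topic-partition whose current leader is leaderBrokerId. */
    record ReadyBatch(String topicPartition, int leaderBrokerId) {}

    private DrainSketch() {}

    /** Maps broker ID → topic-partitions whose batches go into that broker's Produce request. */
    static Map<Integer, List<String>> groupByLeader(List<ReadyBatch> ready) {
        Map<Integer, List<String>> requests = new TreeMap<>();
        for (ReadyBatch b : ready) {
            requests.computeIfAbsent(b.leaderBrokerId(), id -> new ArrayList<>())
                    .add(b.topicPartition());
        }
        return requests;
    }
}
```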
In-flight requests
max.in.flight.requests.per.connection (default 5) allows several unacknowledged produce requests per broker connection. Higher values improve throughput. However, with retries enabled and idempotence off, they can reorder records within a partition. For example, batch 1 fails while batch 2 succeeds, so the retried batch 1 lands after batch 2.
The Sender tracks in-flight requests per broker connection. With enable.idempotence=true, each batch also carries the producer ID (PID), the epoch, and a per-partition sequence number. The broker rejects out-of-order batches, so the client re-sends them in order, and it acknowledges duplicates without appending them again. This makes up to 5 in-flight requests safe again.
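A simplified model of the broker-side check shows why retries cannot reorder or duplicate data once sequence numbers are in play. `SequenceCheck` is hypothetical. It remembers only the last sequence per (producer ID, partition). The real broker instead caches metadata for the most recent batches (five) per producer and partition, and it also validates the producer epoch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified broker-side sequence validation for idempotent produce.
final class SequenceCheck {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // key = "producerId/partition", value = last sequence number already in the log
    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result onBatch(long producerId, int partition, int baseSeq, int recordCount) {
        String key = producerId + "/" + partition;
        int last = lastSeq.getOrDefault(key, -1);
        if (baseSeq <= last) {
            return Result.DUPLICATE;      // retry of a batch already written: ack, don't re-append
        }
        if (baseSeq != last + 1) {
            return Result.OUT_OF_ORDER;   // gap: an earlier batch has not landed yet
        }
        lastSeq.put(key, baseSeq + recordCount - 1);
        return Result.APPENDED;
    }
}
```

In the scenario described above, batch 2 (sequences 10 to 19) arrives first and is rejected as out of order. The retried batch 1 (0 to 9) is appended, then batch 2 is appended, and any late retry of batch 1 is recognized as a duplicate.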
Calling send().get() for every record on the hot path blocks your thread until that record's batch is acknowledged. Each batch then carries roughly one record, so batching gains disappear and throughput collapses to one record per round trip.
Partitioning
Partition choice determines ordering scope and load distribution. A bad key strategy creates hot partitions that no amount of consumer scaling fixes.
Default partitioner: sticky (Kafka 2.4+)
Legacy behavior (pre-2.4): null-key records were spread round-robin across partitions, producing tiny batches everywhere. Sticky partitioner (current default): null-key records stick to one partition until its batch is ready to send, then switch to another partition. This improves batch fill without sacrificing long-term balance.
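The sticky behavior can be sketched as follows, assuming the 2.4-era rule of switching to a random, different partition when a batch completes. `StickySketch` is a hypothetical name. Since Kafka 3.3 (KIP-794), the built-in logic switches after about batch.size bytes and can favor less-loaded brokers.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical toy model of sticky partitioning for null-key records.
final class StickySketch {
    private final int numPartitions;
    private int current;

    StickySketch(int numPartitions) {
        this.numPartitions = numPartitions;
        this.current = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    /** Partition for the next null-key record: unchanged until the batch completes. */
    int partition() {
        return current;
    }

    /** Called when the current batch is full or sent: move to a different partition. */
    void onBatchComplete() {
        if (numPartitions > 1) {
            int next = ThreadLocalRandom.current().nextInt(numPartitions - 1);
            current = next >= current ? next + 1 : next;   // uniform over all partitions except current
        }
    }
}
```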
Key-based partitioning
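When the key is non-null: partition = (murmur2(keyBytes) & 0x7fffffff) % numPartitions. Same key → same partition → per-key ordering, as long as the partition count does not change. The hash is computed over the serialized key bytes, never the Java object's hashCode(). As a result, changing the key serializer or its encoding changes the key-to-partition mapping.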
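For illustration, here is a port of the murmur2 variant used by the Java client (found in `org.apache.kafka.common.utils.Utils`), together with the partition formula above. In production, rely on the client's own partitioner rather than copying this code. `KeyPartition` is a hypothetical name, and the key is assumed to be serialized as UTF-8, which is StringSerializer's default encoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of key → partition: toPositive(murmur2(keyBytes)) % numPartitions.
final class KeyPartition {
    private KeyPartition() {}

    /** Port of the murmur2 variant used by the Kafka Java client (seed 0x9747b28c). */
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) {   // intentional fall-through over the 1 to 3 tail bytes
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);  // same bytes StringSerializer emits by default
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```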
Null key behavior
No ordering guarantee across records. Sticky partitioner batches null-key records to one partition at a time, then switches—good for throughput on audit/log firehose topics where order is irrelevant.
Custom partitioner
Implement org.apache.kafka.clients.producer.Partitioner for domain routing—e.g. route EU customers to partitions 0–3, US to 4–7.
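import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class RegionPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Assumed key format "<REGION>:<customerId>", e.g. "EU:c-1042"; assumes an even partition count
        String k = key.toString();
        String region = k.contains(":") ? k.substring(0, k.indexOf(':')) : "";
        int half = numPartitions / 2;
        int hash = Utils.toPositive(Utils.murmur2(keyBytes)); // spreads customers within a region
        return switch (region) {
            case "EU" -> hash % half;          // partitions 0 .. half-1
            case "US" -> half + hash % half;   // partitions half .. numPartitions-1
            default -> hash % numPartitions;
        };
    }
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}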
spring.kafka.producer.properties:
  partitioner.class: com.example.kafka.RegionPartitioner
Partition skew scenarios
| Key choice | What happens | Fix |
|---|---|---|
| countryCode = "US" | 70% of traffic lands on one partition | Salting: key = countryCode + a bucket derived from userId; carry the real key in the value |
| null on an ordered topic | Order is not preserved across records | Always set a meaningful key |
| tenantId with 2 huge tenants | Two partitions dominate | Sub-partition by entity ID within the tenant |
| Explicit partition in record | Bypasses the partitioner; the caller owns balance | Round-robin partition in app (rare, expert-only) |
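“How do you fix a hot partition?” Diagnose it by comparing per-partition log growth (the per-partition Log Size metric) and per-partition consumer lag. BytesInPerSec is reported per topic and broker, not per partition. Then fix the key design, or add partitions. Adding partitions spreads many distinct keys more thinly, but it remaps existing keys (breaking per-key ordering across the change) and cannot split a single hot key. Mention salting, and accepting weaker ordering for that entity.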
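A salting helper might look like the following sketch. `SaltedKey` is hypothetical, and it uses CRC32 for the bucket only because CRC32 ships with the JDK; any stable hash works. The same entity always maps to the same bucket, so per-entity ordering survives while the hot key spreads over up to `buckets` distinct keys.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical key-salting helper: spread one hot logical key over N sub-keys.
final class SaltedKey {
    private SaltedKey() {}

    /** Returns "<hotKey>#<bucket>", with the bucket derived from a stable entity ID. */
    static String salt(String hotKey, String entityId, int buckets) {
        CRC32 crc = new CRC32();
        crc.update(entityId.getBytes(StandardCharsets.UTF_8));
        long bucket = crc.getValue() % buckets;   // same entity → same bucket → same partition
        return hotKey + "#" + bucket;
    }
}
```

Consumers that need a per-country view must merge the buckets. That merge is the weaker ordering this trade-off accepts.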
Uber partitions trip events by cityId for local ordering. Netflix uses high-cardinality keys (user/device IDs) on viewing events to avoid cross-tenant hot spots on shared topics.
Delivery semantics
Producer configuration sets the strongest guarantee that is possible; consumers and idempotent handlers complete the story. Know which knob sets which semantic before you promise “exactly-once” in a design review.
At-most-once
acks=0 — producer does not wait for broker acknowledgment. Fire-and-forget. Records can be lost if the broker fails after accepting socket bytes but before durable write, or if the batch never arrives. Acceptable for: metrics, sampling, non-critical telemetry where loss is statistically tolerable.
At-least-once (default production pattern)
acks=1 or acks=all with retries > 0. The default is Integer.MAX_VALUE since Kafka 2.1, bounded in practice by delivery.timeout.ms. If the producer does not receive an ack (timeout, broker restart), it retries, so the same record may be written twice. Consumers must deduplicate (idempotent handler, natural key in DB) unless you add idempotence or transactions.
Exactly-once: idempotent producer
enable.idempotence=true is the default since Kafka 3.0, which also made acks=all the default. The producer obtains a Producer ID (PID), and every batch carries a per-partition sequence number that increases monotonically. The broker deduplicates retries within the same producer session, so no duplicate batches land in the log.
- Requires acks=all, retries > 0, and max.in.flight.requests.per.connection ≤ 5 (5 is safe thanks to sequence numbers). Explicitly enabling idempotence alongside conflicting values fails with a ConfigException.
- Limitation: PID changes on producer restart—duplicates possible across sessions without transactional.id
Exactly-once: transactional API
For atomic write across multiple partitions/topics and consume-transform-produce:
- initTransactions() — register with transaction coordinator, fence zombie producers with same transactional.id
- beginTransaction()
- send() — records tagged with PID, epoch, sequence
- sendOffsetsToTransaction() — atomic offset commit with output records (stream processing)
- commitTransaction() or abortTransaction()
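Transaction state lives in the internal topic __transaction_state (50 partitions by default). Consumers with isolation.level=read_committed return only committed records. They filter out records from aborted transactions, and they do not read past the last stable offset (LSO) while a transaction on that partition is still open.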
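The read_committed rule can be illustrated with a toy model of one partition log. `ReadCommittedSketch` is hypothetical and simplified: it ignores non-transactional records and real offsets. It treats the first record of a still-open transaction as the last stable offset (LSO), past which the consumer does not read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of what a read_committed consumer sees in one partition.
final class ReadCommittedSketch {
    enum Kind { DATA, COMMIT, ABORT }

    record Entry(long producerId, Kind kind, String value) {}

    private ReadCommittedSketch() {}

    static List<String> visible(List<Entry> log) {
        // Backward pass: each DATA entry learns the next marker its producer wrote.
        Kind[] fate = new Kind[log.size()];
        Map<Long, Kind> nextMarker = new HashMap<>();
        for (int i = log.size() - 1; i >= 0; i--) {
            Entry e = log.get(i);
            if (e.kind() == Kind.DATA) {
                fate[i] = nextMarker.get(e.producerId());   // null: transaction still open
            } else {
                nextMarker.put(e.producerId(), e.kind());
            }
        }
        // Forward pass: deliver committed data, skip aborted data, stop at the LSO.
        List<String> out = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (e.kind() != Kind.DATA) {
                continue;
            }
            if (fate[i] == null) {
                break;   // first record of an open transaction: the LSO
            }
            if (fate[i] == Kind.COMMIT) {
                out.add(e.value());
            }
        }
        return out;
    }
}
```

Records committed after an open transaction stay invisible until that transaction commits or aborts. This is why a long-running transaction delays every read_committed consumer on the partition.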
sequenceDiagram
    participant P as Transactional Producer
    participant TC as Transaction Coordinator
    participant B as Partition Leaders
    participant C as read_committed Consumer
    P->>TC: initTransactions (get PID, fence older epochs)
    Note over P: beginTransaction (local state only)
    P->>TC: AddPartitionsToTxn (first write to each partition)
    P->>B: produce transactional batches
    P->>TC: commitTransaction
    TC->>B: write COMMIT markers
    C->>B: fetch (sees only committed data)
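import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-1");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // required by transactions
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", "42", "placed"));
    producer.send(new ProducerRecord<>("inventory", "sku-9", "reserved"));
    producer.commitTransaction();   // both records become visible atomically
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();               // fatal: this producer cannot recover, so close instead of abort
    throw e;
} catch (KafkaException e) {
    producer.abortTransaction();    // abortable: roll back, then retry or surface the error
    throw e;
}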
Transactions add ~3–5% throughput overhead and latency per commit. Use for consume-transform-produce and multi-topic atomic writes—not for every fire-and-forget metric event.
Idempotent only: enable.idempotence=true. Full EOS produce: add stable transactional.id + consumer isolation.level=read_committed. See Delivery Guarantees for end-to-end patterns.
Producer configuration deep dive
These properties are the production levers. Defaults optimize for generic throughput; financial and ordering-sensitive workloads need explicit overrides.
| Property | Default | Production | Impact |
|---|---|---|---|
| acks | all (3.0+); 1 before 3.0 | all for durability; 1 for latency-sensitive, non-critical data | 0 = no wait; 1 = leader ack; all = ack from every in-sync replica |
| retries | 2147483647 (Kafka 2.1+) | Keep the default and bound retries with delivery.timeout.ms; without idempotence, expect possible duplicates | At-least-once on transient failures |
| retry.backoff.ms | 100 | 100–500 | Pause between retries; reduces broker hammering |
| max.in.flight.requests.per.connection | 5 | 1 for strict per-partition order without idempotence; up to 5 with idempotence | Parallel unacknowledged requests per broker connection |
| linger.ms | 0 (5 since Kafka 4.0) | 5–20 for throughput; 0 for latency | Artificial wait to fill batches |
| batch.size | 16384 (16 KB) | 65536–262144 (64–256 KB) | Upper bound per partition batch, in bytes |
| buffer.memory | 33554432 (32 MB) | 67108864–134217728 (64–128 MB) | Total accumulator memory; must fit in heap headroom, and send() blocks when it is exhausted |
| compression.type | none | lz4 or zstd | CPU vs network: lz4 is fast, zstd has a better ratio |
| max.block.ms | 60000 | 5000–60000 | How long send() blocks waiting for buffer space or metadata |
| delivery.timeout.ms | 120000 | 120000–300000 | Total time from send() to success or failure, including retries; must be ≥ linger.ms + request.timeout.ms |
| enable.idempotence | true (3.0+) | true | Deduplicates retries within one producer session (PID) |
| transactional.id | null | Stable per app instance | Enables EOS and fences zombie producers on restart |
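Several of these settings constrain each other. The sketch below is a hypothetical pre-flight check; `ProducerConfigCheck` is not a Kafka API. It encodes four interactions:

- idempotence requires acks=all;
- idempotence allows at most 5 in-flight requests;
- transactional.id requires idempotence;
- delivery.timeout.ms must be at least linger.ms + request.timeout.ms.

Defaults assume a 3.x client with request.timeout.ms = 30000. The real KafkaProducer validates its own configuration when it is constructed, so treat this only as documentation in code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check; defaults below assume a Kafka 3.x client.
final class ProducerConfigCheck {
    private ProducerConfigCheck() {}

    static List<String> problems(Map<String, String> cfg) {
        boolean idempotent = Boolean.parseBoolean(cfg.getOrDefault("enable.idempotence", "true"));
        String acks = cfg.getOrDefault("acks", "all");
        int inFlight = Integer.parseInt(cfg.getOrDefault("max.in.flight.requests.per.connection", "5"));
        long lingerMs = Long.parseLong(cfg.getOrDefault("linger.ms", "0"));
        long requestTimeoutMs = Long.parseLong(cfg.getOrDefault("request.timeout.ms", "30000"));
        long deliveryTimeoutMs = Long.parseLong(cfg.getOrDefault("delivery.timeout.ms", "120000"));

        List<String> out = new ArrayList<>();
        if (idempotent && !(acks.equals("all") || acks.equals("-1"))) {
            out.add("enable.idempotence=true requires acks=all");
        }
        if (idempotent && inFlight > 5) {
            out.add("enable.idempotence=true requires max.in.flight.requests.per.connection <= 5");
        }
        if (cfg.containsKey("transactional.id") && !idempotent) {
            out.add("transactional.id requires enable.idempotence=true");
        }
        if (deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
            out.add("delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        return out;
    }
}
```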
acks: 0, 1, all — when to use each
- acks=0 — metrics, lossy telemetry, max throughput, no durability promise
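- acks=1: the leader wrote the record to its local log; if the leader fails before followers replicate it, the record is lost
- acks=all (-1): every in-sync replica has the record; durability is only as strong as min.insync.replicas on the broker/topic (e.g. 2 with replication factor 3), because the ISR can shrink to the leader alone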
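The partition leader's decision behind these three settings can be sketched as a small function. `AckCheck` is hypothetical and simplified; for example, it ignores the case where the ISR shrinks after the leader has already appended. It shows that min.insync.replicas only matters for acks=all.

```java
// Hypothetical, simplified model of how the partition leader treats a produce request.
final class AckCheck {
    enum Outcome { NO_RESPONSE, LEADER_ACK, ALL_ISR_ACK, NOT_ENOUGH_REPLICAS }

    private AckCheck() {}

    static Outcome decide(String acks, int isrSize, int minInsyncReplicas) {
        switch (acks) {
            case "0":
                return Outcome.NO_RESPONSE;              // broker sends no response at all
            case "1":
                return Outcome.LEADER_ACK;               // min.insync.replicas is not consulted
            case "all":
            case "-1":
                return isrSize >= minInsyncReplicas
                        ? Outcome.ALL_ISR_ACK            // ack once every ISR member has the record
                        : Outcome.NOT_ENOUGH_REPLICAS;   // retriable NotEnoughReplicasException
            default:
                throw new IllegalArgumentException("acks must be 0, 1, all or -1");
        }
    }
}
```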
linger.ms and batch.size interaction
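A batch becomes sendable when any of these holds:
- its size reaches batch.size, or another batch is already queued behind it;
- linger.ms has elapsed since its first record was added;
- other send() calls are waiting for buffer.memory;
- flush() or close() is called.

High linger.ms + large batch.size = maximum throughput; both low = minimum latency and poor batch fill.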
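The readiness rule reads naturally as a predicate. This is a simplified sketch with a hypothetical name (`BatchReadiness`). The real accumulator also applies retry backoff and checks whether the partition's leader is known.

```java
// Hypothetical, simplified version of the accumulator's "is this batch sendable?" check.
final class BatchReadiness {
    private BatchReadiness() {}

    static boolean isReady(int batchBytes, int batchSize, boolean newerBatchQueued,
                           long msSinceFirstRecord, long lingerMs,
                           boolean waitersOnBufferMemory, boolean flushOrClose) {
        // A newer batch behind this one means this batch can no longer grow
        boolean full = batchBytes >= batchSize || newerBatchQueued;
        boolean lingerExpired = msSinceFirstRecord >= lingerMs;
        return full || lingerExpired || waitersOnBufferMemory || flushOrClose;
    }
}
```

With linger.ms=0, the linger condition holds as soon as a batch has a record, which is why that setting trades batch fill for latency.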
compression.type trade-offs
| Codec | CPU | Ratio | Use when |
|---|---|---|---|
| none | None | 1× | Ultra-low latency, already compressed payloads |
| lz4 | Low | Good | Default production choice |
| snappy | Low | Good | Legacy clusters; similar to lz4 |
| zstd | Medium | Best | Bandwidth-constrained cross-AZ replication |
| gzip | High | High | Rare—CPU expensive on brokers if misconfigured |
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
enable.idempotence=true
retries=2147483647
max.in.flight.requests.per.connection=5
compression.type=lz4
linger.ms=10
batch.size=131072
buffer.memory=67108864
delivery.timeout.ms=120000
spring:
  kafka:
    bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      properties:
        enable.idempotence: true
        compression.type: lz4
        linger.ms: 10
        batch.size: 131072
        delivery.timeout.ms: 120000
        max.in.flight.requests.per.connection: 5
Monitor batch-size-avg via producer metrics—if it stays far below batch.size, increase linger.ms or investigate partition skew starving batch fill.
Error handling
Producer errors split into retriable (network, not-enough-replicas) and fatal (serialization, record too large, authorization). Your callback and exception strategy must treat them differently.
Retriable vs non-retriable errors
| Exception / error | Retriable? | Action |
|---|---|---|
| NotEnoughReplicasException | Yes (until delivery.timeout.ms) | Check ISR size vs min.insync.replicas; alert ops |
| NetworkException / TimeoutException | Yes | The client retries automatically; watch record-retry-rate |
| RecordTooLargeException | No | Raise max.request.size (producer) together with max.message.bytes (topic) or message.max.bytes (broker), or shrink the payload |
| SerializationException | No (thrown synchronously from send()) | Fix the schema or code; retrying is pointless |
| ProducerFencedException | No | A newer producer with the same transactional.id started; close this instance |
| AuthorizationException | No | Fix ACLs via kafka-acls.sh |
Callback API
The callback passed to send(ProducerRecord, Callback) runs on the Sender I/O thread once the batch containing your record is acknowledged or fails, so keep it fast and non-blocking. In onCompletion(RecordMetadata metadata, Exception exception), a non-null exception means the producer gave up on the record: the error was non-retriable, or retries ran out before delivery.timeout.ms.
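producer.send(record, (metadata, ex) -> {
    // Runs on the Sender I/O thread: do no blocking work here
    if (ex != null) {
        metrics.increment("producer.errors");
        // Application-defined helper (not shown); it should hand the record to another
        // executor rather than block this thread on a synchronous send
        deadLetterPublisher.publish(record, ex);
        return;
    }
    log.debug("topic={} partition={} offset={}",
        metadata.topic(), metadata.partition(), metadata.offset());
});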
Dead letter topic pattern
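Records that cannot be delivered go to {original-topic}.DLT with diagnostic headers. This covers a serialization failure caught before send(), or a non-retriable error reported in the callback. Spring Kafka's DeadLetterPublishingRecoverer, for example, adds kafka_dlt-exception-message plus the original topic, partition, offset, and timestamp. Producer-side failures have no original offset to record. Parking failures this way enables replay after a fix without blocking the main pipeline.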
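Expect a ProducerFencedException on the old instance during rolling deploys with transactional producers. When the new instance calls initTransactions() with the same transactional.id, it fences the old one by design. Use IDs that are unique per instance yet stable across restarts (e.g. a StatefulSet ordinal), or make sure the old pod terminates before the new one starts.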
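After delivery.timeout.ms, the producer fails the batch, including requests still in flight, and invokes callbacks with TimeoutException. This happens even if the record was in fact written to the broker. If the application then re-sends it, the log gets a duplicate. Idempotence does not help here, because the re-send is a new record with a new sequence number.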
Producer tuning for throughput
Maximize bytes per request and records per second without blocking the calling thread. Benchmark before and after with kafka-producer-perf-test.sh.
Baseline benchmark
kafka-producer-perf-test.sh \
--topic perf-test \
--num-records 5000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=broker:9092 \
acks=1 compression.type=lz4 linger.ms=20 batch.size=131072
Recommended throughput profile
| Property | Throughput value | Why |
|---|---|---|
| linger.ms | 20 | Wait for fuller batches |
| batch.size | 131072 (128 KB) | Larger network frames |
| compression.type | lz4 | Cut bytes on the wire with low CPU |
| buffer.memory | 134217728 (128 MB) | Absorb broker back-pressure |
| acks | 1 (non-critical) or all (durable) | acks=1 saves the replication wait but requires enable.idempotence=false |
| max.in.flight.requests.per.connection | 5 | Pipeline requests; reorder-safe on retry only with idempotence (acks=all) |
Application patterns
- Async send — fire send() with callback; never get() per record in loop
- Flush on shutdown — producer.flush() then close() with timeout in @PreDestroy
- Partition count — more partitions = more parallel broker writes (ceiling: broker metadata/file handles)
- Compact payloads: Avro or Protobuf (or compressed JSON) beat verbose JSON strings
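spring.kafka.producer:
  acks: 1
  properties:
    linger.ms: 20
    batch.size: 131072
    compression.type: lz4
    buffer.memory: 134217728
    # acks=1 cannot be combined with idempotence; for durable topics use acks: all + enable.idempotence: true
    enable.idempotence: false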
Track batch fill ratio with producer metrics: target batch-size-avg ≥ 50% of batch.size. If it is lower, traffic may be too sparse; increase linger.ms or consolidate micro-records.
acks=1 + high throughput sacrifices durability on leader failure before replication. For money movement or audit logs, keep acks=all and tune batching instead.
Producer tuning for low latency
Sub-10ms p99 requires sacrificing batch efficiency—send every record immediately, minimize in-flight pipeline depth, and skip compression CPU. Measure end-to-end, not just producer ack time.
Low-latency profile
| Property | Latency value | Why |
|---|---|---|
| linger.ms | 0 | Send immediately, no batch wait |
| batch.size | 16384 (default) | Small batches flush fast |
| acks | 1 | Skip waiting for the full ISR (if acceptable) |
| compression.type | none | Remove the CPU compression step |
| max.in.flight.requests.per.connection | 1 | Strict pipeline: no reorder risk, lower queue depth |
| buffer.memory | Default or lower | Faster back-pressure signal |
acks=1
# acks=1 cannot be combined with idempotence; disable it explicitly
enable.idempotence=false
linger.ms=0
batch.size=16384
compression.type=none
max.in.flight.requests.per.connection=1
delivery.timeout.ms=30000
spring.kafka.producer:
  acks: 1
  properties:
    linger.ms: 0
    batch.size: 16384
    compression.type: none
    max.in.flight.requests.per.connection: 1
    # acks=1 cannot be combined with idempotence
    enable.idempotence: false
What to avoid on the latency path
- Large linger.ms — directly adds artificial delay
- Compression — CPU time per batch; use only if network is the proven bottleneck
- acks=all — waits for slowest ISR replica; necessary for durability, costly for latency
- Synchronous send().get() — blocks app thread on broker RTT
- TLS without hardware acceleration on CPU-bound hosts — profile SSL handshake and encrypt cost
“How do you get low latency from Kafka?” The usual answer:
- linger.ms=0 with small batches;
- acks=1 and no compression;
- producer and broker co-located in the same AZ;
- partition leaders spread evenly across brokers.

Spell out the durability trade-off: you cannot have maximum throughput, minimum latency, and maximum durability at the same time.
Confluent documents p99 produce latency under 5ms on co-located clients with linger.ms=0 and acks=1. Cross-region producers add RTT that no client tuning eliminates—architecture must place writers near leaders.