Spring Kafka Integration
Spring Kafka wraps the native client with KafkaTemplate, declarative @KafkaListener containers, pluggable error handlers, and test utilities. It is the usual integration layer for Spring-based JVM microservices on Kafka; understanding the container lifecycle is what makes production behavior predictable.
Spring Kafka core
Two primitives dominate application code: KafkaTemplate for producing and @KafkaListener for consuming. Spring Boot auto-configures factories from spring.kafka.* properties when the starter is on the classpath.
KafkaTemplate
High-level producer facade wrapping KafkaProducer. Common send variants:
| Method | Behavior |
|---|---|
| send(topic, key, value) | Convenience overload; uses default partitioner |
| sendDefault(key, value) | Sends to template.getDefaultTopic() |
| send(ProducerRecord<K,V>) | Full control: headers, partition, timestamp |
| send(Message<?>) | Spring Messaging abstraction with headers |
@Service
public class OrderPublisher {
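// SLF4J logger used by the callbacks below (org.slf4j.Logger / LoggerFactory)
private static final Logger log = LoggerFactory.getLogger(OrderPublisher.class);
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;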
public OrderPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<SendResult<String, OrderEvent>> publish(OrderEvent event) {
// Spring Kafka 3.x returns CompletableFuture (was ListenableFuture in 2.x)
return kafkaTemplate.send("orders", event.getOrderId(), event)
.whenComplete((result, ex) -> {
if (ex != null) log.error("send failed", ex);
else log.debug("offset={}", result.getRecordMetadata().offset());
});
}
public void publishWithHeaders(OrderEvent event) {
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
"orders", null, event.getOrderId(), event,
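// encode explicitly so the header bytes do not depend on the platform default charset
List.of(new RecordHeader("eventType", event.getType().getBytes(StandardCharsets.UTF_8)))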
);
kafkaTemplate.send(record);
}
}
KafkaTemplate.send returns a future immediately; the broker acknowledgment arrives asynchronously, so a failed send only surfaces through that future. For transactional sends, wrap the calls in executeInTransaction or use @Transactional with a transactional producer factory.
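When a caller needs to know the outcome before continuing, it can block on the returned future. The sketch below uses only the JDK to show that shape: `fakeSend` is a hypothetical stand-in for `KafkaTemplate.send`, and the offset 42 and the five-second timeout are made-up values for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AsyncSendSketch {

    // Hypothetical stand-in for KafkaTemplate.send(): the result arrives on another thread.
    static CompletableFuture<Long> fakeSend(String topic, String key, String value) {
        return CompletableFuture.supplyAsync(() -> {
            if (topic.isEmpty()) {
                throw new IllegalArgumentException("topic must not be empty");
            }
            return 42L; // pretend offset reported by the broker
        });
    }

    // Waits for the acknowledgment; a failed or timed-out send surfaces as a CompletionException.
    static long sendAndWait(String topic, String key, String value) {
        return fakeSend(topic, key, value)
                .orTimeout(5, TimeUnit.SECONDS)
                .join();
    }

    public static void main(String[] args) {
        System.out.println("offset=" + sendAndWait("orders", "ord-1", "PLACED"));
    }
}
```

Blocking like this trades throughput for certainty, so it tends to suit low-volume, must-not-lose sends rather than hot paths.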
@KafkaListener attributes
| Attribute | Purpose |
|---|---|
| topics | Topic name(s) to subscribe |
| topicPartitions | Explicit topic-partition assignment (bypasses group rebalance) |
| groupId | Consumer group; overrides global spring.kafka.consumer.group-id |
| containerFactory | Bean name of custom ConcurrentKafkaListenerContainerFactory |
| id / idIsGroup | Listener container id; can double as group id |
| concurrency | Override factory concurrency per listener |
| autoStartup | Whether the container starts with the application context; set to false to start it later through the KafkaListenerEndpointRegistry |
Listener method signatures
Spring resolves parameters by type and annotations—flexible but predictable patterns win in teams:
// 1. Value only — simplest
@KafkaListener(topics = "orders", groupId = "order-processor")
public void onOrder(OrderEvent order) {
process(order);
}
// 2. Full ConsumerRecord — metadata access
@KafkaListener(topics = "orders")
public void onRecord(ConsumerRecord<String, OrderEvent> record) {
log.info("partition={} offset={}", record.partition(), record.offset());
process(record.value());
}
// 3. Headers via @Header
@KafkaListener(topics = "orders")
public void onWithHeaders(
@Payload OrderEvent order,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(name = "eventType", required = false) String eventType) {
process(order);
}
// 4. Manual commit — requires MANUAL ackMode on factory
@KafkaListener(topics = "orders", containerFactory = "manualAckFactory")
public void onManual(OrderEvent order, Acknowledgment ack) {
process(order);
ack.acknowledge(); // commit offset after successful processing
}
// 5. Batch listener
@KafkaListener(topics = "orders", containerFactory = "batchFactory")
public void onBatch(List<ConsumerRecord<String, OrderEvent>> records) {
records.forEach(r -> process(r.value()));
}
@SendTo — request-reply forwarding
Return value is sent to reply topic from KafkaHeaders.REPLY_TOPIC request header. Used with ReplyingKafkaTemplate on the client side (see Request-reply pattern).
@KafkaListener(topics = "inventory-requests")
@SendTo // reply topic from KafkaHeaders.REPLY_TOPIC on incoming record
public InventoryResponse checkStock(@Payload InventoryRequest request) {
return inventoryService.check(request);
}
Prefer @Payload + @Header over raw ConsumerRecord in business listeners—easier to test. Use ConsumerRecord when you need offset-based idempotency keys.
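An offset-based idempotency key can be as simple as the record's topic, partition, and offset joined together. The following is a minimal JDK-only sketch under that assumption; the class name, the key format, and the in-memory set are illustrative, and a real service would keep the seen keys in a durable store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class OffsetIdempotency {

    // In-memory only for the sketch; production code would persist these keys.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // A record position is unique within a cluster: topic + partition + offset.
    static String key(String topic, int partition, long offset) {
        return topic + "-" + partition + "@" + offset;
    }

    // Returns true only the first time a given record position is observed.
    boolean firstDelivery(String topic, int partition, long offset) {
        return seen.add(key(topic, partition, offset));
    }

    public static void main(String[] args) {
        OffsetIdempotency guard = new OffsetIdempotency();
        System.out.println(guard.firstDelivery("orders", 0, 15L)); // first delivery
        System.out.println(guard.firstDelivery("orders", 0, 15L)); // redelivery of the same record
    }
}
```

In a listener, the three values would come from `record.topic()`, `record.partition()`, and `record.offset()`.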
Consumer container configuration
ConcurrentKafkaListenerContainerFactory builds listener containers— one Kafka consumer (or batch consumer) per concurrency slot. Ack mode and thread count determine throughput and delivery semantics.
flowchart TB
  F[ConcurrentKafkaListenerContainerFactory\nconcurrency=3]
  C1[Consumer thread 1\npartitions 0,3]
  C2[Consumer thread 2\npartitions 1,4]
  C3[Consumer thread 3\npartitions 2,5]
  F --> C1
  F --> C2
  F --> C3
Factory bean setup
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.getContainerProperties().setPollTimeout(Duration.ofSeconds(3).toMillis());
return factory;
}
AckMode
Controls when offsets commit relative to listener invocation:
| Mode | Commit timing | Use when |
|---|---|---|
| RECORD | After each record's listener call returns | Process-then-commit per message; smallest redelivery window |
| BATCH | After all records returned by the poll have been processed (default) | Most services and batch listeners; higher throughput, coarser commits |
| TIME | After a poll is processed, once ackTime has elapsed since the last commit | Looser commit cadence acceptable |
| COUNT | After a poll is processed, once ackCount records have accumulated | Amortize commit overhead |
| COUNT_TIME | Whichever of the COUNT or TIME conditions is met first | Balance of COUNT and TIME |
| MANUAL | Listener calls ack.acknowledge(); acknowledged offsets are queued and committed as in BATCH | DB transaction completes before commit |
| MANUAL_IMMEDIATE | Committed as soon as acknowledge() is called on the listener thread | Stronger commit visibility before next poll |
spring:
  kafka:
    listener:
      ack-mode: record
      concurrency: 3
      type: single # or batch
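The COUNT, TIME, and COUNT_TIME rows of the AckMode table boil down to a small predicate evaluated after a poll has been processed. The sketch below is a simplified JDK-only model of that decision, not the container's actual code; the class, enum, and parameter names are assumptions made for illustration.

```java
class AckDecisionSketch {

    enum Mode { COUNT, TIME, COUNT_TIME }

    // Decides whether pending offsets should be committed now.
    static boolean shouldCommit(Mode mode, int pendingRecords, int ackCount,
                                long elapsedMs, long ackTimeMs) {
        boolean countReached = pendingRecords >= ackCount;
        boolean timeReached = elapsedMs >= ackTimeMs;
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeReached;
            default: // COUNT_TIME: either condition is enough
                return countReached || timeReached;
        }
    }

    public static void main(String[] args) {
        // 40 pending records, threshold 100, 6 s since the last commit, threshold 5 s
        System.out.println(shouldCommit(Mode.COUNT, 40, 100, 6_000, 5_000));
        System.out.println(shouldCommit(Mode.COUNT_TIME, 40, 100, 6_000, 5_000));
    }
}
```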
concurrency
Sets number of KafkaMessageListenerContainer instances (consumer threads) per listener. Upper bound = partition count—extra threads sit idle. Rule: concurrency ≤ partitions on subscribed topics.
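To see why extra threads sit idle, it helps to model the assignment. The sketch below spreads partitions round-robin across consumer threads using only the JDK; it is a simplification, since the real distribution is decided by the configured partition assignor, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ConcurrencySketch {

    // Round-robin model only; the broker-side group protocol and assignor decide the real layout.
    static List<List<Integer>> assign(int partitions, int concurrency) {
        List<List<Integer>> threads = new ArrayList<>();
        for (int t = 0; t < concurrency; t++) {
            threads.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            threads.get(p % concurrency).add(p);
        }
        return threads;
    }

    // Threads that received no partition do no work.
    static long idleThreads(int partitions, int concurrency) {
        return assign(partitions, concurrency).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 3));      // three busy threads, two partitions each
        System.out.println(idleThreads(2, 3)); // one thread has nothing to consume
    }
}
```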
ContainerProperties
| Property | Effect |
|---|---|
| pollTimeout | Max wait in consumer.poll(); affects shutdown responsiveness |
| idleBetweenPolls | Sleep between polls to slow down deliveries |
| stopContainerWhenFenced | Stop on ProducerFencedException / transactional fencing |
| syncCommits | Use sync offset commit (slower, stricter); true by default |
| authExceptionRetryInterval | Backoff before retrying after authentication or authorization failures |
Setting enable.auto.commit=true on the consumer factory fights manual Acknowledgment, so leave auto-commit off when using MANUAL ack modes. Since version 2.3, Spring Kafka sets enable.auto.commit to false itself unless the property is set explicitly in the consumer configuration.
Error handling in Spring Kafka
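Since version 2.8 the container's default error handler is DefaultErrorHandler with a FixedBackOff(0, 9): a failed record is redelivered immediately up to nine more times, then logged and skipped. Configure your own CommonErrorHandler to change the backoff or to add a recoverer. DefaultErrorHandler replaces SeekToCurrentErrorHandler (and RecoveringBatchErrorHandler for batch listeners); ErrorHandlingDeserializer is still needed so that deserialization failures reach the error handler instead of failing inside poll().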
DefaultErrorHandler
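On failure: optionally retry with a BackOff, then invoke the recoverer (DLT publish or custom). Retries are blocking: the handler seeks back to the failed record and the consumer thread waits out the backoff, so every partition assigned to that consumer stalls. Keep the total backoff well below max.poll.interval.ms to avoid a rebalance.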
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, OrderEvent> template) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
template,
(record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition())
);
// 3 retries, 2s apart
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, new FixedBackOff(2000L, 3));
// Skip retries for poison schema / validation errors
handler.addNotRetryableExceptions(
DeserializationException.class,
ValidationException.class
);
return handler;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory(
ConsumerFactory<String, OrderEvent> cf,
DefaultErrorHandler errorHandler) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> f =
new ConcurrentKafkaListenerContainerFactory<>();
f.setConsumerFactory(cf);
f.setCommonErrorHandler(errorHandler);
return f;
}
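The retry-then-recover flow configured above can be modelled in a few lines. This is a JDK-only sketch of the control flow, assuming a fixed interval and a retry limit as with FixedBackOff; it is not the DefaultErrorHandler implementation, and `deliver`, `listener`, and `recoverer` are hypothetical names.

```java
import java.util.function.Consumer;

class RetryThenRecover {

    // Returns the number of delivery attempts made (1 initial + up to maxRetries retries).
    static <T> int deliver(T record, Consumer<T> listener, Consumer<T> recoverer,
                           int maxRetries, long intervalMs) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts; // processed successfully
            } catch (RuntimeException ex) {
                if (attempts > maxRetries) {
                    recoverer.accept(record); // retries exhausted, hand over (e.g. to a DLT)
                    return attempts;
                }
                pause(intervalMs); // blocking wait, as with a blocking error handler
            }
        }
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int attempts = deliver("ord-1",
                r -> { throw new IllegalStateException("downstream unavailable"); },
                r -> System.out.println("recovered " + r),
                3, 10L);
        System.out.println("delivery attempts=" + attempts);
    }
}
```

With three retries, a record that always fails is delivered four times before the recoverer sees it.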
BackOff strategies
| BackOff | Behavior |
|---|---|
| FixedBackOff | Fixed interval and a maximum number of retries, e.g. new FixedBackOff(1000L, 3) |
| ExponentialBackOffWithMaxRetries | Increasing delay; good for downstream DB outages |
| FixedBackOff(0L, 0L) | No retries; the recoverer runs immediately (fail fast to DLT) |
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(5);
backOff.setInitialInterval(500L);
backOff.setMultiplier(2.0);
backOff.setMaxInterval(30_000L);
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);
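It is worth working out how long a partition can stall under these settings. The sketch below computes the delay sequence with plain JDK arithmetic, assuming the first retry waits the initial interval and each later delay is multiplied and capped at the maximum, with no jitter; `BackoffSchedule` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {

    // Delay before each retry: initial, initial*m, initial*m^2, ... capped at max.
    static List<Long> delays(long initialMs, double multiplier, long maxMs, int maxRetries) {
        List<Long> out = new ArrayList<>();
        long current = initialMs;
        for (int i = 0; i < maxRetries; i++) {
            out.add(Math.min(current, maxMs));
            current = (long) Math.min(current * multiplier, (double) maxMs);
        }
        return out;
    }

    // Worst-case time the consumer thread spends waiting on one poisoned record.
    static long totalBlockingMs(long initialMs, double multiplier, long maxMs, int maxRetries) {
        return delays(initialMs, multiplier, maxMs, maxRetries).stream()
                .mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        System.out.println(delays(500L, 2.0, 30_000L, 5));
        System.out.println(totalBlockingMs(500L, 2.0, 30_000L, 5) + " ms");
    }
}
```

For the values above (500 ms initial, multiplier 2.0, 30 s cap, 5 retries) the delays are 500, 1000, 2000, 4000, and 8000 ms, or 15.5 seconds of waiting in total.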
DeadLetterPublishingRecoverer
Publishes failed record to DLT via KafkaTemplate. Customize partition resolver and add headers (setHeadersFunction) for exception class, stack trace, original offset. See DLQ pattern for naming and metadata conventions.
@RetryableTopic — non-blocking retry
Annotation-driven alternative: failed messages route to orders-retry-0, orders-retry-1, … with delay topics—does not block the main partition consumer. Final failure → orders-dlt.
flowchart LR
  M[orders main topic]
  R0[orders-retry-0\ndelay 1s]
  R1[orders-retry-1\ndelay 5s]
  D[orders-dlt]
  M -->|fail| R0
  R0 -->|fail| R1
  R1 -->|fail| D
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000, multiplier = 2.0, maxDelay = 30_000),
dltStrategy = DltStrategy.FAIL_ON_ERROR,
// include and exclude are alternatives; configure only one of them
include = { TransientDataAccessException.class }
)
@KafkaListener(topics = "orders", groupId = "order-processor")
public void process(@Payload OrderEvent order) {
orderService.process(order);
}
@DltHandler
public void handleDlt(OrderEvent order, @Header(KafkaHeaders.EXCEPTION_MESSAGE) String error) {
log.error("DLT orderId={} error={}", order.getOrderId(), error);
}
| Approach | Blocking? | Best for |
|---|---|---|
| DefaultErrorHandler + BackOff | Yes — partition stalled during retry | Few retries, fast recovery, simple ops |
| @RetryableTopic | No — separate retry consumers | Long delays, high-throughput topics |
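The chain of topics a record can travel through follows from the attempts setting. The sketch below derives it with plain string handling, assuming index-based suffixes (`-retry-0`, `-retry-1`, ...) and a `-dlt` suffix as in the names used above, and assuming no delay hits the maximum so that no retry topics are merged; actual names depend on the suffixing strategy and version in use.

```java
import java.util.ArrayList;
import java.util.List;

class RetryTopicChain {

    // attempts counts the first delivery on the main topic, so there are attempts - 1 retry topics.
    static List<String> topics(String mainTopic, int attempts) {
        List<String> chain = new ArrayList<>();
        chain.add(mainTopic);
        for (int i = 0; i < attempts - 1; i++) {
            chain.add(mainTopic + "-retry-" + i);
        }
        chain.add(mainTopic + "-dlt");
        return chain;
    }

    public static void main(String[] args) {
        System.out.println(topics("orders", 4));
    }
}
```

Each of those topics needs capacity and monitoring of its own, which is the operational cost of keeping the main partition unblocked.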
“Spring Kafka error handling?” — DefaultErrorHandler replaces deprecated seek handlers; configure BackOff + DeadLetterPublishingRecoverer. For non-blocking retries use @RetryableTopic (separate retry topics). Mark deserialization errors non-retriable.
Transactions in Spring Kafka
Spring integrates Kafka transactions via KafkaTransactionManager—bridging @Transactional boundaries with consume-process-produce atomicity inside Kafka.
KafkaTransactionManager
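Requires a transactional producer factory, that is, one configured with a transactionIdPrefix from which each producer's transactional.id is derived. Manages initTransactions / beginTransaction / commit / abort around Spring transaction scope.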
@Bean
public ProducerFactory<String, OrderEvent> transactionalProducerFactory() {
Map<String, Object> props = new HashMap<>(producerProps());
// Do not set transactional.id here; the factory derives it from the prefix configured below
DefaultKafkaProducerFactory<String, OrderEvent> factory =
new DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix("order-service-txn-");
return factory;
}
@Bean
public KafkaTransactionManager<String, OrderEvent> kafkaTransactionManager(
ProducerFactory<String, OrderEvent> pf) {
return new KafkaTransactionManager<>(pf);
}
@Transactional on @KafkaListener
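Listener runs inside a Kafka transaction: read records → process → send output → commit offsets atomically with sends. The offsets join the transaction only when the listener container starts it, which requires the KafkaTransactionManager to be set on the container properties (Spring Boot's auto-configured factory does this when spring.kafka.producer.transaction-id-prefix is set). Consumers of the output topic should use isolation.level=read_committed so they never see records from aborted transactions.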
@Transactional("kafkaTransactionManager")
@KafkaListener(topics = "orders", groupId = "enrichment-service")
public void enrichAndPublish(ConsumerRecord<String, OrderEvent> record) {
OrderEvent enriched = enrichmentService.enrich(record.value());
kafkaTemplate.send("orders-enriched", record.key(), enriched);
// the offset for this record joins the same transaction only if the container started it
// (transaction manager configured on the container properties)
}
ChainedKafkaTransactionManager
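Coordinates Kafka + database in one Spring @Transactional by chaining the Kafka transaction with a DataSourceTransactionManager. This is not true XA 2PC: a failure between the two commits can leave the systems inconsistent, so use the outbox pattern for cross-system guarantees. ChainedKafkaTransactionManager has been deprecated since Spring Kafka 2.7, so check that it is still available in the version you run.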
@Bean
public ChainedKafkaTransactionManager<String, OrderEvent> chainedTxManager(
KafkaTransactionManager<String, OrderEvent> kafkaTm,
DataSourceTransactionManager dbTm) {
return new ChainedKafkaTransactionManager<>(kafkaTm, dbTm);
}
@Transactional("chainedTxManager") // must match the bean name of the chained transaction manager defined above
public void updateDbAndPublish(OrderEvent event) {
orderRepository.save(event.toEntity());
kafkaTemplate.send("orders-processed", event.getOrderId(), event);
}
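The inconsistency window is easiest to see by modelling the commit order. Chained managers start transactions in the order given and commit them in reverse, so with (Kafka, DB) the database commits first. The JDK-only sketch below models just that ordering; the resource names and the `commitInReverse` helper are illustrative and not part of any Spring API.

```java
import java.util.ArrayList;
import java.util.List;

class ChainedCommitSketch {

    // Returns the resources that ended up committed when one of them fails at commit time.
    // Pass null for failsOnCommit to model the happy path.
    static List<String> commitInReverse(List<String> startedInOrder, String failsOnCommit) {
        List<String> committed = new ArrayList<>();
        for (int i = startedInOrder.size() - 1; i >= 0; i--) {
            String name = startedInOrder.get(i);
            if (name.equals(failsOnCommit)) {
                break; // remaining resources roll back; earlier commits cannot be undone
            }
            committed.add(name);
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> order = List.of("kafka", "db");
        System.out.println(commitInReverse(order, null));    // both commit
        System.out.println(commitInReverse(order, "kafka")); // DB row persisted, Kafka send aborted
    }
}
```

The second case is the one the outbox pattern removes: the event is written to the database in the same local transaction and published afterwards.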
Exactly-once in Spring
- KafkaTransactionManager + transactional producer
- Consumer isolation.level=read_committed
- Single consume-transform-produce per transaction boundary
- External DB writes still need idempotency or outbox—Kafka EOS does not span PostgreSQL
spring:
  kafka:
    consumer:
      properties:
        isolation.level: read_committed
    producer:
      transaction-id-prefix: order-service-txn-
Transactional listeners add latency (transaction commit per record/batch) and require transactional.id fencing. Prefer idempotent consumers + outbox for most microservices; reserve Kafka transactions for stream-like enrichment inside the JVM.
Testing with Spring Kafka
Test pyramid: MockProducer/MockConsumer for unit tests, @EmbeddedKafka for fast integration, Testcontainers for broker-faithful CI.
EmbeddedKafkaBroker
In-process Kafka broker (ZK-less / KRaft depending on version) from spring-kafka-test. Spins up real TCP listeners on random ports—no Docker required. Faster than Testcontainers; lighter than mocking.
@SpringBootTest
@EmbeddedKafka(
partitions = 3,
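// consumeFromAnEmbeddedTopic only accepts topics declared here, so the output topic is listed too
topics = { "orders", "orders.DLT", "orders-processed" },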
brokerProperties = { "listeners=PLAINTEXT://localhost:0", "port=0" }
)
class OrderListenerIT {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
void shouldProcessOrder() {
OrderEvent event = new OrderEvent("ord-1", "PLACED");
kafkaTemplate.send("orders", "ord-1", event);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"test-group", "true", embeddedKafka);
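// build the test consumer through the factory; there is no DefaultKafkaConsumer class
Consumer<String, OrderEvent> consumer = new DefaultKafkaConsumerFactory<String, OrderEvent>(
consumerProps, new StringDeserializer(), new JsonDeserializer<>(OrderEvent.class))
.createConsumer();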
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders-processed");
ConsumerRecords<String, OrderEvent> records =
KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10));
assertThat(records.count()).isEqualTo(1);
assertThat(records.iterator().next().value().getOrderId()).isEqualTo("ord-1");
}
}
KafkaTestUtils helpers
| Method | Purpose |
|---|---|
| consumerProps(groupId, autoCommit, broker) | Ready-made consumer config map |
| producerProps(broker) | Ready-made producer config map |
| getRecords(consumer) | Poll until timeout; primary assertion helper |
| getSingleRecord(consumer, topic) | Expect exactly one record on the topic; an overload also takes a timeout Duration |
Testcontainers + Kafka
Real Kafka in Docker—closer to production (protocol versions, Connect plugins, SASL). Slower CI; use for contract tests and auth-enabled paths.
@Testcontainers
@SpringBootTest
class OrderServiceContainerIT {
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
@DynamicPropertySource
static void kafkaProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@Test
void publishesToRealBroker() {
kafkaTemplate.send("orders", "k1", new OrderEvent("ord-99", "PLACED"));
// assert via @KafkaListener test spy or consuming test consumer
}
}
MockProducer & MockConsumer
Apache Kafka client mocks—no broker, no Spring context required. Ideal for pure unit tests of serialization and routing logic.
@Test
void kafkaTemplateUsesMockProducer() {
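// MockProducer invokes the serializers when it has no cluster metadata, so pass real ones
MockProducer<String, String> mockProducer =
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
// MockProducerFactory comes from recent spring-kafka-test versions; on older ones,
// mock ProducerFactory.createProducer() to return the MockProducer instead
MockProducerFactory<String, String> pf = new MockProducerFactory<>(() -> mockProducer);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);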
template.send("orders", "key", "value");
assertThat(mockProducer.history()).hasSize(1);
assertThat(mockProducer.history().get(0).topic()).isEqualTo("orders");
}
Use @EmbeddedKafka for listener + DLT integration tests; MockProducer for publisher unit tests. Reserve Testcontainers for weekly pipeline or SASL/SSL matrix—not every PR.
Spring Kafka + Schema Registry
Avro-typed KafkaTemplate and @KafkaListener require Confluent serializers and registry URL in producer/consumer config. See also Schema Registry chapter.
Serializer configuration
spring:
  kafka:
    bootstrap-servers: kafka:9092
    properties:
      schema.registry.url: http://schema-registry:8081
      specific.avro.reader: true
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        auto.register.schemas: false
        use.latest.version: true
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
AbstractKafkaSchemaSerDeConfig properties
| Property constant | Config key | Notes |
|---|---|---|
| SCHEMA_REGISTRY_URL_CONFIG | schema.registry.url | Comma-separated HA URLs |
| AUTO_REGISTER_SCHEMAS | auto.register.schemas | false in production |
| USE_LATEST_VERSION | use.latest.version | Producer uses latest registered schema text |
| VALUE_SUBJECT_NAME_STRATEGY | value.subject.name.strategy | TopicNameStrategy default |
| KEY_SUBJECT_NAME_STRATEGY | key.subject.name.strategy | If keys are Avro |
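The subject name strategy determines which registry subject a schema is looked up under. The sketch below reproduces the naming rules of the three common strategies with plain string handling; it is a model of the conventions, not the Confluent classes themselves, and the record name `com.example.Order` is a made-up example.

```java
class SubjectNames {

    // TopicNameStrategy: one subject per topic and per key/value side.
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: subject is the fully qualified record name, independent of topic.
    static String recordName(String fullyQualifiedRecordName) {
        return fullyQualifiedRecordName;
    }

    // TopicRecordNameStrategy: topic and record name combined.
    static String topicRecordName(String topic, String fullyQualifiedRecordName) {
        return topic + "-" + fullyQualifiedRecordName;
    }

    public static void main(String[] args) {
        System.out.println(topicName("orders", false));
        System.out.println(topicRecordName("orders", "com.example.Order"));
    }
}
```

With the default strategy, the value schema for the orders topic lives under the subject `orders-value`, which is the subject to register against when auto.register.schemas is false.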
SchemaRegistryClient bean
@Bean
public SchemaRegistryClient schemaRegistryClient(
@Value("${spring.kafka.properties.schema.registry.url}") String url) {
Map<String, Object> configs = Map.of(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url
);
return new CachedSchemaRegistryClient(
List.of(url.split(",")),
1000,
List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider()),
configs
);
}
@KafkaListener with Avro SpecificRecord
// Generated from Order.avsc via avro-maven-plugin
@KafkaListener(topics = "orders", groupId = "order-processor")
public void onOrder(Order order) {
log.info("orderId={} status={}", order.getOrderId(), order.getStatus());
}
@Bean
public ConsumerFactory<String, Order> avroConsumerFactory(
@Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
return new DefaultKafkaConsumerFactory<>(props);
}
Enable specific.avro.reader=true on consumer—without it, deserializer returns GenericRecord and @KafkaListener type conversion fails for Order POJO.
Pin generated Avro classes and schema version in the same Maven module. Integration tests use @EmbeddedKafka + mock Schema Registry (or Testcontainers for both) to catch SerializationException before deploy.