Spring Kafka Integration

Spring Kafka wraps the native client with KafkaTemplate, declarative @KafkaListener containers, pluggable error handlers, and test utilities. It is the default integration layer for JVM microservices on Kafka. Understand the container lifecycle and you own production behavior.

developer senior Spring Kafka 3.x

Spring Kafka core

Two primitives dominate application code: KafkaTemplate for producing and @KafkaListener for consuming. Spring Boot auto-configures factories from spring.kafka.* properties when the starter is on the classpath.

KafkaTemplate

High-level producer facade wrapping KafkaProducer. Common send variants:

| Method | Behavior |
| --- | --- |
| send(topic, key, value) | Convenience overload; uses default partitioner |
| sendDefault(key, value) | Sends to template.getDefaultTopic() |
| send(ProducerRecord<K,V>) | Full control: headers, partition, timestamp |
| send(Message<?>) | Spring Messaging abstraction with headers |
Java — KafkaTemplate sends
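import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
public class OrderPublisher {
  private static final Logger log = LoggerFactory.getLogger(OrderPublisher.class);

  private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

  public OrderPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  public CompletableFuture<SendResult<String, OrderEvent>> publish(OrderEvent event) {
    // Spring Kafka 3.x returns CompletableFuture (was ListenableFuture in 2.x)
    return kafkaTemplate.send("orders", event.getOrderId(), event)
        .whenComplete((result, ex) -> {
          if (ex != null) log.error("send failed", ex);
          else log.debug("offset={}", result.getRecordMetadata().offset());
        });
  }

  public void publishWithHeaders(OrderEvent event) {
    // null partition: let the partitioner choose from the key
    ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
        "orders", null, event.getOrderId(), event,
        // explicit charset so header bytes do not depend on the platform default
        List.of(new RecordHeader("eventType", event.getType().getBytes(StandardCharsets.UTF_8)))
    );
    kafkaTemplate.send(record);
  }
}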
🔬 Under the Hood

KafkaTemplate.send returns immediately with a future; actual broker ack happens async. For transactional sends, wrap in executeInTransaction or use @Transactional with a transactional producer factory.
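
The asynchronous contract can be tried without a broker. The following is a minimal sketch that uses a plain CompletableFuture as a stand-in for the future returned by KafkaTemplate.send; AsyncSendSketch, fakeSend, and sendAndWait are hypothetical names for illustration, not Spring Kafka API, and the offset 42 is made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncSendSketch {

    /** Stand-in for an async send: the caller gets the future before any "ack" exists. */
    public static CompletableFuture<Long> fakeSend(boolean brokerAccepts) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        // Complete on another thread, roughly as a producer I/O thread would.
        CompletableFuture.runAsync(() -> {
            if (brokerAccepts) {
                future.complete(42L); // pretend offset (made up)
            } else {
                future.completeExceptionally(new IllegalStateException("broker rejected record"));
            }
        });
        return future;
    }

    /** Blocks for the outcome; returns the offset, or -1 if the send failed or timed out. */
    public static long sendAndWait(boolean brokerAccepts) {
        try {
            return fakeSend(brokerAccepts).get(5, TimeUnit.SECONDS);
        } catch (ExecutionException | TimeoutException e) {
            return -1L; // the failure only becomes visible once the future is inspected
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1L;
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait(true));
        System.out.println(sendAndWait(false));
    }
}
```

The point of the sketch: a caller that ignores the returned future never learns about a failed send, so either attach a callback (as publish does above) or block with a bounded get when the caller must know the outcome.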

@KafkaListener attributes

| Attribute | Purpose |
| --- | --- |
| topics | Topic name(s) to subscribe |
| topicPartitions | Explicit topic-partition assignment (bypasses group rebalance) |
| groupId | Consumer group; overrides global spring.kafka.consumer.group-id |
| containerFactory | Bean name of custom ConcurrentKafkaListenerContainerFactory |
| id / idIsGroup | Listener container id; also used as the group id unless groupId is set or idIsGroup = false |
| concurrency | Override factory concurrency per listener |
| autoStartup | Set to false to keep the container stopped until it is started explicitly (for example via KafkaListenerEndpointRegistry) |

Listener method signatures

Spring resolves listener parameters by type and annotation. The mechanism is flexible, but teams do best with a few predictable patterns:

Java — @KafkaListener signature variants
// 1. Value only — simplest
@KafkaListener(topics = "orders", groupId = "order-processor")
public void onOrder(OrderEvent order) {
  process(order);
}

// 2. Full ConsumerRecord — metadata access
@KafkaListener(topics = "orders")
public void onRecord(ConsumerRecord<String, OrderEvent> record) {
  log.info("partition={} offset={}", record.partition(), record.offset());
  process(record.value());
}

// 3. Headers via @Header
@KafkaListener(topics = "orders")
public void onWithHeaders(
    @Payload OrderEvent order,
    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
    @Header(name = "eventType", required = false) String eventType) {
  process(order);
}

// 4. Manual commit — requires MANUAL ackMode on factory
@KafkaListener(topics = "orders", containerFactory = "manualAckFactory")
public void onManual(OrderEvent order, Acknowledgment ack) {
  process(order);
  ack.acknowledge();  // commit offset after successful processing
}

// 5. Batch listener
@KafkaListener(topics = "orders", containerFactory = "batchFactory")
public void onBatch(List<ConsumerRecord<String, OrderEvent>> records) {
  records.forEach(r -> process(r.value()));
}

@SendTo — request-reply forwarding

The return value is sent to the reply topic named in the request's KafkaHeaders.REPLY_TOPIC header. Used with ReplyingKafkaTemplate on the client side (see Request-reply pattern).

Java — @SendTo worker
@KafkaListener(topics = "inventory-requests")
@SendTo  // reply topic from KafkaHeaders.REPLY_TOPIC on incoming record
public InventoryResponse checkStock(@Payload InventoryRequest request) {
  return inventoryService.check(request);
}
💡 Pro Tip

Prefer @Payload + @Header over raw ConsumerRecord in business listeners—easier to test. Use ConsumerRecord when you need offset-based idempotency keys.

Consumer container configuration

ConcurrentKafkaListenerContainerFactory builds listener containers: one Kafka consumer (or batch consumer) per concurrency slot. Ack mode and thread count determine throughput and delivery semantics.

flowchart TB
  F[ConcurrentKafkaListenerContainerFactory\nconcurrency=3]
  C1[Consumer thread 1\npartitions 0,3]
  C2[Consumer thread 2\npartitions 1,4]
  C3[Consumer thread 3\npartitions 2,5]
  F --> C1
  F --> C2
  F --> C3

Factory bean setup

Java — container factories
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
    ConsumerFactory<String, OrderEvent> consumerFactory) {
  ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory);
  factory.setConcurrency(3);
  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
  factory.getContainerProperties().setPollTimeout(Duration.ofSeconds(3).toMillis());
  return factory;
}

AckMode

Controls when offsets commit relative to listener invocation:

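| Mode | Commit timing | Use when |
| --- | --- | --- |
| RECORD | After each record's listener call returns | A small redelivery window matters more than commit overhead |
| BATCH | After all records from the poll have been processed (default) | Most services and batch listeners; fewer commits, coarser redelivery window |
| TIME | After the poll's records are processed, once ackTime has elapsed since the last commit | Looser commit cadence acceptable |
| COUNT | After the poll's records are processed, once ackCount records have arrived since the last commit | Amortize commit overhead |
| COUNT_TIME | As TIME and COUNT; commits when either condition is met | Balance of COUNT and TIME |
| MANUAL | Listener calls ack.acknowledge(); acks are queued and committed with BATCH timing | DB transaction completes before commit |
| MANUAL_IMMEDIATE | Committed immediately when acknowledge() is called on the consumer thread | Stronger commit visibility before next poll |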
yaml — Spring Boot ack mode
spring:
  kafka:
    listener:
      ack-mode: record
      concurrency: 3
      type: single   # or batch
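
The practical difference between per-record and per-poll commits is how many records come back after a crash. The sketch below is a deliberately simplified model, not container code: it assumes every poll returns exactly pollSize records, that the process dies right after finishing a given record, and that all commits issued up to that point succeeded. AckCadenceSketch and its methods are hypothetical names.

```java
class AckCadenceSketch {

    enum Cadence { RECORD, BATCH }

    /** Records whose offsets were committed when the consumer dies after `processed` records. */
    public static int committed(Cadence cadence, int processed, int pollSize) {
        if (processed < 0 || pollSize <= 0) {
            throw new IllegalArgumentException("processed must be >= 0 and pollSize > 0");
        }
        // RECORD commits after every record; BATCH commits only at completed poll boundaries.
        return cadence == Cadence.RECORD ? processed : (processed / pollSize) * pollSize;
    }

    /** Records that were already processed but will be delivered again after restart. */
    public static int redelivered(Cadence cadence, int processed, int pollSize) {
        return processed - committed(cadence, processed, pollSize);
    }

    public static void main(String[] args) {
        // Crash after 1234 records with 500 records per poll.
        System.out.println(redelivered(Cadence.RECORD, 1234, 500)); // prints 0
        System.out.println(redelivered(Cadence.BATCH, 1234, 500));  // prints 234
    }
}
```

Either way the listener must tolerate duplicates; the ack mode only changes how many to expect.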

concurrency

Sets number of KafkaMessageListenerContainer instances (consumer threads) per listener. Upper bound = partition count—extra threads sit idle. Rule: concurrency ≤ partitions on subscribed topics.
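
To see why extra threads sit idle, the sketch below spreads partition ids over consumer threads round-robin, matching the diagram above. This is an illustrative model only: the real assignment is made by the consumer's configured partition assignor and may group partitions differently. ConcurrencySketch, spread, and idleThreads are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class ConcurrencySketch {

    /** Round-robin spread of partition ids 0..partitions-1 over `concurrency` consumer threads. */
    public static List<List<Integer>> spread(int partitions, int concurrency) {
        if (partitions < 0 || concurrency <= 0) {
            throw new IllegalArgumentException("partitions must be >= 0 and concurrency > 0");
        }
        List<List<Integer>> slots = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            slots.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            slots.get(p % concurrency).add(p);
        }
        return slots;
    }

    /** Threads that end up with no partition at all. */
    public static long idleThreads(int partitions, int concurrency) {
        return spread(partitions, concurrency).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(spread(6, 3));      // prints [[0, 3], [1, 4], [2, 5]]
        System.out.println(idleThreads(6, 8)); // prints 2
    }
}
```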

ContainerProperties

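| Property | Effect |
| --- | --- |
| pollTimeout | Max wait in consumer.poll(); affects shutdown responsiveness |
| idleBetweenPolls | Sleep between polls to slow delivery; processing time plus this value must stay under max.poll.interval.ms |
| stopContainerWhenFenced | Stop on ProducerFencedException / transactional fencing |
| syncCommits | Use sync offset commit (default true; slower, stricter) |
| authExceptionRetryInterval | Retry interval after authentication or authorization failures, instead of stopping the container |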
⚠️ Pitfall

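enable.auto.commit=true on the consumer factory fights manual Acknowledgment, because the client then commits on its own timer regardless of your acks. Since 2.3, Spring Kafka sets enable.auto.commit to false unless you set it explicitly, so leave it unset (or false) and let the container commit according to the ack mode.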

Error handling in Spring Kafka

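If you configure nothing, the container uses a DefaultErrorHandler with FixedBackOff(0, 9): a failed record is redelivered up to 9 more times with no delay, then logged and skipped. Configure your own CommonErrorHandler to add a real backoff and a recoverer. DefaultErrorHandler (2.8+) replaces SeekToCurrentErrorHandler and RecoveringBatchErrorHandler. You still need ErrorHandlingDeserializer so that deserialization failures reach the error handler instead of failing inside poll().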

DefaultErrorHandler

On failure it retries according to the BackOff, then invokes the recoverer (DLT publish or custom). Retries are blocking: the consumer thread waits out each backoff, so nothing else on that partition is processed until the record succeeds or is recovered.

Java — DefaultErrorHandler + DLT
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, OrderEvent> template) {
  DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
      template,
      (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition())
  );

  // 3 retries, 2s apart
  DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, new FixedBackOff(2000L, 3));

  // Skip retries for poison schema / validation errors
  handler.addNotRetryableExceptions(
      DeserializationException.class,
      ValidationException.class
  );
  return handler;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory(
    ConsumerFactory<String, OrderEvent> cf,
    DefaultErrorHandler errorHandler) {
  ConcurrentKafkaListenerContainerFactory<String, OrderEvent> f =
      new ConcurrentKafkaListenerContainerFactory<>();
  f.setConsumerFactory(cf);
  f.setCommonErrorHandler(errorHandler);
  return f;
}
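
The control flow behind "3 retries, then recover" can be sketched in plain Java. This is a simplified model of blocking retry, not DefaultErrorHandler's implementation: it matches not-retryable exceptions by exact class and omits the backoff wait. BlockingRetrySketch and deliver are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

class BlockingRetrySketch {

    /**
     * Invokes the listener until it succeeds, retries are exhausted, or a
     * not-retryable exception is thrown. Returns how many times the listener ran.
     */
    public static int deliver(String rec, Consumer<String> listener, int maxRetries,
                              Set<Class<? extends RuntimeException>> notRetryable,
                              Consumer<String> recoverer) {
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                listener.accept(rec);
                return deliveries; // success: the offset can now be committed
            } catch (RuntimeException ex) {
                boolean exhausted = deliveries > maxRetries;
                if (exhausted || notRetryable.contains(ex.getClass())) {
                    recoverer.accept(rec); // e.g. publish to a dead-letter topic
                    return deliveries;
                }
                // A real handler would wait out the BackOff interval here,
                // which is what stalls the partition.
            }
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        int n = deliver("order-1", r -> { throw new IllegalStateException("db down"); },
                3, Set.of(), deadLetters::add);
        System.out.println(n + " deliveries, dead letters: " + deadLetters);
        // prints: 4 deliveries, dead letters: [order-1]
    }
}
```

With maxRetries = 3 the listener runs four times in total (one initial delivery plus three retries), while a not-retryable exception reaches the recoverer after the first delivery.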

BackOff strategies

| BackOff | Behavior |
| --- | --- |
| FixedBackOff | Fixed interval, max attempts: new FixedBackOff(1000L, 3) |
| ExponentialBackOffWithMaxRetries | Increasing delay; good for downstream DB outages |
| FixedBackOff(0L, 0L) | No retries; immediate recoverer invocation, fail fast to DLT |
Java — exponential backoff + DLT
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(5);
backOff.setInitialInterval(500L);
backOff.setMultiplier(2.0);
backOff.setMaxInterval(30_000L);

DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);
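
To check what those settings mean in wall-clock terms, the sketch below computes the delay sequence for an initial interval, multiplier, cap, and retry count. It reimplements the arithmetic for illustration and does not call Spring's BackOff classes; BackoffScheduleSketch and delays are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffScheduleSketch {

    /** Delay before each retry, in milliseconds: initial, then multiplied, capped at maxMs. */
    public static List<Long> delays(long initialMs, double multiplier, long maxMs, int maxRetries) {
        List<Long> out = new ArrayList<>();
        long next = initialMs;
        for (int i = 0; i < maxRetries; i++) {
            out.add(Math.min(next, maxMs));
            next = (long) Math.min(next * multiplier, (double) maxMs);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> schedule = delays(500L, 2.0, 30_000L, 5);
        long total = schedule.stream().mapToLong(Long::longValue).sum();
        System.out.println(schedule + " total=" + total + "ms");
        // prints: [500, 1000, 2000, 4000, 8000] total=15500ms
    }
}
```

With the values used above, a record that keeps failing holds its partition for about 15.5 seconds of waiting, plus processing time, before it reaches the recoverer.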

DeadLetterPublishingRecoverer

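Publishes the failed record to a DLT via KafkaTemplate. By default it adds kafka_dlt-* headers with the exception class, message, and stack trace plus the original topic, partition, and offset; use setHeadersFunction to add your own headers. The destination (topic and partition) resolver is customizable. See DLQ pattern for naming and metadata conventions.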

@RetryableTopic — non-blocking retry

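Annotation-driven alternative: failed messages are forwarded through a chain of retry topics, each consumed only after its delay has elapsed, so the main partition consumer is never blocked. Topic names depend on the suffixing strategy: orders-retry-0, orders-retry-1, … with index suffixes (SUFFIX_WITH_INDEX_VALUE), or the delay value as suffix (for example orders-retry-1000) with SUFFIX_WITH_DELAY_VALUE. Final failure → orders-dlt.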

flowchart LR
  M[orders main topic]
  R0[orders-retry-0\ndelay 1s]
  R1[orders-retry-1\ndelay 5s]
  D[orders-dlt]

  M -->|fail| R0
  R0 -->|fail| R1
  R1 -->|fail| D
Java — @RetryableTopic
@RetryableTopic(
    attempts = "4",
    backoff = @Backoff(delay = 1000, multiplier = 2.0, maxDelay = 30_000),
    dltStrategy = DltStrategy.FAIL_ON_ERROR,
    // Use include or exclude, not both: with include set, any other exception skips the retry topics and goes to the DLT
    include = { TransientDataAccessException.class }
)
@KafkaListener(topics = "orders", groupId = "order-processor")
public void process(@Payload OrderEvent order) {
  orderService.process(order);
}

@DltHandler
public void handleDlt(OrderEvent order, @Header(KafkaHeaders.EXCEPTION_MESSAGE) String error) {
  log.error("DLT orderId={} error={}", order.getOrderId(), error);
}
| Approach | Blocking? | Best for |
| --- | --- | --- |
| DefaultErrorHandler + BackOff | Yes: partition stalled during retry | Few retries, fast recovery, simple ops |
| @RetryableTopic | No: separate retry consumers | Long delays, high-throughput topics |
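
A quick way to reason about how many topics a given attempts value creates: attempts counts the first delivery on the main topic, so there are attempts - 1 retry topics plus the DLT. The sketch below builds that chain using the index-suffixed names from the text above; it is plain string logic under that naming assumption, and RetryTopicChainSketch and chain are hypothetical names, not Spring Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

class RetryTopicChainSketch {

    /** Topics a record can travel through: main, then attempts - 1 retry topics, then the DLT. */
    public static List<String> chain(String mainTopic, int attempts) {
        if (attempts < 1) {
            throw new IllegalArgumentException("attempts must be >= 1");
        }
        List<String> topics = new ArrayList<>();
        topics.add(mainTopic);
        for (int i = 0; i < attempts - 1; i++) {
            topics.add(mainTopic + "-retry-" + i); // index suffix, as in the diagram above
        }
        topics.add(mainTopic + "-dlt");
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(chain("orders", 4));
        // prints: [orders, orders-retry-0, orders-retry-1, orders-retry-2, orders-dlt]
    }
}
```

Each of those topics needs partitions, retention, and ACLs, which is the operational cost of non-blocking retry.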
🎯 Interview Tip

“Spring Kafka error handling?” — DefaultErrorHandler replaces deprecated seek handlers; configure BackOff + DeadLetterPublishingRecoverer. For non-blocking retries use @RetryableTopic (separate retry topics). Mark deserialization errors non-retriable.

Transactions in Spring Kafka

Spring integrates Kafka transactions via KafkaTransactionManager—bridging @Transactional boundaries with consume-process-produce atomicity inside Kafka.

KafkaTransactionManager

Requires a transactional producer factory: set a transactionIdPrefix and Spring derives each producer's transactional.id from it. The manager handles initTransactions / beginTransaction / commit / abort around the Spring transaction scope.

Java — transactional producer factory
@Bean
public ProducerFactory<String, OrderEvent> transactionalProducerFactory() {
  // producerProps(): your base producer configuration (bootstrap servers, serializers, ...)
  Map<String, Object> props = new HashMap<>(producerProps());
  DefaultKafkaProducerFactory<String, OrderEvent> factory =
      new DefaultKafkaProducerFactory<>(props);
  // The prefix makes the factory transactional; no separate transactional.id entry is needed
  factory.setTransactionIdPrefix("order-service-txn-");
  return factory;
}

@Bean
public KafkaTransactionManager<String, OrderEvent> kafkaTransactionManager(
    ProducerFactory<String, OrderEvent> pf) {
  return new KafkaTransactionManager<>(pf);
}

@Transactional on @KafkaListener

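When the listener container is configured with the KafkaTransactionManager (ContainerProperties.setTransactionManager, or setKafkaAwareTransactionManager in newer 3.x releases), the container starts the transaction before invoking the listener: read records → process → send output → offsets sent to the transaction and committed atomically with the sends. @Transactional on the method alone covers only the sends; offsets are then committed separately. Downstream consumers must use isolation.level=read_committed so they never see aborted output.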

Java — transactional listener
@Transactional("kafkaTransactionManager")
@KafkaListener(topics = "orders", groupId = "enrichment-service")
public void enrichAndPublish(ConsumerRecord<String, OrderEvent> record) {
  OrderEvent enriched = enrichmentService.enrich(record.value());
  kafkaTemplate.send("orders-enriched", record.key(), enriched);
  // the offset joins this transaction only if the listener container is configured with the KafkaTransactionManager
}

ChainedKafkaTransactionManager

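Coordinates Kafka + database under one Spring @Transactional by chaining the Kafka transaction manager with DataSourceTransactionManager. It is not XA two-phase commit: a failure between the two commits can leave the systems inconsistent, so use the outbox pattern for cross-system guarantees. The class has been deprecated since Spring Kafka 2.7 and is not available in the 3.x line, so treat the example below as 2.x-only. On 3.x, let the listener container start the Kafka transaction with a KafkaTransactionManager and annotate the listener method with @Transactional for the database transaction.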

Java — chained Kafka + DB
@Bean
public ChainedKafkaTransactionManager<String, OrderEvent> chainedTxManager(
    KafkaTransactionManager<String, OrderEvent> kafkaTm,
    DataSourceTransactionManager dbTm) {
  return new ChainedKafkaTransactionManager<>(kafkaTm, dbTm);
}

@Transactional("chainedTxManager")  // must match the bean name declared above
public void updateDbAndPublish(OrderEvent event) {
  orderRepository.save(event.toEntity());
  kafkaTemplate.send("orders-processed", event.getOrderId(), event);
}

Exactly-once in Spring

  • KafkaTransactionManager + transactional producer
  • Consumer isolation.level=read_committed
  • Single consume-transform-produce per transaction boundary
  • External DB writes still need idempotency or outbox—Kafka EOS does not span PostgreSQL
yaml — read_committed consumer
spring:
  kafka:
    consumer:
      properties:
        isolation.level: read_committed
    producer:
      transaction-id-prefix: order-service-txn-
⚖️ Trade-off

Transactional listeners add latency (transaction commit per record/batch) and require transactional.id fencing. Prefer idempotent consumers + outbox for most microservices; reserve Kafka transactions for stream-like enrichment inside the JVM.
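
An idempotent consumer can be as small as a lookup keyed on the record's coordinates. The sketch below is a minimal in-memory version that assumes topic-partition-offset is an acceptable idempotency key; in a real service the seen-set would live in the database and be written in the same transaction as the business change. IdempotentConsumerSketch and its methods are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

class IdempotentConsumerSketch {

    private final Set<String> seen = new HashSet<>();
    private int applied = 0;

    /** Applies the record once; returns false when the same coordinates were already handled. */
    public synchronized boolean handle(String topic, int partition, long offset) {
        String key = topic + "-" + partition + "@" + offset;
        if (!seen.add(key)) {
            return false; // redelivery after a rebalance or crash: skip the side effect
        }
        applied++; // stand-in for the real business side effect
        return true;
    }

    public synchronized int applied() {
        return applied;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.handle("orders", 0, 41L);
        consumer.handle("orders", 0, 42L);
        consumer.handle("orders", 0, 42L); // duplicate delivery
        System.out.println(consumer.applied()); // prints 2
    }
}
```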

Testing with Spring Kafka

Test pyramid: MockProducer/MockConsumer for unit tests, @EmbeddedKafka for fast integration, Testcontainers for broker-faithful CI.

EmbeddedKafkaBroker

In-process Kafka broker (ZooKeeper-based or KRaft, depending on version) from spring-kafka-test. Spins up real TCP listeners on random ports, no Docker required. Faster to start than Testcontainers; more faithful than mocks.

Java — @EmbeddedKafka integration test
@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = { "orders", "orders-processed", "orders.DLT" },
    // point Spring Boot's auto-configured clients at the embedded broker
    bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
class OrderListenerIT {

  @Autowired
  private KafkaTemplate<String, OrderEvent> kafkaTemplate;

  @Autowired
  private EmbeddedKafkaBroker embeddedKafka;

  // Assumes the application under test consumes "orders" and publishes to "orders-processed"
  @Test
  void shouldProcessOrder() {
    OrderEvent event = new OrderEvent("ord-1", "PLACED");
    kafkaTemplate.send("orders", "ord-1", event);

    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
        "test-group", "true", embeddedKafka);
    // read from the beginning in case the listener publishes before this consumer is assigned
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ConsumerFactory<String, OrderEvent> cf = new DefaultKafkaConsumerFactory<>(
        consumerProps, new StringDeserializer(), new JsonDeserializer<>(OrderEvent.class));

    // try-with-resources closes the test consumer
    try (Consumer<String, OrderEvent> consumer = cf.createConsumer()) {
      embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders-processed");

      ConsumerRecords<String, OrderEvent> records =
          KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10));
      assertThat(records.count()).isEqualTo(1);
      assertThat(records.iterator().next().value().getOrderId()).isEqualTo("ord-1");
    }
  }
}

KafkaTestUtils helpers

| Method | Purpose |
| --- | --- |
| consumerProps(groupId, autoCommit, broker) | Ready-made consumer config map |
| producerProps(broker) | Ready-made producer config map |
| getRecords(consumer) | Poll until timeout; primary assertion helper |
| getSingleRecord(consumer, topic, Duration) | Expect exactly one record on the topic |

Testcontainers + Kafka

Real Kafka in Docker—closer to production (protocol versions, Connect plugins, SASL). Slower CI; use for contract tests and auth-enabled paths.

Java — Testcontainers Kafka
@Testcontainers
@SpringBootTest
class OrderServiceContainerIT {

  @Container
  static KafkaContainer kafka = new KafkaContainer(
      DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

  @DynamicPropertySource
  static void kafkaProps(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
  }

  @Autowired
  private KafkaTemplate<String, OrderEvent> kafkaTemplate;

  @Test
  void publishesToRealBroker() {
    kafkaTemplate.send("orders", "k1", new OrderEvent("ord-99", "PLACED"));
    // assert via @KafkaListener test spy or consuming test consumer
  }
}

MockProducer & MockConsumer

Apache Kafka client mocks—no broker, no Spring context required. Ideal for pure unit tests of serialization and routing logic.

Java — MockProducer unit test
@Test
void kafkaTemplateUsesMockProducer() {
  // Pass real serializers: MockProducer may invoke them on send, so null is unsafe
  MockProducer<String, String> mockProducer =
      new MockProducer<>(true, new StringSerializer(), new StringSerializer());
  // MockProducerFactory ships with spring-kafka-test (3.0.7+)
  ProducerFactory<String, String> pf = new MockProducerFactory<>(() -> mockProducer);
  KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);

  template.send("orders", "key", "value");
  assertThat(mockProducer.history()).hasSize(1);
  assertThat(mockProducer.history().get(0).topic()).isEqualTo("orders");
}
💡 Pro Tip

Use @EmbeddedKafka for listener + DLT integration tests; MockProducer for publisher unit tests. Reserve Testcontainers for weekly pipeline or SASL/SSL matrix—not every PR.

Spring Kafka + Schema Registry

Avro-typed KafkaTemplate and @KafkaListener require Confluent serializers and registry URL in producer/consumer config. See also Schema Registry chapter.

Serializer configuration

yaml — Avro + Schema Registry
spring:
  kafka:
    bootstrap-servers: kafka:9092
    properties:
      schema.registry.url: http://schema-registry:8081
      specific.avro.reader: true
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        auto.register.schemas: false
        use.latest.version: true
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

AbstractKafkaSchemaSerDeConfig properties

| Property constant | Config key | Notes |
| --- | --- | --- |
| SCHEMA_REGISTRY_URL_CONFIG | schema.registry.url | Comma-separated HA URLs |
| AUTO_REGISTER_SCHEMAS | auto.register.schemas | false in production |
| USE_LATEST_VERSION | use.latest.version | Producer uses latest registered schema text |
| VALUE_SUBJECT_NAME_STRATEGY | value.subject.name.strategy | TopicNameStrategy default |
| KEY_SUBJECT_NAME_STRATEGY | key.subject.name.strategy | If keys are Avro |

SchemaRegistryClient bean

Java — registry client for admin / CI hooks
@Bean
public SchemaRegistryClient schemaRegistryClient(
    @Value("${spring.kafka.properties.schema.registry.url}") String url) {
  Map<String, Object> configs = Map.of(
      AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url
  );
  return new CachedSchemaRegistryClient(
      List.of(url.split(",")),
      1000,
      List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider()),
      configs
  );
}

@KafkaListener with Avro SpecificRecord

Java — Avro-typed listener
// Generated from Order.avsc via avro-maven-plugin
@KafkaListener(topics = "orders", groupId = "order-processor")
public void onOrder(Order order) {
  log.info("orderId={} status={}", order.getOrderId(), order.getStatus());
}

@Bean
public ConsumerFactory<String, Order> avroConsumerFactory(
    @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
  props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
  props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
  return new DefaultKafkaConsumerFactory<>(props);
}
⚙️ Config

Enable specific.avro.reader=true on the consumer. Without it, the deserializer returns GenericRecord, which cannot be passed to a listener parameter typed as the generated Order class.

📦 Real World

Pin generated Avro classes and schema version in the same Maven module. Integration tests use @EmbeddedKafka + mock Schema Registry (or Testcontainers for both) to catch SerializationException before deploy.