Override Spring Boot Dependencies

When using Spring for Apache Kafka in a Spring Boot application, the Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version, you need to override all of the associated dependencies. This is especially true when using the embedded Kafka broker in spring-kafka-test.

maven

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.11.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.5.11.RELEASE</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>{kafka-version}</version>
</dependency>

<!-- optional - only needed when using kafka-streams -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>{kafka-version}</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>{kafka-version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>{kafka-version}</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>{kafka-version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

gradle

dependencies {

    implementation 'org.springframework.kafka:spring-kafka:2.5.11.RELEASE'

    implementation "org.apache.kafka:kafka-clients:$kafkaVersion"
    implementation "org.apache.kafka:kafka-streams:$kafkaVersion" // optional - only needed when using kafka-streams
    testImplementation 'org.springframework.kafka:spring-kafka-test:2.5.11.RELEASE'
    testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion:test"
    testImplementation "org.apache.kafka:kafka_2.12:$kafkaVersion"
    testImplementation "org.apache.kafka:kafka_2.12:$kafkaVersion:test"

}

The test scope dependencies are only needed if you are using the embedded Kafka broker in tests.

Appendix A: Jackson/Scala Incompatibility

When using the framework and its test embedded Kafka server with Jackson version 2.11.3 or later on the classpath, Jackson pulls in some incompatible scala jars transitively. The dependency management of Spring Boot version 2.3.5 and later uses that version of Jackson. To correct this problem, you must override those jar versions.

maven

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.11</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.12.11</version>
    <scope>test</scope>
</dependency>

gradle

testImplementation "org.scala-lang:scala-library:2.12.11"
testImplementation "org.scala-lang:scala-reflect:2.12.11"

Appendix B: Change History

Changes between 2.3 and 2.4

Kafka Client Version

This version requires the 2.4.0 kafka-clients or higher and supports the new incremental rebalancing feature.

ConsumerAwareRebalanceListener

Like ConsumerRebalanceListener, this interface now has an additional method onPartitionsLost. Refer to the Apache Kafka documentation for more information.

Unlike the ConsumerRebalanceListener, The default implementation does not call onPartitionsRevoked. Instead, the listener container will call that method after it has called onPartitionsLost; you should not, therefore, do the same when implementing ConsumerAwareRebalanceListener.

See the IMPORTANT note at the end of [rebalance-listeners] for more information.

GenericErrorHandler

The isAckAfterHandle() default implementation now returns true by default.

KafkaTemplate

The KafkaTemplate now supports non-transactional publishing alongside transactional. See [tx-template-mixed] for more information.

AggregatingReplyingKafkaTemplate

The releaseStrategy is now a BiConsumer. It is now called after a timeout (as well as when records arrive); the second parameter is true in the case of a call after a timeout.

See [aggregating-request-reply] for more information.

Listener Container

The ContainerProperties provides an authorizationExceptionRetryInterval option to let the listener container to retry after any AuthorizationException is thrown by the KafkaConsumer. See its JavaDocs and [kafka-container] for more information.

@KafkaListener

The @KafkaListener annotation has a new property splitIterables; default true. When a replying listener returns an Iterable this property controls whether the return result is sent as a single record or a record for each element is sent. See [annotation-send-to] for more information

Batch listeners can now be configured with a BatchToRecordAdapter; this allows, for example, the batch to be processed in a transaction while the listener gets one record at a time. With the default implementation, a ConsumerRecordRecoverer can be used to handle errors within the batch, without stopping the processing of the entire batch - this might be useful when using transactions. See [transactions-batch] for more information.

Kafka Streams

The StreamsBuilderFactoryBean accepts a new property KafkaStreamsInfrastructureCustomizer. This allows configuration of the builder and/or topology before the stream is created. See [streams-spring] for more information.

Changes Between 2.2 and 2.3

This section covers the changes made from version 2.2 to version 2.3.

Also see [new-in-sik].

Tips, Tricks and Examples

A new chapter [tips-n-tricks] has been added. Please submit GitHub issues and/or pull requests for additional entries in that chapter.

Kafka Client Version

This version requires the 2.3.0 kafka-clients or higher.

Class/Package Changes

TopicPartitionInitialOffset is deprecated in favor of TopicPartitionOffset.

Configuration Changes

Starting with version 2.3.4, the missingTopicsFatal container property is false by default. When this is true, the application fails to start if the broker is down; many users were affected by this change; given that Kafka is a high-availability platform, we did not anticipate that starting an application with no active brokers would be a common use case.

Producer and Consumer Factory Changes

The DefaultKafkaProducerFactory can now be configured to create a producer per thread. You can also provide Supplier<Serializer> instances in the constructor as an alternative to either configured classes (which require no-arg constructors), or constructing with Serializer instances, which are then shared between all Producers. See [producer-factory] for more information.

The same option is available with Supplier<Deserializer> instances in DefaultKafkaConsumerFactory. See [kafka-container] for more information.

Listener Container Changes

Previously, error handlers received ListenerExecutionFailedException (with the actual listener exception as the cause) when the listener was invoked using a listener adapter (such as @KafkaListener s). Exceptions thrown by native GenericMessageListener s were passed to the error handler unchanged. Now a ListenerExecutionFailedException is always the argument (with the actual listener exception as the cause), which provides access to the container’s group.id property.

Because the listener container has it’s own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false. It now sets it to false automatically unless specifically set in the consumer factory or the container’s consumer property overrides.

The ackOnError property is now false by default. See [seek-to-current] for more information.

It is now possible to obtain the consumer’s group.id property in the listener method. See [listener-group-id] for more information.

The container has a new property recordInterceptor allowing records to be inspected or modified before invoking the listener. A CompositeRecordInterceptor is also provided in case you need to invoke multiple interceptors. See [message-listener-container] for more information.

The ConsumerSeekAware has new methods allowing you to perform seeks relative to the beginning, end, or current position and to seek to the first offset greater than or equal to a time stamp. See Seek for more information.

A convenience class AbstractConsumerSeekAware is now provided to simplify seeking. See Seek for more information.

The ContainerProperties provides an idleBetweenPolls option to let the main loop in the listener container to sleep between KafkaConsumer.poll() calls. See its JavaDocs and [kafka-container] for more information.

When using AckMode.MANUAL (or MANUAL_IMMEDIATE) you can now cause a redelivery by calling nack on the Acknowledgment. See [committing-offsets] for more information.

Listener performance can now be monitored using Micrometer Timer s. See [micrometer] for more information.

The containers now publish additional consumer lifecycle events relating to startup. See [events] for more information.

Transactional batch listeners can now support zombie fencing. See [transactions] for more information.

The listener container factory can now be configured with a ContainerCustomizer to further configure each container after it has been created and configured. See [container-factory] for more information.

ErrorHandler Changes

The SeekToCurrentErrorHandler now treats certain exceptions as fatal and disables retry for those, invoking the recoverer on first failure.

The SeekToCurrentErrorHandler and SeekToCurrentBatchErrorHandler can now be configured to apply a BackOff (thread sleep) between delivery attempts.

Starting with version 2.3.2, recovered records' offsets will be committed when the error handler returns after recovering a failed record.

See [seek-to-current] for more information.

The DeadLetterPublishingRecoverer, when used in conjunction with an ErrorHandlingDeserializer2, now sets the payload of the message sent to the dead-letter topic, to the original value that could not be deserialized. Previously, it was null and user code needed to extract the DeserializationException from the message headers. See [dead-letters] for more information.

TopicBuilder

A new class TopicBuilder is provided for more convenient creation of NewTopic @Bean s for automatic topic provisioning. See [configuring-topics] for more information.

Kafka Streams Changes

You can now perform additional configuration of the StreamsBuilderFactoryBean created by @EnableKafkaStreams. See Streams Configuration for more information.

A RecoveringDeserializationExceptionHandler is now provided which allows records with deserialization errors to be recovered. It can be used in conjunction with a DeadLetterPublishingRecoverer to send these records to a dead-letter topic. See [streams-deser-recovery] for more information.

The HeaderEnricher transformer has been provided, using SpEL to generate the header values. See [streams-header-enricher] for more information.

The MessagingTransformer has been provided. This allows a Kafka streams topology to interact with a spring-messaging component, such as a Spring Integration flow. See [streams-messaging] and [streams-integration] for more information.

JSON Component Changes

Now all the JSON-aware components are configured by default with a Jackson ObjectMapper produced by the JacksonUtils.enhancedObjectMapper(). The JsonDeserializer now provides TypeReference-based constructors for better handling of target generic container types. Also a JacksonMimeTypeModule has been introduced for serialization of org.springframework.util.MimeType to plain string. See its JavaDocs and [serdes] for more information.

A ByteArrayJsonMessageConverter has been provided as well as a new super class for all Json converters, JsonMessageConverter. Also, a StringOrBytesSerializer is now available; it can serialize byte[], Bytes and String values in ProducerRecord s. See [messaging-message-conversion] for more information.

The JsonSerializer, JsonDeserializer and JsonSerde now have fluent APIs to make programmatic configuration simpler. See the javadocs, [serdes], and [serde] for more informaion.

ReplyingKafkaTemplate

When a reply times out, the future is completed exceptionally with a KafkaReplyTimeoutException instead of a KafkaException.

Also, an overloaded sendAndReceive method is now provided that allows specifying the reply timeout on a per message basis.

AggregatingReplyingKafkaTemplate

Extends the ReplyingKafkaTemplate by aggregating replies from multiple receivers. See [aggregating-request-reply] for more information.

Transaction Changes

You can now override the producer factory’s transactionIdPrefix on the KafkaTemplate and KafkaTransactionManager. See [transaction-id-prefix] for more information.

New Delegating Serializer/Deserializer

The framework now provides a delegating Serializer and Deserializer, utilizing a header to enable producing and consuming records with multiple key/value types. See [delegating-serialization] for more information.

New Retrying Deserializer

The framework now provides a delegating RetryingDeserializer, to retry serialization when transient errors such as network problems might occur. See [retrying-deserialization] for more information.

Changes Between 2.1 and 2.2

Kafka Client Version

This version requires the 2.0.0 kafka-clients or higher.

Class and Package Changes

The ContainerProperties class has been moved from org.springframework.kafka.listener.config to org.springframework.kafka.listener.

The AckMode enum has been moved from AbstractMessageListenerContainer to ContainerProperties.

The setBatchErrorHandler() and setErrorHandler() methods have been moved from ContainerProperties to both AbstractMessageListenerContainer and AbstractKafkaListenerContainerFactory.

After Rollback Processing

A new AfterRollbackProcessor strategy is provided. See [after-rollback] for more information.

ConcurrentKafkaListenerContainerFactory Changes

You can now use the ConcurrentKafkaListenerContainerFactory to create and configure any ConcurrentMessageListenerContainer, not only those for @KafkaListener annotations. See [container-factory] for more information.

Listener Container Changes

A new container property (missingTopicsFatal) has been added. See [kafka-container] for more information.

A ConsumerStoppedEvent is now emitted when a consumer stops. See [thread-safety] for more information.

Batch listeners can optionally receive the complete ConsumerRecords<?, ?> object instead of a List<ConsumerRecord<?, ?>. See Batch Listeners for more information.

The DefaultAfterRollbackProcessor and SeekToCurrentErrorHandler can now recover (skip) records that keep failing, and, by default, does so after 10 failures. They can be configured to publish failed records to a dead-letter topic.

Starting with version 2.2.4, the consumer’s group ID can be used while selecting the dead letter topic name.

See [after-rollback], [seek-to-current], and [dead-letters] for more information.

The ConsumerStoppingEvent has been added. See [events] for more information.

The SeekToCurrentErrorHandler can now be configured to commit the offset of a recovered record when the container is configured with AckMode.MANUAL_IMMEDIATE (since 2.2.4). See [seek-to-current] for more information.

@KafkaListener Changes

You can now override the concurrency and autoStartup properties of the listener container factory by setting properties on the annotation. You can now add configuration to determine which headers (if any) are copied to a reply message. See [kafka-listener-annotation] for more information.

You can now use @KafkaListener as a meta-annotation on your own annotations. See [kafka-listener-meta] for more information.

It is now easier to configure a Validator for @Payload validation. See [kafka-validation] for more information.

You can now specify kafka consumer properties directly on the annotation; these will override any properties with the same name defined in the consumer factory (since version 2.2.4). See [annotation-properties] for more information.

Header Mapping Changes

Headers of type MimeType and MediaType are now mapped as simple strings in the RecordHeader value. Previously, they were mapped as JSON and only MimeType was decoded. MediaType could not be decoded. They are now simple strings for interoperability.

Also, the DefaultKafkaHeaderMapper has a new addToStringClasses method, allowing the specification of types that should be mapped by using toString() instead of JSON. See [headers] for more information.

Embedded Kafka Changes

The KafkaEmbedded class and its KafkaRule interface have been deprecated in favor of the EmbeddedKafkaBroker and its JUnit 4 EmbeddedKafkaRule wrapper. The @EmbeddedKafka annotation now populates an EmbeddedKafkaBroker bean instead of the deprecated KafkaEmbedded. This change allows the use of @EmbeddedKafka in JUnit 5 tests. The @EmbeddedKafka annotation now has the attribute ports to specify the port that populates the EmbeddedKafkaBroker. See [testing] for more information.

JsonSerializer/Deserializer Enhancements

You can now provide type mapping information by using producer and consumer properties.

New constructors are available on the deserializer to allow overriding the type header information with the supplied target type.

The JsonDeserializer now removes any type information headers by default.

You can now configure the JsonDeserializer to ignore type information headers by using a Kafka property (since 2.2.3).

See [serdes] for more information.

Kafka Streams Changes

The streams configuration bean must now be a KafkaStreamsConfiguration object instead of a StreamsConfig object.

The StreamsBuilderFactoryBean has been moved from package …​core to …​config.

The KafkaStreamBrancher has been introduced for better end-user experience when conditional branches are built on top of KStream instance.

See [streams-kafka-streams] and [streams-config] for more information.

Transactional ID

When a transaction is started by the listener container, the transactional.id is now the transactionIdPrefix appended with <group.id>.<topic>.<partition>. This change allows proper fencing of zombies, as described here.

Changes Between 2.0 and 2.1

Kafka Client Version

This version requires the 1.0.0 kafka-clients or higher.

The 1.1.x client is supported with version 2.1.5, but you need to override dependencies as described in [deps-for-11x].

The 1.1.x client is supported natively in version 2.2.

JSON Improvements

The StringJsonMessageConverter and JsonSerializer now add type information in Headers, letting the converter and JsonDeserializer create specific types on reception, based on the message itself rather than a fixed configured type. See [serdes] for more information.

Container Stopping Error Handlers

Container error handlers are now provided for both record and batch listeners that treat any exceptions thrown by the listener as fatal/ They stop the container. See [annotation-error-handling] for more information.

Pausing and Resuming Containers

The listener containers now have pause() and resume() methods (since version 2.1.3). See [pause-resume] for more information.

Stateful Retry

Starting with version 2.1.3, you can configure stateful retry. See Stateful Retry for more information.

Client ID

Starting with version 2.1.1, you can now set the client.id prefix on @KafkaListener. Previously, to customize the client ID, you needed a separate consumer factory (and container factory) per listener. The prefix is suffixed with -n to provide unique client IDs when you use concurrency.

Logging Offset Commits

By default, logging of topic offset commits is performed with the DEBUG logging level. Starting with version 2.1.2, a new property in ContainerProperties called commitLogLevel lets you specify the log level for these messages. See [kafka-container] for more information.

Default @KafkaHandler

Starting with version 2.1.3, you can designate one of the @KafkaHandler annotations on a class-level @KafkaListener as the default. See [class-level-kafkalistener] for more information.

ReplyingKafkaTemplate

Starting with version 2.1.3, a subclass of KafkaTemplate is provided to support request/reply semantics. See [replying-template] for more information.

ChainedKafkaTransactionManager

Version 2.1.3 introduced the ChainedKafkaTransactionManager. See [chained-transaction-manager] for more information.

Migration Guide from 2.0

See the 2.0 to 2.1 Migration guide.

Changes Between 1.3 and 2.0

Spring Framework and Java Versions

The Spring for Apache Kafka project now requires Spring Framework 5.0 and Java 8.

@KafkaListener Changes

You can now annotate @KafkaListener methods (and classes and @KafkaHandler methods) with @SendTo. If the method returns a result, it is forwarded to the specified topic. See [annotation-send-to] for more information.

Message Listeners

Message listeners can now be aware of the Consumer object. See Message Listeners for more information.

Using ConsumerAwareRebalanceListener

Rebalance listeners can now access the Consumer object during rebalance notifications. See [rebalance-listeners] for more information.

Changes Between 1.2 and 1.3

Support for Transactions

The 0.11.0.0 client library added support for transactions. The KafkaTransactionManager and other support for transactions have been added. See [transactions] for more information.

Support for Headers

The 0.11.0.0 client library added support for message headers. These can now be mapped to and from spring-messaging MessageHeaders. See [headers] for more information.

Creating Topics

The 0.11.0.0 client library provides an AdminClient, which you can use to create topics. The KafkaAdmin uses this client to automatically add topics defined as @Bean instances.

Support for Kafka Timestamps

KafkaTemplate now supports an API to add records with timestamps. New KafkaHeaders have been introduced regarding timestamp support. Also, new KafkaConditions.timestamp() and KafkaMatchers.hasTimestamp() testing utilities have been added. See [kafka-template], [kafka-listener-annotation], and [testing] for more details.

@KafkaListener Changes

You can now configure a KafkaListenerErrorHandler to handle exceptions. See [annotation-error-handling] for more information.

By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.

@EmbeddedKafka Annotation

For convenience, a test class-level @EmbeddedKafka annotation is provided, to register KafkaEmbedded as a bean. See [testing] for more information.

Kerberos Configuration

Support for configuring Kerberos is now provided. See [kerberos] for more information.

Changes Between 1.1 and 1.2

This version uses the 0.10.2.x client.

Changes Between 1.0 and 1.1

Kafka Client

This version uses the Apache Kafka 0.10.x.x client.

Batch Listeners

Listeners can be configured to receive the entire batch of messages returned by the consumer.poll() operation, rather than one at a time.

Null Payloads

Null payloads are used to “delete” keys when you use log compaction.

Initial Offset

When explicitly assigning partitions, you can now configure the initial offset relative to the current position for the consumer group, rather than absolute or relative to the current end.

Seek

You can now seek the position of each topic or partition. You can use this to set the initial position during initialization when group management is in use and Kafka assigns the partitions. You can also seek when an idle container is detected or at any arbitrary point in your application’s execution. See Seek for more information.