Changes Between 2.4 and 2.5
This section covers the changes made from version 2.4 to version 2.5. For changes in earlier versions, see [history].
Consumer/Producer Factory Changes
The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed. Implementations for native Micrometer metrics are provided. See [factory-listeners] for more information.
You can now change bootstrap server properties at runtime, enabling failover to another Kafka cluster. See [connecting] for more information.
StreamsBuilderFactoryBean Changes
The factory bean can now invoke a callback whenever a KafkaStreams is created or destroyed.
An implementation for native Micrometer metrics is provided.
See [streams-micrometer] for more information.
Delivery Attempts Header
There is now an option to add a header that tracks delivery attempts when using certain error handlers and after-rollback processors. See [delivery-header] for more information.
@KafkaListener Changes
Default reply headers will now be populated automatically if needed when a @KafkaListener return type is Message<?>.
See [reply-message] for more information.
The KafkaHeaders.RECEIVED_MESSAGE_KEY is no longer populated with a null value when the incoming record has a null key; the header is omitted altogether.
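The practical effect is that code reading the key header can no longer rely on the header being present. The following plain-Java sketch models this with a simple map; the class and method names are hypothetical, and the header names are assumed to match the values of the corresponding KafkaHeaders constants.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model only; not Spring for Apache Kafka code.
class ReceivedKeyHeaderSketch {

    // Assumed to match the KafkaHeaders constant values.
    static final String RECEIVED_TOPIC = "kafka_receivedTopic";
    static final String RECEIVED_MESSAGE_KEY = "kafka_receivedMessageKey";

    // Builds the headers for an incoming record; the key header is added only for non-null keys.
    static Map<String, Object> headers(Object key, String topic) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(RECEIVED_TOPIC, topic);
        if (key != null) {
            headers.put(RECEIVED_MESSAGE_KEY, key);
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(headers(null, "orders").containsKey(RECEIVED_MESSAGE_KEY)); // prints false
        System.out.println(headers("k1", "orders").get(RECEIVED_MESSAGE_KEY));         // prints k1
    }
}
```

A listener that needs the key should therefore treat the header as optional rather than expecting a null value.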
@KafkaListener methods can now specify a ConsumerRecordMetadata parameter instead of using discrete headers for metadata such as topic, partition, etc.
See [consumer-record-metadata] for more information.
Listener Container Changes
The assignmentCommitOption container property is now LATEST_ONLY_NO_TX by default.
See [container-props] for more information.
The subBatchPerPartition container property is now true by default when using transactions.
See [transactions] for more information.
A new RecoveringBatchErrorHandler is now provided.
See [recovering-batch-eh] for more information.
Static group membership is now supported. See [message-listener-container] for more information.
When incremental/cooperative rebalancing is configured, if offsets fail to commit with a non-fatal RebalanceInProgressException, the container will attempt to re-commit the offsets for the partitions that remain assigned to this instance after the rebalance is completed.
The default error handler is now the SeekToCurrentErrorHandler for record listeners and RecoveringBatchErrorHandler for batch listeners.
See [error-handlers] for more information.
You can now control the level at which exceptions intentionally thrown by standard error handlers are logged. See [error-handlers] for more information.
The getAssignmentsByClientId() method has been added, making it easier to determine which consumers in a concurrent container are assigned which partition(s).
See [container-props] for more information.
You can now suppress logging of entire ConsumerRecord instances in error logs, debug logs, and so on.
See onlyLogRecordMetadata in [container-props].
KafkaTemplate Changes
The KafkaTemplate can now maintain Micrometer timers.
See [micrometer] for more information.
The KafkaTemplate can now be configured with ProducerConfig properties to override those in the producer factory.
See [kafka-template] for more information.
A RoutingKafkaTemplate has now been provided.
See [routing-template] for more information.
You can now use KafkaSendCallback instead of ListenableFutureCallback to get a narrower exception, making it easier to extract the failed ProducerRecord.
See [kafka-template] for more information.
Kafka String Serializer/Deserializer
New ToStringSerializer and StringDeserializer classes, as well as an associated SerDe, are now provided.
See [string-serde] for more information.
JsonDeserializer
The JsonDeserializer now has more flexibility to determine the deserialization type.
See [serdes-type-methods] for more information.
Delegating Serializer/Deserializer
The DelegatingSerializer can now handle "standard" types, when the outbound record has no header.
See [delegating-serialization] for more information.
Testing Changes
The KafkaTestUtils.consumerProps() helper method now sets ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest by default.
See [junit] for more information.
Changes Between 2.3 and 2.4
Kafka Client Version
This version requires the 2.4.0 kafka-clients or higher and supports the new incremental rebalancing feature.
ConsumerAwareRebalanceListener
Like ConsumerRebalanceListener, this interface now has an additional method onPartitionsLost.
Refer to the Apache Kafka documentation for more information.
Unlike the ConsumerRebalanceListener, the default implementation does not call onPartitionsRevoked.
Instead, the listener container will call that method after it has called onPartitionsLost; you should not, therefore, do the same when implementing ConsumerAwareRebalanceListener.
See the IMPORTANT note at the end of [rebalance-listeners] for more information.
KafkaTemplate
The KafkaTemplate now supports non-transactional publishing alongside transactional.
See [tx-template-mixed] for more information.
AggregatingReplyingKafkaTemplate
The releaseStrategy is now a BiConsumer.
It is now called after a timeout (as well as when records arrive); the second parameter is true in the case of a call after a timeout.
See [aggregating-request-reply] for more information.
Listener Container
The ContainerProperties provides an authorizationExceptionRetryInterval option to let the listener container retry after any AuthorizationException is thrown by the KafkaConsumer.
See its JavaDocs and [kafka-container] for more information.
@KafkaListener
The @KafkaListener annotation has a new property splitIterables; default true.
When a replying listener returns an Iterable, this property controls whether the result is sent as a single record or as one record per element.
See [annotation-send-to] for more information.
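As a rough illustration of the two behaviors, the following plain-Java sketch models how a returned Iterable could map to outgoing reply payloads. The class and method names (SplitIterablesSketch, toReplies) are hypothetical and not part of the framework; the real logic lives inside the listener adapter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model only; not Spring for Apache Kafka code.
class SplitIterablesSketch {

    // Returns the payloads that would be sent as reply records.
    static List<Object> toReplies(Iterable<?> result, boolean splitIterables) {
        if (!splitIterables) {
            // One record whose value is the whole Iterable.
            return Collections.<Object>singletonList(result);
        }
        // One record per element.
        List<Object> replies = new ArrayList<>();
        for (Object element : result) {
            replies.add(element);
        }
        return replies;
    }

    public static void main(String[] args) {
        List<String> result = Arrays.asList("a", "b", "c");
        System.out.println(toReplies(result, true).size());  // prints 3
        System.out.println(toReplies(result, false).size()); // prints 1
    }
}
```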
Batch listeners can now be configured with a BatchToRecordAdapter; this allows, for example, the batch to be processed in a transaction while the listener gets one record at a time.
With the default implementation, a ConsumerRecordRecoverer can be used to handle errors within the batch without stopping the processing of the entire batch; this might be useful when using transactions.
See [transactions-batch] for more information.
Kafka Streams
The StreamsBuilderFactoryBean accepts a new property KafkaStreamsInfrastructureCustomizer.
This allows configuration of the builder and/or topology before the stream is created.
See [streams-spring] for more information.
Changes Between 2.2 and 2.3
This section covers the changes made from version 2.2 to version 2.3.
Tips, Tricks and Examples
A new chapter [tips-n-tricks] has been added. Please submit GitHub issues and/or pull requests for additional entries in that chapter.
Configuration Changes
Starting with version 2.3.4, the missingTopicsFatal container property is false by default.
When this property is true, the application fails to start if the broker is down, and many users were affected by this behavior. Given that Kafka is a high-availability platform, we did not anticipate that starting an application with no active brokers would be a common use case.
Producer and Consumer Factory Changes
The DefaultKafkaProducerFactory can now be configured to create a producer per thread.
You can also provide Supplier<Serializer> instances in the constructor as an alternative to either configured classes (which require no-arg constructors), or constructing with Serializer instances, which are then shared between all Producers.
See [producer-factory] for more information.
The same option is available with Supplier<Deserializer> instances in DefaultKafkaConsumerFactory.
See [kafka-container] for more information.
Listener Container Changes
Previously, error handlers received ListenerExecutionFailedException (with the actual listener exception as the cause) when the listener was invoked using a listener adapter (such as with @KafkaListener methods).
Exceptions thrown by native GenericMessageListener implementations were passed to the error handler unchanged.
Now a ListenerExecutionFailedException is always the argument (with the actual listener exception as the cause), which provides access to the container’s group.id property.
Because the listener container has its own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false.
It now sets it to false automatically unless specifically set in the consumer factory or the container’s consumer property overrides.
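The resolution described above can be pictured with a small plain-Java sketch. It is a model under stated assumptions, not the container's code: the class and method names are hypothetical, and it assumes that a value in the container's overrides takes precedence over one in the consumer factory.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the precedence described above; not framework code.
class AutoCommitSketch {

    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // An explicit value wins (override first, then factory); otherwise the result is false.
    static boolean effectiveAutoCommit(Map<String, Object> factoryConfigs,
            Map<String, Object> containerOverrides) {
        Object value = containerOverrides.containsKey(ENABLE_AUTO_COMMIT)
                ? containerOverrides.get(ENABLE_AUTO_COMMIT)
                : factoryConfigs.get(ENABLE_AUTO_COMMIT);
        return value != null && Boolean.parseBoolean(value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> factory = new HashMap<>();
        Map<String, Object> overrides = new HashMap<>();
        System.out.println(effectiveAutoCommit(factory, overrides)); // prints false
        factory.put(ENABLE_AUTO_COMMIT, true);
        System.out.println(effectiveAutoCommit(factory, overrides)); // prints true
    }
}
```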
The ackOnError property is now false by default.
See [seek-to-current] for more information.
It is now possible to obtain the consumer’s group.id property in the listener method.
See [listener-group-id] for more information.
The container has a new property recordInterceptor allowing records to be inspected or modified before invoking the listener.
A CompositeRecordInterceptor is also provided in case you need to invoke multiple interceptors.
See [message-listener-container] for more information.
The ConsumerSeekAware has new methods allowing you to perform seeks relative to the beginning, end, or current position and to seek to the first offset greater than or equal to a timestamp.
See Seek for more information.
A convenience class AbstractConsumerSeekAware is now provided to simplify seeking.
See Seek for more information.
The ContainerProperties provides an idleBetweenPolls option to let the main loop in the listener container sleep between KafkaConsumer.poll() calls.
See its JavaDocs and [kafka-container] for more information.
When using AckMode.MANUAL (or MANUAL_IMMEDIATE) you can now cause a redelivery by calling nack on the Acknowledgment.
See [committing-offsets] for more information.
Listener performance can now be monitored using Micrometer Timer instances.
See [micrometer] for more information.
The containers now publish additional consumer lifecycle events relating to startup. See [events] for more information.
Transactional batch listeners can now support zombie fencing. See [transactions] for more information.
The listener container factory can now be configured with a ContainerCustomizer to further configure each container after it has been created and configured.
See [container-factory] for more information.
ErrorHandler Changes
The SeekToCurrentErrorHandler now treats certain exceptions as fatal and disables retry for those, invoking the recoverer on first failure.
The SeekToCurrentErrorHandler and SeekToCurrentBatchErrorHandler can now be configured to apply a BackOff (thread sleep) between delivery attempts.
Starting with version 2.3.2, recovered records' offsets will be committed when the error handler returns after recovering a failed record.
See [seek-to-current] for more information.
The DeadLetterPublishingRecoverer, when used in conjunction with an ErrorHandlingDeserializer, now sets the payload of the message sent to the dead-letter topic, to the original value that could not be deserialized.
Previously, it was null and user code needed to extract the DeserializationException from the message headers.
See [dead-letters] for more information.
TopicBuilder
A new class TopicBuilder is provided for more convenient creation of NewTopic @Bean definitions for automatic topic provisioning.
See [configuring-topics] for more information.
Kafka Streams Changes
You can now perform additional configuration of the StreamsBuilderFactoryBean created by @EnableKafkaStreams.
See Streams Configuration for more information.
A RecoveringDeserializationExceptionHandler is now provided which allows records with deserialization errors to be recovered.
It can be used in conjunction with a DeadLetterPublishingRecoverer to send these records to a dead-letter topic.
See [streams-deser-recovery] for more information.
The HeaderEnricher transformer has been provided, using SpEL to generate the header values.
See [streams-header-enricher] for more information.
The MessagingTransformer has been provided.
This allows a Kafka Streams topology to interact with a spring-messaging component, such as a Spring Integration flow.
See [streams-messaging] and [Calling a Spring Integration Flow from a KStream] for more information.
JSON Component Changes
Now all the JSON-aware components are configured by default with a Jackson ObjectMapper produced by the JacksonUtils.enhancedObjectMapper().
The JsonDeserializer now provides TypeReference-based constructors for better handling of target generic container types.
Also a JacksonMimeTypeModule has been introduced for serialization of org.springframework.util.MimeType to plain string.
See its JavaDocs and [serdes] for more information.
A ByteArrayJsonMessageConverter has been provided as well as a new super class for all JSON converters, JsonMessageConverter.
Also, a StringOrBytesSerializer is now available; it can serialize byte[], Bytes and String values in ProducerRecord instances.
See [messaging-message-conversion] for more information.
ReplyingKafkaTemplate
When a reply times out, the future is completed exceptionally with a KafkaReplyTimeoutException instead of a KafkaException.
Also, an overloaded sendAndReceive method is now provided that allows specifying the reply timeout on a per message basis.
AggregatingReplyingKafkaTemplate
Extends the ReplyingKafkaTemplate by aggregating replies from multiple receivers.
See [aggregating-request-reply] for more information.
Transaction Changes
You can now override the producer factory’s transactionIdPrefix on the KafkaTemplate and KafkaTransactionManager.
See [transaction-id-prefix] for more information.
New Delegating Serializer/Deserializer
The framework now provides a delegating Serializer and Deserializer, utilizing a header to enable producing and consuming records with multiple key/value types.
See [delegating-serialization] for more information.
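The idea of selecting a delegate from a header can be sketched in plain Java as follows. This is only a model: the class name, the "selector" header name, and the use of a Map for headers are hypothetical simplifications and do not reflect the framework's actual types or header names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of header-based delegation; not framework code.
class DelegatingSketch {

    // Hypothetical header name used to pick the delegate.
    static final String SELECTOR_HEADER = "selector";

    private final Map<String, Function<Object, byte[]>> delegates = new HashMap<>();

    void register(String selector, Function<Object, byte[]> delegate) {
        delegates.put(selector, delegate);
    }

    // Looks up the delegate named by the selector header and applies it to the value.
    byte[] serialize(Map<String, String> headers, Object value) {
        String selector = headers.get(SELECTOR_HEADER);
        Function<Object, byte[]> delegate = delegates.get(selector);
        if (delegate == null) {
            throw new IllegalStateException("No delegate for selector: " + selector);
        }
        return delegate.apply(value);
    }

    public static void main(String[] args) {
        DelegatingSketch sketch = new DelegatingSketch();
        sketch.register("string", v -> v.toString().getBytes(StandardCharsets.UTF_8));
        sketch.register("int", v -> ByteBuffer.allocate(4).putInt((Integer) v).array());
        Map<String, String> headers = new HashMap<>();
        headers.put(SELECTOR_HEADER, "int");
        System.out.println(sketch.serialize(headers, 42).length); // prints 4
    }
}
```

The consuming side would mirror this by reading the same header to choose a deserializer.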
New Retrying Deserializer
The framework now provides a delegating RetryingDeserializer, to retry deserialization when transient errors such as network problems might occur.
See [retrying-deserialization] for more information.
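Conceptually, a retrying wrapper calls its delegate again when a transient failure occurs. The plain-Java sketch below shows that pattern with a simple loop and no back-off; the class and method names are hypothetical, and the real RetryingDeserializer is configured differently (see the referenced section).

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical model of retrying a delegate; not framework code.
class RetryingSketch {

    // Calls the delegate until it succeeds or maxAttempts is reached, then rethrows the last failure.
    static <T> T deserializeWithRetry(byte[] data, Function<byte[], T> delegate, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return delegate.apply(data);
            } catch (RuntimeException e) {
                last = e; // treated as transient in this sketch
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String value = deserializeWithRetry("payload".getBytes(StandardCharsets.UTF_8), bytes -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated transient failure");
            }
            return new String(bytes, StandardCharsets.UTF_8);
        }, 5);
        System.out.println(value + " after " + calls[0] + " calls"); // prints payload after 3 calls
    }
}
```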
Changes Between 2.1 and 2.2
Class and Package Changes
The ContainerProperties class has been moved from org.springframework.kafka.listener.config to org.springframework.kafka.listener.
The AckMode enum has been moved from AbstractMessageListenerContainer to ContainerProperties.
The setBatchErrorHandler() and setErrorHandler() methods have been moved from ContainerProperties to both AbstractMessageListenerContainer and AbstractKafkaListenerContainerFactory.
After Rollback Processing
A new AfterRollbackProcessor strategy is provided.
See [after-rollback] for more information.
ConcurrentKafkaListenerContainerFactory Changes
You can now use the ConcurrentKafkaListenerContainerFactory to create and configure any ConcurrentMessageListenerContainer, not only those for @KafkaListener annotations.
See [container-factory] for more information.
Listener Container Changes
A new container property (missingTopicsFatal) has been added.
See [kafka-container] for more information.
A ConsumerStoppedEvent is now emitted when a consumer stops.
See [thread-safety] for more information.
Batch listeners can optionally receive the complete ConsumerRecords<?, ?> object instead of a List<ConsumerRecord<?, ?>>.
See Batch Listeners for more information.
The DefaultAfterRollbackProcessor and SeekToCurrentErrorHandler can now recover (skip) records that keep failing and, by default, do so after 10 failures.
They can be configured to publish failed records to a dead-letter topic.
Starting with version 2.2.4, the consumer’s group ID can be used while selecting the dead letter topic name.
See [after-rollback], [seek-to-current], and [dead-letters] for more information.
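The "recover after N failures" behavior can be pictured with the plain-Java sketch below. It is a deliberate simplification with hypothetical names: the real components redeliver by seeking the consumer so the record is fetched again on the next poll, whereas this model simply loops in place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of retry-then-recover; not framework code.
class RecoverAfterFailuresSketch {

    // Delivers the record until the listener succeeds or maxFailures is reached.
    // Returns the number of delivery attempts that were made.
    static <T> int deliver(T record, Consumer<T> listener, Consumer<T> recoverer, int maxFailures) {
        int failures = 0;
        while (true) {
            try {
                listener.accept(record);
                return failures + 1;
            } catch (RuntimeException e) {
                failures++;
                if (failures >= maxFailures) {
                    // Give up: hand the record to the recoverer (for example, a dead-letter publisher).
                    recoverer.accept(record);
                    return failures;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> recovered = new ArrayList<>();
        int attempts = deliver("bad-record",
                r -> { throw new IllegalStateException("listener failed"); },
                recovered::add, 10);
        System.out.println(attempts + " attempts, recovered=" + recovered); // prints 10 attempts, recovered=[bad-record]
    }
}
```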
The ConsumerStoppingEvent has been added.
See [events] for more information.
The SeekToCurrentErrorHandler can now be configured to commit the offset of a recovered record when the container is configured with AckMode.MANUAL_IMMEDIATE (since 2.2.4).
See [seek-to-current] for more information.
@KafkaListener Changes
You can now override the concurrency and autoStartup properties of the listener container factory by setting properties on the annotation.
You can now add configuration to determine which headers (if any) are copied to a reply message.
See [kafka-listener-annotation] for more information.
You can now use @KafkaListener as a meta-annotation on your own annotations.
See [kafka-listener-meta] for more information.
It is now easier to configure a Validator for @Payload validation.
See [kafka-validation] for more information.
You can now specify Kafka consumer properties directly on the annotation; these will override any properties with the same name defined in the consumer factory (since version 2.2.4). See [annotation-properties] for more information.
Header Mapping Changes
Headers of type MimeType and MediaType are now mapped as simple strings in the RecordHeader value.
Previously, they were mapped as JSON and only MimeType was decoded.
MediaType could not be decoded.
They are now simple strings for interoperability.
Also, the DefaultKafkaHeaderMapper has a new addToStringClasses method, allowing the specification of types that should be mapped by using toString() instead of JSON.
See [headers] for more information.
Embedded Kafka Changes
The KafkaEmbedded class and its KafkaRule interface have been deprecated in favor of the EmbeddedKafkaBroker and its JUnit 4 EmbeddedKafkaRule wrapper.
The @EmbeddedKafka annotation now populates an EmbeddedKafkaBroker bean instead of the deprecated KafkaEmbedded.
This change allows the use of @EmbeddedKafka in JUnit 5 tests.
The @EmbeddedKafka annotation now has the attribute ports to specify the port that populates the EmbeddedKafkaBroker.
See [testing] for more information.
JsonSerializer/Deserializer Enhancements
You can now provide type mapping information by using producer and consumer properties.
New constructors are available on the deserializer to allow overriding the type header information with the supplied target type.
The JsonDeserializer now removes any type information headers by default.
You can now configure the JsonDeserializer to ignore type information headers by using a Kafka property (since 2.2.3).
See [serdes] for more information.
Kafka Streams Changes
The streams configuration bean must now be a KafkaStreamsConfiguration object instead of a StreamsConfig object.
The StreamsBuilderFactoryBean has been moved from package …core to …config.
The KafkaStreamBrancher has been introduced for a better end-user experience when conditional branches are built on top of a KStream instance.
See [streams-kafka-streams] and [streams-config] for more information.
Transactional ID
When a transaction is started by the listener container, the transactional.id is now the transactionIdPrefix appended with <group.id>.<topic>.<partition>.
This change allows proper fencing of zombies, as described here.
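As a minimal sketch of the naming scheme above, the following plain-Java helper builds the resulting transactional.id. The class and method names are hypothetical; only the <group.id>.<topic>.<partition> layout comes from the text.

```java
// Hypothetical helper illustrating the naming scheme; not framework code.
class TransactionalIdSketch {

    // Appends <group.id>.<topic>.<partition> to the configured prefix.
    static String transactionalId(String transactionIdPrefix, String groupId, String topic, int partition) {
        return transactionIdPrefix + groupId + "." + topic + "." + partition;
    }

    public static void main(String[] args) {
        System.out.println(transactionalId("tx-", "myGroup", "orders", 0)); // prints tx-myGroup.orders.0
    }
}
```

Because the ID is derived from the group, topic, and partition, any instance that takes over a partition ends up using the same transactional.id as the previous owner.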
Changes Between 2.0 and 2.1
Kafka Client Version
This version requires the 1.0.0 kafka-clients or higher.
The 1.1.x client is supported natively in version 2.2.
JSON Improvements
The StringJsonMessageConverter and JsonSerializer now add type information in Headers, letting the converter and JsonDeserializer create specific types on reception, based on the message itself rather than a fixed configured type.
See [serdes] for more information.
Container Stopping Error Handlers
Container error handlers are now provided for both record and batch listeners that treat any exceptions thrown by the listener as fatal and stop the container. See [annotation-error-handling] for more information.
Pausing and Resuming Containers
The listener containers now have pause() and resume() methods (since version 2.1.3).
See [pause-resume] for more information.
Stateful Retry
Starting with version 2.1.3, you can configure stateful retry. See Stateful Retry for more information.
Client ID
Starting with version 2.1.1, you can now set the client.id prefix on @KafkaListener.
Previously, to customize the client ID, you needed a separate consumer factory (and container factory) per listener.
The prefix is suffixed with -n to provide unique client IDs when you use concurrency.
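For illustration, the sketch below generates the client IDs for a given prefix and concurrency. The names are hypothetical, and the zero-based numbering of n is an assumption; the text only states that the suffix has the form -n.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the -n suffix; not framework code.
class ClientIdSketch {

    // One client ID per consumer; n is assumed to start at 0.
    static List<String> clientIds(String prefix, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (int n = 0; n < concurrency; n++) {
            ids.add(prefix + "-" + n);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(clientIds("myListener", 3)); // prints [myListener-0, myListener-1, myListener-2]
    }
}
```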
Logging Offset Commits
By default, logging of topic offset commits is performed with the DEBUG logging level.
Starting with version 2.1.2, a new property in ContainerProperties called commitLogLevel lets you specify the log level for these messages.
See [kafka-container] for more information.
Default @KafkaHandler
Starting with version 2.1.3, you can designate one of the @KafkaHandler annotations on a class-level @KafkaListener as the default.
See [class-level-kafkalistener] for more information.
ReplyingKafkaTemplate
Starting with version 2.1.3, a subclass of KafkaTemplate is provided to support request/reply semantics.
See [replying-template] for more information.
ChainedKafkaTransactionManager
Version 2.1.3 introduced the ChainedKafkaTransactionManager.
See [chained-transaction-manager] for more information.
Migration Guide from 2.0
See the 2.0 to 2.1 Migration guide.
Changes Between 1.3 and 2.0
Spring Framework and Java Versions
The Spring for Apache Kafka project now requires Spring Framework 5.0 and Java 8.
@KafkaListener Changes
You can now annotate @KafkaListener methods (and classes and @KafkaHandler methods) with @SendTo.
If the method returns a result, it is forwarded to the specified topic.
See [annotation-send-to] for more information.
Message Listeners
Message listeners can now be aware of the Consumer object.
See Message Listeners for more information.
Using ConsumerAwareRebalanceListener
Rebalance listeners can now access the Consumer object during rebalance notifications.
See [rebalance-listeners] for more information.
Changes Between 1.2 and 1.3
Support for Transactions
The 0.11.0.0 client library added support for transactions.
The KafkaTransactionManager and other support for transactions have been added.
See [transactions] for more information.
Support for Headers
The 0.11.0.0 client library added support for message headers.
These can now be mapped to and from spring-messaging MessageHeaders.
See [headers] for more information.
Creating Topics
The 0.11.0.0 client library provides an AdminClient, which you can use to create topics.
The KafkaAdmin uses this client to automatically add topics defined as @Bean instances.
Support for Kafka Timestamps
KafkaTemplate now supports an API to add records with timestamps.
New KafkaHeaders have been introduced regarding timestamp support.
Also, new KafkaConditions.timestamp() and KafkaMatchers.hasTimestamp() testing utilities have been added.
See [kafka-template], [kafka-listener-annotation], and [testing] for more details.
@KafkaListener Changes
You can now configure a KafkaListenerErrorHandler to handle exceptions.
See [annotation-error-handling] for more information.
By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present).
Further, you can explicitly configure the groupId on the annotation.
Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners.
To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.
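The rules above amount to a small precedence chain, sketched below in plain Java. The class and method are hypothetical, and the sketch assumes that an explicit groupId on the annotation takes precedence over the id.

```java
// Hypothetical model of how the effective group.id is chosen; not framework code.
class GroupIdSketch {

    // Precedence (assumed): explicit groupId, then id when idIsGroup is true, then the factory's group.id.
    static String resolveGroupId(String id, String groupId, boolean idIsGroup, String factoryGroupId) {
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;
        }
        return factoryGroupId;
    }

    public static void main(String[] args) {
        System.out.println(resolveGroupId("listener1", null, true, "factoryGroup"));     // prints listener1
        System.out.println(resolveGroupId("listener1", null, false, "factoryGroup"));    // prints factoryGroup
        System.out.println(resolveGroupId("listener1", "explicit", true, "factoryGroup")); // prints explicit
    }
}
```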
@EmbeddedKafka Annotation
For convenience, a test class-level @EmbeddedKafka annotation is provided, to register KafkaEmbedded as a bean.
See [testing] for more information.
Kerberos Configuration
Support for configuring Kerberos is now provided. See [kerberos] for more information.
Changes Between 1.0 and 1.1
Batch Listeners
Listeners can be configured to receive the entire batch of messages returned by the consumer.poll() operation, rather than one at a time.
Initial Offset
When explicitly assigning partitions, you can now configure the initial offset relative to the current position for the consumer group, rather than absolute or relative to the current end.
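The three ways of interpreting an initial offset can be compared with the plain-Java sketch below. It is a model under assumptions: the names are hypothetical, a negative non-relative offset is assumed to mean "relative to the current end", and results are clamped at zero for simplicity.

```java
// Hypothetical model of initial-offset resolution; not framework code.
class InitialOffsetSketch {

    // currentPosition: the consumer group's current position; endOffset: the current end of the partition.
    static long targetOffset(long initialOffset, boolean relativeToCurrent,
            long currentPosition, long endOffset) {
        if (relativeToCurrent) {
            return Math.max(0, currentPosition + initialOffset); // relative to the current position
        }
        if (initialOffset < 0) {
            return Math.max(0, endOffset + initialOffset);       // assumed: negative means relative to the end
        }
        return initialOffset;                                    // absolute
    }

    public static void main(String[] args) {
        System.out.println(targetOffset(-5, true, 100, 200));  // prints 95
        System.out.println(targetOffset(-5, false, 100, 200)); // prints 195
        System.out.println(targetOffset(42, false, 100, 200)); // prints 42
    }
}
```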
Seek
You can now seek the position of each topic or partition. You can use this to set the initial position during initialization when group management is in use and Kafka assigns the partitions. You can also seek when an idle container is detected or at any arbitrary point in your application’s execution. See Seek for more information.