1. Non-Blocking Retries
| This is an experimental feature and the usual rule of no breaking API changes does not apply to this feature until the experimental designation is removed. Users are encouraged to try out the feature and provide feedback via GitHub Issues or GitHub discussions. |
Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners.
Since 2.7 Spring for Apache Kafka offers support for that via the @RetryableTopic annotation and RetryTopicConfiguration class to simplify that bootstrapping.
1.1. How The Pattern Works
If message processing fails, the message is forwarded to a retry topic with a back off timestamp. The retry topic consumer then checks the timestamp and if it’s not due it pauses the consumption for that topic’s partition. When it is due the partition consumption is resumed, and the message is consumed again. If the message processing fails again the message will be forwarded to the next retry topic, and the pattern is repeated until a successful processing occurs, or the attempts are exhausted, and the message is sent to the Dead Letter Topic (if configured).
To illustrate, if you have a "main-topic" topic, and want to setup non-blocking retry with an exponential backoff of 1000ms with a multiplier of 2 and 4 max attempts, it will create the main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt topics and configure the respective consumers. The framework also takes care of creating the topics and setting up and configuring the listeners.
| By using this strategy you lose Kafka’s ordering guarantees for that topic. |
At this time this functionality only supports ConcurrentKafkaListenerContainerFactory factories and doesn’t support class level @KafkaListener annotations
|
1.2. Configuration
1.2.1. Using the @RetryableTopic annotation
To configure the retry topic and dlt for a @KafkaListener annotated method, you just have to add the @RetryableTopic annotation to it and Spring Kafka will bootstrap all the necessary topics and consumers with the default configurations.
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
You can specify a method in the same class to process the dlt messages by annotating it with the @DltHandler annotation.
If no DltHandler method is provided a default consumer is created which only logs the consumption.
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
If you don’t specify a kafkaTemplate name a bean with name retryTopicDefaultKafkaTemplate will be looked up.
If no bean is found an exception is thrown.
|
1.2.2. Using RetryTopicConfiguration beans
You can also configure the non-blocking retry support by creating RetryTopicConfiguration beans in a @Configuration annotated class.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfiguration
.builder()
.create(template);
}
This will create retry topics and a dlt, as well as the corresponding consumers, for all topics in methods annotated with '@KafkaListener' using the default configurations. The KafkaTemplate instance is required for message forwarding.
To achieve more fine-grained control over how to handle non-blocking retrials for each topic, more than one RetryTopicConfiguration bean can be provided.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfiguration
.builder()
.fixedBackoff(3000)
.maxAttempts(5)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfiguration
.builder()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
The retry topics' and dlt’s consumers will be assigned to a consumer group with a group id that is the combination of the one with you provide in the groupId parameter of the @KafkaListener annotation with the topic’s suffix. If you don’t provide any they’ll all belong to the same group, and rebalance on a retry topic will cause an unnecessary rebalance on the main topic.
|
1.3. Features
Most of the features are available both for the @RetryableTopic annotation and the RetryTopicConfiguration beans.
1.3.1. BackOff Configuration
The BackOff configuration relies on the BackOffPolicy interface from the Spring Retry project.
It includes:
-
Fixed Back Off
-
Exponential Back Off
-
Random Exponential Back Off
-
Uniform Random Back Off
-
No Back Off
-
Custom Back Off
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfiguration
.builder()
.fixedBackoff(3000)
.maxAttempts(4)
.build
You can also provide a custom implementation of Spring Retry’s SleepingBackOffPolicy:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfiguration
.builder()
.customBackOff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.build
| The default backoff policy is FixedBackOffPolicy with a maximum of 3 attempts and 1000ms intervals. |
| The first attempt counts against the maxAttempts, so if you provide a maxAttempts value of 4 there’ll be the original attempt plus 3 retries. |
1.3.2. Single Topic Fixed Delay Retries
If you’re using fixed delay policies such as FixedBackOffPolicy or NoBackOffPolicy you can use a single topic to accomplish the non-blocking retries.
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfiguration
.builder()
.fixedBackoff(3000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.build
| The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, … |
1.3.3. Global timeout
You can set the global timeout for the retrying process. If that time is reached, the next time the consumer throws an exception the message goes straight to the DLT, or just ends the processing if no DLT is available.
@RetryableTopic(backoff = @Backoff(2000), timeout = 5000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfiguration
.builder()
.fixedBackoff(2000)
.timeoutAfter(5000)
.build
| The default is having no timeout set, which can also be achieved by providing -1 as the timout value. |
1.3.4. Exception Classifier
You can specify which exceptions you want to retry on and which not to. You can also set it to traverse the causes to lookup nested exceptions.
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // Will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfiguration
.builder()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
| The default behavior is retrying on all exceptions and not traversing causes. |
1.3.5. Include and Exclude Topics
You can decide which topics will and will not be handled by a RetryTopicConfiguration bean via the .includeTopic(String topic), .includeTopics(Collection<String> topics) .excludeTopic(String topic) and .excludeTopics(Collection<String> topics) methods.
@Bean
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurer
.builder()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfigurer myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurer
.builder()
.excludeTopic("my-excluded-topic")
.create(template);
}
| The default behavior is to include all topics. |
1.3.6. Topics AutoCreation
Unless otherwise specified the framework will auto create the required topics using NewTopic beans that are consumed by the KafkaAdmin bean.
You can specify the number of partitions and the replication factor with which the topics will be created, and you can turn this feature off.
| Note that if you’re not using Spring Boot you’ll have to provide a KafkaAdmin bean in order to use this feature. |
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurer
.builder()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfigurer myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurer
.builder()
.doNotAutoCreateRetryTopics()
.create(template);
}
| By default the topics are autocreated with one partition and a replication factor of one. |
1.4. Topic Naming
Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
Examples:
"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …, "my-topic-dlt"
"my-other-topic" → "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", …, "my-topic-myDltSuffix".
1.4.1. Retry Topics and Dlt Suffixes
You can specify the suffixes that will be used by the retry and dlt topics.
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfiguration
.builder()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
| The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively. |
1.4.2. Appending the Topic’s Index or Delay
You can either append the topic’s index or delay values after the suffix.
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfiguration
.builder()
.suffixTopicsWithIndexValues()
.create(template);
}
| The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic’s index. |
1.5. Dlt Strategies
The framework provides a few strategies for working with DLTs. You can provide a method for DLT processing, use the default logging method, or have no DLT at all. Also you can choose what happens if DLT processing fails.
1.5.1. Dlt Processing Method
You can specify the method used to process the Dlt for the topic, as well as the behavior if that processing fails.
To do that you can use the @DltHandler annotation in a method of the class with the @RetryableTopic annotation(s).
Note that the same method will be used for all the @RetryableTopic annotated methods within that class.
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
The DLT handler method can also be provided through the RetryTopicConfigurationBuilder.dltHandlerMethod(Class, String) method, passing as arguments the class and method name that should process the DLT’s messages. If a bean instance of the provided class is found in the application context that bean is used for Dlt processing, otherwise an instance is created with full dependency injection support.
@Bean
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurer
.builder()
.dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
| If no DLT handler is provided, the default RetryTopicConfigurer.LoggingDltListenerHandlerMethod is used. |
1.5.2. Dlt Failure Behavior
Should the Dlt processing fail, there are two possible behaviors available: ALWAYS_RETRY_ON_ERROR and FAIL_ON_ERROR.
In the former the message is forwarded back to the dlt topic so it doesn’t block other dlt messages' processing. In the latter the consumer ends the execution without forwarding the message.
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurer
.builder()
.dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
The default behavior is to ALWAYS_RETRY_ON_ERROR.
|
1.5.3. Configuring No Dlt
The framework also provides the possibility of not configuring a DLT for the topic. In this case after retrials are exhausted the processing simply ends.
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurer
.builder()
.doNotConfigureDlt()
.create(template);
}
1.6. Specifying a ListenerContainerFactory
By default the RetryTopic configuration will use the provided factory from the @KafkaListener annotation, but you can specify a different one to be used to create the retry topic and dlt listener containers.
| The provided factory will be configured for the retry topic functionality, so you should not use the same factory for both retrying and non-retrying topics. You can however share the same factory between many retry topic configurations. |
For the @RetryableTopic annotation you can provide the factory’s bean name, and using the RetryTopicConfiguration bean you can either provide the bean name or the instance itself.
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template, ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {
return RetryTopicConfigurer
.builder()
.listenerFactory(factory)
.create(template);
}
@Bean
public RetryTopicConfigurer myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurer
.builder()
.listenerFactory("my-retry-topic-factory")
.create(template);
}