Class KafkaConsumerBackoffManager
- java.lang.Object
-
- org.springframework.kafka.listener.KafkaConsumerBackoffManager
-
- All Implemented Interfaces:
java.util.EventListener, org.springframework.context.ApplicationListener<ListenerContainerPartitionIdleEvent>
public class KafkaConsumerBackoffManager extends java.lang.Object implements org.springframework.context.ApplicationListener<ListenerContainerPartitionIdleEvent>
A manager that backs off consumption for a given topic if the timestamp provided is not due. Use with SeekToCurrentErrorHandler to guarantee that the message is read again after partition consumption is resumed (or seek to it manually by other means). It is also necessary to set ContainerProperties.setIdlePartitionEventInterval(Long) so that the manager can resume partition consumption. Note that when a record backs off, consumption of its partition is paused for approximately that amount of time, so you must use a fixed backoff value per partition to make sure no record waits longer than it should.
- Since:
- 2.7
- See Also:
SeekToCurrentErrorHandler
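The note about using a fixed backoff value per partition can be illustrated with a small, JDK-only simulation. This is a simplified sketch, not Spring Kafka code: the class FixedBackoffSketch, the method extraWaitMillis, and the arrays of start times and delays are hypothetical names introduced for illustration. It assumes records in one partition are handled strictly in order and that the partition stays paused until the backing-off record becomes due.

```java
import java.util.Arrays;

// Hypothetical model of one partition; not part of Spring Kafka.
class FixedBackoffSketch {

    // startMillis[i]: time from which record i's backoff is counted.
    // delayMillis[i]: backoff requested for record i.
    // Returns, per record, how long after its due time it is actually handled.
    static long[] extraWaitMillis(long[] startMillis, long[] delayMillis) {
        long[] extra = new long[startMillis.length];
        long partitionFreeAt = 0L; // earliest time the partition can move on
        for (int i = 0; i < startMillis.length; i++) {
            long due = startMillis[i] + delayMillis[i];
            // A record cannot be handled before the records ahead of it.
            long handledAt = Math.max(due, partitionFreeAt);
            extra[i] = handledAt - due;
            partitionFreeAt = handledAt;
        }
        return extra;
    }

    public static void main(String[] args) {
        long[] start = {0L, 1_000L};
        // Mixed delays: the second record is stuck behind the first.
        System.out.println(Arrays.toString(
                extraWaitMillis(start, new long[] {10_000L, 2_000L}))); // [0, 7000]
        // Fixed delay: due times keep offset order, so nobody waits extra.
        System.out.println(Arrays.toString(
                extraWaitMillis(start, new long[] {5_000L, 5_000L})));  // [0, 0]
    }
}
```

With mixed delays the second record is due at 3000 ms but is only handled at 10000 ms, because the partition is paused for the first record. With a single fixed delay the due times follow offset order and no record is held past its due time.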
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | KafkaConsumerBackoffManager.Context | Provides the state that will be used for backing off.
-
Field Summary
Fields
Modifier and Type | Field | Description
static java.lang.String | INTERNAL_BACKOFF_CLOCK_BEAN_NAME | Internal Back Off Clock Bean Name.
-
Constructor Summary
Constructors
Constructor | Description
KafkaConsumerBackoffManager(ListenerContainerRegistry registry, java.time.Clock clock, org.springframework.core.task.TaskExecutor taskExecutor, org.springframework.retry.backoff.Sleeper sleeper) |
-
Method Summary
All Methods
Modifier and Type | Method | Description
protected void | addBackoff(KafkaConsumerBackoffManager.Context context, org.apache.kafka.common.TopicPartition topicPartition) |
KafkaConsumerBackoffManager.Context | createContext(long dueTimestamp, java.lang.String listenerId, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.clients.consumer.Consumer<?,?> consumerForTimingCorrection) |
protected KafkaConsumerBackoffManager.Context | getBackOffContext(org.apache.kafka.common.TopicPartition topicPartition) |
void | maybeBackoff(KafkaConsumerBackoffManager.Context context) | Backs off if the current time is before the dueTimestamp provided in the KafkaConsumerBackoffManager.Context object.
void | onApplicationEvent(ListenerContainerPartitionIdleEvent partitionIdleEvent) |
protected void | removeBackoff(org.apache.kafka.common.TopicPartition topicPartition) |
-
-
-
Field Detail
-
INTERNAL_BACKOFF_CLOCK_BEAN_NAME
public static final java.lang.String INTERNAL_BACKOFF_CLOCK_BEAN_NAME
Internal Back Off Clock Bean Name.
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
KafkaConsumerBackoffManager
public KafkaConsumerBackoffManager(ListenerContainerRegistry registry, @Qualifier("internalBackOffClock") java.time.Clock clock, org.springframework.core.task.TaskExecutor taskExecutor, org.springframework.retry.backoff.Sleeper sleeper)
-
-
Method Detail
-
maybeBackoff
public void maybeBackoff(KafkaConsumerBackoffManager.Context context)
Backs off if the current time is before the dueTimestamp provided in the KafkaConsumerBackoffManager.Context object.
- Parameters:
context - the back off context for this execution.
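The comparison described for maybeBackoff, current time versus dueTimestamp, can be sketched with java.time.Clock alone. This is a minimal model under stated assumptions, not the library's implementation: the class BackoffCheckSketch and both helper methods are hypothetical, and dueTimestamp is assumed to be expressed in epoch milliseconds. A fixed Clock is used so that the outcome is deterministic.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical model of the due-timestamp check; not the Spring Kafka implementation.
class BackoffCheckSketch {

    // True when the current time is before dueTimestamp, i.e. a back off is needed.
    static boolean shouldBackOff(Clock clock, long dueTimestamp) {
        return clock.millis() < dueTimestamp;
    }

    // Milliseconds left until dueTimestamp, or 0 if it has already passed.
    static long remainingBackoffMillis(Clock clock, long dueTimestamp) {
        return Math.max(0L, dueTimestamp - clock.millis());
    }

    public static void main(String[] args) {
        // Freeze "now" at 10000 ms since the epoch.
        Clock fixed = Clock.fixed(Instant.ofEpochMilli(10_000L), ZoneOffset.UTC);
        System.out.println(shouldBackOff(fixed, 12_500L));          // true
        System.out.println(remainingBackoffMillis(fixed, 12_500L)); // 2500
        System.out.println(shouldBackOff(fixed, 9_000L));           // false
    }
}
```

Passing the Clock in as a parameter mirrors the constructor above, which also accepts a java.time.Clock, and makes the timing logic easy to exercise in tests.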
-
onApplicationEvent
public void onApplicationEvent(ListenerContainerPartitionIdleEvent partitionIdleEvent)
- Specified by:
onApplicationEvent in interface org.springframework.context.ApplicationListener<ListenerContainerPartitionIdleEvent>
-
addBackoff
protected void addBackoff(KafkaConsumerBackoffManager.Context context, org.apache.kafka.common.TopicPartition topicPartition)
-
getBackOffContext
@Nullable protected KafkaConsumerBackoffManager.Context getBackOffContext(org.apache.kafka.common.TopicPartition topicPartition)
-
removeBackoff
protected void removeBackoff(org.apache.kafka.common.TopicPartition topicPartition)
-
createContext
public KafkaConsumerBackoffManager.Context createContext(long dueTimestamp, java.lang.String listenerId, org.apache.kafka.common.TopicPartition topicPartition, @Nullable org.apache.kafka.clients.consumer.Consumer<?,?> consumerForTimingCorrection)
-
-