Class WakingKafkaConsumerTimingAdjuster
java.lang.Object
org.springframework.kafka.listener.WakingKafkaConsumerTimingAdjuster
- All Implemented Interfaces:
KafkaConsumerTimingAdjuster
public class WakingKafkaConsumerTimingAdjuster
extends Object
implements KafkaConsumerTimingAdjuster
Adjusts timing by creating a thread that will
wakeup the consumer from polling, considering that, if consumption is paused,
it will check for consumption resuming in increments of 'pollTimeout'. This works best
if the consumer is handling a single partition.
- Since:
- 2.7
- Author:
- Tomaz Fernandes
- See Also:
-
Constructor Summary
ConstructorsConstructorDescriptionWakingKafkaConsumerTimingAdjuster(TaskExecutor timingAdjustmentTaskExecutor) WakingKafkaConsumerTimingAdjuster(TaskExecutor timingAdjustmentTaskExecutor, org.springframework.retry.backoff.Sleeper sleeper) -
Method Summary
Modifier and TypeMethodDescriptionlongadjustTiming(org.apache.kafka.clients.consumer.Consumer<?, ?> consumerToAdjust, org.apache.kafka.common.TopicPartition topicPartition, long pollTimeout, long timeUntilDue) Adjusts the timing with the provided parameters.voidsetPollTimeoutsForAdjustmentWindow(int pollTimeoutsForAdjustmentWindow) Sets how many pollTimeouts prior to the dueTimeout the adjustment will take place.voidsetTimingAdjustmentThreshold(Duration timingAdjustmentThreshold) Sets the threshold for the timing adjustment to take place.
-
Constructor Details
-
WakingKafkaConsumerTimingAdjuster
public WakingKafkaConsumerTimingAdjuster(TaskExecutor timingAdjustmentTaskExecutor, org.springframework.retry.backoff.Sleeper sleeper) -
WakingKafkaConsumerTimingAdjuster
-
-
Method Details
-
setPollTimeoutsForAdjustmentWindow
public void setPollTimeoutsForAdjustmentWindow(int pollTimeoutsForAdjustmentWindow) Sets how many pollTimeouts prior to the dueTimeout the adjustment will take place. Default is 2.- Parameters:
pollTimeoutsForAdjustmentWindow- the amount of pollTimeouts in the adjustment window.
-
setTimingAdjustmentThreshold
Sets the threshold for the timing adjustment to take place. If the time difference between the probable instant the message will be consumed and the instant it should is lower than this value, no adjustment will be applied. Default is 100ms.- Parameters:
timingAdjustmentThreshold- the threshold to be set.
-
adjustTiming
public long adjustTiming(org.apache.kafka.clients.consumer.Consumer<?, ?> consumerToAdjust, org.apache.kafka.common.TopicPartition topicPartition, long pollTimeout, long timeUntilDue) Adjusts the timing with the provided parameters.- Specified by:
adjustTimingin interfaceKafkaConsumerTimingAdjuster- Parameters:
consumerToAdjust- theConsumerthat will be adjustedtopicPartition- theTopicPartitionthat will be adjustedpollTimeout- the pollConfiguration for the consumer's containertimeUntilDue- the amount of time until the message is due for consumption- Returns:
- the adjusted amount in milliseconds
-