Class KafkaMessageListenerContainerSpec<K,V>
java.lang.Object
org.springframework.beans.factory.config.AbstractFactoryBean<T>
org.springframework.integration.dsl.IntegrationComponentSpec<KafkaMessageListenerContainerSpec<K,V>,org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>
org.springframework.integration.kafka.dsl.KafkaMessageListenerContainerSpec<K,V>
- Type Parameters:
K- the key type.V- the value type.
- All Implemented Interfaces:
Aware,BeanClassLoaderAware,BeanFactoryAware,DisposableBean,FactoryBean<org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>,InitializingBean,Lifecycle,Phased,SmartLifecycle
public class KafkaMessageListenerContainerSpec<K,V> extends IntegrationComponentSpec<KafkaMessageListenerContainerSpec<K,V>,org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>
A helper class in the Builder pattern style to delegate options to the
ConcurrentMessageListenerContainer.- Since:
- 5.4
- Author:
- Artem Bilan, Gary Russell
-
Field Summary
Fields inherited from class org.springframework.integration.dsl.IntegrationComponentSpec
PARSER, target -
Method Summary
Modifier and Type Method Description KafkaMessageListenerContainerSpec<K,V>ackCount(int count)Set the number of outstanding record count after which offsets should be committed whenContainerProperties.AckMode.COUNTorContainerProperties.AckMode.COUNT_TIMEis being used.KafkaMessageListenerContainerSpec<K,V>ackMode(org.springframework.kafka.listener.ContainerProperties.AckMode ackMode)Set the ack mode to use when auto ack (in the configuration properties) is false.KafkaMessageListenerContainerSpec<K,V>ackTime(long millis)Set the time (ms) after which outstanding offsets should be committed whenContainerProperties.AckMode.TIMEorContainerProperties.AckMode.COUNT_TIMEis being used.KafkaMessageListenerContainerSpec<K,V>commitCallback(org.apache.kafka.clients.consumer.OffsetCommitCallback commitCallback)Set the commit callback; by default a simple logging callback is used to log success at DEBUG level and failures at ERROR level.KafkaMessageListenerContainerSpec<K,V>concurrency(int concurrency)Specify a concurrency maximum number for theAbstractMessageListenerContainer.KafkaMessageListenerContainerSpec<K,V>consumerRebalanceListener(org.apache.kafka.clients.consumer.ConsumerRebalanceListener consumerRebalanceListener)Set the user definedConsumerRebalanceListenerimplementation.KafkaMessageListenerContainerSpec<K,V>consumerTaskExecutor(AsyncListenableTaskExecutor consumerTaskExecutor)Set the executor for threads that poll the consumer.KafkaMessageListenerContainerSpec<K,V>errorHandler(org.springframework.kafka.listener.GenericErrorHandler<?> errorHandler)Specify anErrorHandlerfor theAbstractMessageListenerContainer.KafkaMessageListenerContainerSpec<K,V>groupId(String groupId)Set the group id for this container.KafkaMessageListenerContainerSpec<K,V>id(String id)Configure the component identifier.KafkaMessageListenerContainerSpec<K,V>idleEventInterval(Long idleEventInterval)Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.KafkaMessageListenerContainerSpec<K,V>pollTimeout(long pollTimeout)Set the max time to block in the consumer waiting for records.KafkaMessageListenerContainerSpec<K,V>shutdownTimeout(long shutdownTimeout)Set the timeout for shutting down the container.KafkaMessageListenerContainerSpec<K,V>syncCommits(boolean syncCommits)Set whether or not to call consumer.commitSync() or commitAsync() when the container is responsible for commits.Methods inherited from class org.springframework.integration.dsl.IntegrationComponentSpec
_this, createInstance, destroyInstance, doGet, get, getId, getObjectType, getPhase, isAutoStartup, isRunning, start, stop, stopMethods inherited from class org.springframework.beans.factory.config.AbstractFactoryBean
afterPropertiesSet, destroy, getBeanFactory, getBeanTypeConverter, getEarlySingletonInterfaces, getObject, isSingleton, setBeanClassLoader, setBeanFactory, setSingleton
-
Method Details
-
id
Description copied from class:IntegrationComponentSpecConfigure the component identifier. Used as thebeanNameto register the bean in the application context for this component.- Overrides:
idin classIntegrationComponentSpec<KafkaMessageListenerContainerSpec<K,V>,org.springframework.kafka.listener.ConcurrentMessageListenerContainer<K,V>>- Parameters:
id- the id.- Returns:
- the spec.
-
concurrency
Specify a concurrency maximum number for theAbstractMessageListenerContainer.- Parameters:
concurrency- the concurrency maximum number.- Returns:
- the spec.
- See Also:
ConcurrentMessageListenerContainer.setConcurrency(int)
-
errorHandler
public KafkaMessageListenerContainerSpec<K,V> errorHandler(org.springframework.kafka.listener.GenericErrorHandler<?> errorHandler)Specify anErrorHandlerfor theAbstractMessageListenerContainer.- Parameters:
errorHandler- theErrorHandler.- Returns:
- the spec.
- See Also:
ErrorHandler
-
ackMode
public KafkaMessageListenerContainerSpec<K,V> ackMode(org.springframework.kafka.listener.ContainerProperties.AckMode ackMode)Set the ack mode to use when auto ack (in the configuration properties) is false.- RECORD: Ack after each record has been passed to the listener.
- BATCH: Ack after each batch of records received from the consumer has been passed to the listener
- TIME: Ack after this number of milliseconds; (should be greater than
#setPollTimeout(long) pollTimeout. - COUNT: Ack after at least this number of records have been received
- MANUAL: Listener is responsible for acking - use a
AcknowledgingMessageListener.
- Parameters:
ackMode- theContainerProperties.AckMode; default BATCH.- Returns:
- the spec.
- See Also:
ContainerProperties.AckMode
-
pollTimeout
Set the max time to block in the consumer waiting for records.- Parameters:
pollTimeout- the timeout in ms; default 1000.- Returns:
- the spec.
- See Also:
ConsumerProperties.setPollTimeout(long)
-
ackCount
Set the number of outstanding record count after which offsets should be committed whenContainerProperties.AckMode.COUNTorContainerProperties.AckMode.COUNT_TIMEis being used.- Parameters:
count- the count- Returns:
- the spec.
- See Also:
ContainerProperties.setAckCount(int)
-
ackTime
Set the time (ms) after which outstanding offsets should be committed whenContainerProperties.AckMode.TIMEorContainerProperties.AckMode.COUNT_TIMEis being used. Should be larger than zero.- Parameters:
millis- the time- Returns:
- the spec.
- See Also:
ContainerProperties.setAckTime(long)
-
consumerTaskExecutor
public KafkaMessageListenerContainerSpec<K,V> consumerTaskExecutor(AsyncListenableTaskExecutor consumerTaskExecutor)Set the executor for threads that poll the consumer.- Parameters:
consumerTaskExecutor- the executor- Returns:
- the spec.
- See Also:
ContainerProperties.setConsumerTaskExecutor(AsyncListenableTaskExecutor)
-
shutdownTimeout
Set the timeout for shutting down the container. This is the maximum amount of time that the invocation to#stop(Runnable)will block for, before returning.- Parameters:
shutdownTimeout- the shutdown timeout.- Returns:
- the spec.
- See Also:
ContainerProperties.setShutdownTimeout(long)
-
consumerRebalanceListener
public KafkaMessageListenerContainerSpec<K,V> consumerRebalanceListener(org.apache.kafka.clients.consumer.ConsumerRebalanceListener consumerRebalanceListener)Set the user definedConsumerRebalanceListenerimplementation.- Parameters:
consumerRebalanceListener- theConsumerRebalanceListenerinstance- Returns:
- the spec.
- See Also:
ConsumerProperties.setConsumerRebalanceListener(ConsumerRebalanceListener)
-
commitCallback
public KafkaMessageListenerContainerSpec<K,V> commitCallback(org.apache.kafka.clients.consumer.OffsetCommitCallback commitCallback)Set the commit callback; by default a simple logging callback is used to log success at DEBUG level and failures at ERROR level.- Parameters:
commitCallback- the callback.- Returns:
- the spec.
- See Also:
ConsumerProperties.setCommitCallback(OffsetCommitCallback)
-
syncCommits
Set whether or not to call consumer.commitSync() or commitAsync() when the container is responsible for commits. Default true. See https://github.com/spring-projects/spring-kafka/issues/62 At the time of writing, async commits are not entirely reliable.- Parameters:
syncCommits- true to use commitSync().- Returns:
- the spec.
- See Also:
ConsumerProperties.setSyncCommits(boolean)
-
idleEventInterval
Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.- Parameters:
idleEventInterval- the interval.- Returns:
- the spec.
- See Also:
ContainerProperties.setIdleEventInterval(Long)
-
groupId
Set the group id for this container. Overrides anygroup.idproperty provided by the consumer factory configuration.- Parameters:
groupId- the group id.- Returns:
- the spec.
- See Also:
ConsumerProperties.setGroupId(String)
-