Class AbstractMessageListenerContainer<K,V>
- java.lang.Object
-
- org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V>
-
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationEventPublisherAware, org.springframework.context.Lifecycle, org.springframework.context.Phased, org.springframework.context.SmartLifecycle, GenericMessageListenerContainer<K,V>, MessageListenerContainer
- Direct Known Subclasses:
ConcurrentMessageListenerContainer, KafkaMessageListenerContainer
public abstract class AbstractMessageListenerContainer<K,V> extends java.lang.Object implements GenericMessageListenerContainer<K,V>, org.springframework.beans.factory.BeanNameAware, org.springframework.context.ApplicationEventPublisherAware, org.springframework.context.ApplicationContextAware
The base implementation for the MessageListenerContainer.
- Author:
- Gary Russell, Marius Bogoevici, Artem Bilan, Tomaz Fernandes
-
-
Field Summary
Fields
Modifier and Type, Field, Description
protected ConsumerFactory<K,V> consumerFactory
static int DEFAULT_PHASE
    The default SmartLifecycle phase for listener containers 2147483547.
protected java.lang.Object lifecycleMonitor
protected org.springframework.core.log.LogAccessor logger
-
Constructor Summary
Constructors
Modifier, Constructor, Description
protected AbstractMessageListenerContainer(ConsumerFactory<? super K,? super V> consumerFactory, ContainerProperties containerProperties)
    Construct an instance with the provided factory and properties.
-
Method Summary
Modifier and Type, Method, Description
void checkGroupId()
protected void checkTopics()
protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener createSimpleLoggingConsumerRebalanceListener()
    Return default implementation of ConsumerRebalanceListener instance.
protected abstract void doStart()
protected abstract void doStop(java.lang.Runnable callback)
AfterRollbackProcessor<? super K,? super V> getAfterRollbackProcessor()
    Return the currently configured AfterRollbackProcessor.
protected org.springframework.context.ApplicationContext getApplicationContext()
org.springframework.context.ApplicationEventPublisher getApplicationEventPublisher()
    Get the event publisher.
protected BatchInterceptor<K,V> getBatchInterceptor()
java.lang.String getBeanName()
    Return the bean name.
ContainerProperties getContainerProperties()
    Return the container properties for this container.
GenericErrorHandler<?> getGenericErrorHandler()
    Get the configured error handler.
java.lang.String getGroupId()
    Return the group.id property for this container whether specifically set on the container or via a consumer property on the consumer factory.
java.lang.String getListenerId()
    The 'id' attribute of a @KafkaListener or the bean name for spring-managed containers.
int getPhase()
protected RecordInterceptor<K,V> getRecordInterceptor()
boolean isAutoStartup()
protected boolean isInterceptBeforeTx()
boolean isPartitionPauseRequested(org.apache.kafka.common.TopicPartition topicPartition)
    Whether or not this topic's partition pause has been requested.
protected boolean isPaused()
boolean isPauseRequested()
    Return true if MessageListenerContainer.pause() has been called; the container might not have actually paused yet.
boolean isRunning()
protected AbstractMessageListenerContainer<?,?> parentOrThis()
    Return this or a parent container if this has a parent.
void pause()
    Pause this container before the next poll().
void pausePartition(org.apache.kafka.common.TopicPartition topicPartition)
    Pause this partition before the next poll().
protected void publishContainerStoppedEvent()
void resume()
    Resume this container, if paused, after the next poll().
void resumePartition(org.apache.kafka.common.TopicPartition topicPartition)
    Resume this partition, if paused, after the next poll().
void setAfterRollbackProcessor(AfterRollbackProcessor<? super K,? super V> afterRollbackProcessor)
    Set a processor to perform seeks on unprocessed records after a rollback.
void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
void setAutoStartup(boolean autoStartup)
    Set the autoStartup.
void setBatchErrorHandler(BatchErrorHandler errorHandler)
    Set the batch error handler to call when the listener throws an exception.
void setBatchInterceptor(BatchInterceptor<K,V> batchInterceptor)
    Set an interceptor to be called before calling the batch listener.
void setBeanName(java.lang.String name)
void setErrorHandler(ErrorHandler errorHandler)
    Set the error handler to call when the listener throws an exception.
void setGenericErrorHandler(GenericErrorHandler<?> errorHandler)
    Set the error handler to call when the listener throws an exception.
void setInterceptBeforeTx(boolean interceptBeforeTx)
    When true, invoke the interceptor before the transaction starts.
void setPhase(int phase)
void setRecordInterceptor(RecordInterceptor<K,V> recordInterceptor)
    Set an interceptor to be called before calling the record listener.
protected void setRunning(boolean running)
void setTopicCheckTimeout(int topicCheckTimeout)
    How long to wait for Admin.describeTopics(Collection) result futures to complete.
void setupMessageListener(java.lang.Object messageListener)
    Set up the message listener to use.
void start()
void stop()
void stop(boolean wait)
    Stop the container.
void stop(java.lang.Runnable callback)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.listener.MessageListenerContainer
getAssignedPartitions, getAssignmentsByClientId, isContainerPaused, isPartitionPaused, metrics
-
-
-
-
Field Detail
-
DEFAULT_PHASE
public static final int DEFAULT_PHASE
The default SmartLifecycle phase for listener containers 2147483547.
- See Also:
- Constant Field Values
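The documented value equals Integer.MAX_VALUE - 100 (2147483647 - 100 = 2147483547). Under SmartLifecycle ordering, components with lower phases start first and stop last, so a container left at this phase would start after components with lower phases. The following is a minimal plain-Java sketch of that ordering; PhaseSketch, Component, and startOrder are hypothetical names used only for illustration and are not Spring types.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of phase ordering; all names here are hypothetical.
class PhaseSketch {

    // Same numeric value as the documented DEFAULT_PHASE (2147483547).
    static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100;

    // Hypothetical stand-in for a lifecycle-managed component.
    static final class Component {
        final String name;
        final int phase;

        Component(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    // Start order under SmartLifecycle semantics: ascending phase.
    static List<String> startOrder(List<Component> components) {
        List<Component> sorted = new ArrayList<>(components);
        sorted.sort(Comparator.comparingInt(c -> c.phase));
        List<String> names = new ArrayList<>();
        for (Component c : sorted) {
            names.add(c.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Component> components = new ArrayList<>();
        components.add(new Component("listenerContainer", DEFAULT_PHASE));
        components.add(new Component("dataSource", 0));
        System.out.println(DEFAULT_PHASE);          // prints 2147483547
        System.out.println(startOrder(components)); // prints [dataSource, listenerContainer]
    }
}
```

Stop order would be the reverse of the list returned by startOrder.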
-
logger
protected final org.springframework.core.log.LogAccessor logger
-
consumerFactory
protected final ConsumerFactory<K,V> consumerFactory
-
lifecycleMonitor
protected final java.lang.Object lifecycleMonitor
-
-
Constructor Detail
-
AbstractMessageListenerContainer
protected AbstractMessageListenerContainer(ConsumerFactory<? super K,? super V> consumerFactory, ContainerProperties containerProperties)
Construct an instance with the provided factory and properties.
- Parameters:
consumerFactory - the factory.
containerProperties - the properties.
-
-
Method Detail
-
setApplicationContext
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
- Specified by:
setApplicationContext in interface org.springframework.context.ApplicationContextAware
- Throws:
org.springframework.beans.BeansException
-
getApplicationContext
@Nullable protected org.springframework.context.ApplicationContext getApplicationContext()
-
setBeanName
public void setBeanName(java.lang.String name)
- Specified by:
setBeanName in interface org.springframework.beans.factory.BeanNameAware
-
getBeanName
@Nullable public java.lang.String getBeanName()
Return the bean name.
- Returns:
- the bean name.
-
setApplicationEventPublisher
public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
- Specified by:
setApplicationEventPublisher in interface org.springframework.context.ApplicationEventPublisherAware
-
getApplicationEventPublisher
@Nullable public org.springframework.context.ApplicationEventPublisher getApplicationEventPublisher()
Get the event publisher.
- Returns:
- the publisher
-
setErrorHandler
public void setErrorHandler(ErrorHandler errorHandler)
Set the error handler to call when the listener throws an exception.
- Parameters:
errorHandler - the error handler.
- Since:
- 2.2
-
setGenericErrorHandler
public void setGenericErrorHandler(GenericErrorHandler<?> errorHandler)
Set the error handler to call when the listener throws an exception.
- Parameters:
errorHandler - the error handler.
- Since:
- 2.2
-
setBatchErrorHandler
public void setBatchErrorHandler(BatchErrorHandler errorHandler)
Set the batch error handler to call when the listener throws an exception.
- Parameters:
errorHandler - the error handler.
- Since:
- 2.2
-
getGenericErrorHandler
@Nullable public GenericErrorHandler<?> getGenericErrorHandler()
Get the configured error handler.
- Returns:
- the error handler.
- Since:
- 2.2
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartup in interface org.springframework.context.SmartLifecycle
-
setAutoStartup
public void setAutoStartup(boolean autoStartup)
Description copied from interface: MessageListenerContainer
Set the autoStartup.
- Specified by:
setAutoStartup in interface MessageListenerContainer
- Parameters:
autoStartup - the autoStartup to set.
- See Also:
SmartLifecycle
-
setRunning
protected void setRunning(boolean running)
-
isRunning
public boolean isRunning()
- Specified by:
isRunning in interface org.springframework.context.Lifecycle
-
isPaused
protected boolean isPaused()
-
isPartitionPauseRequested
public boolean isPartitionPauseRequested(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Whether or not this topic's partition pause has been requested.
- Specified by:
isPartitionPauseRequested in interface MessageListenerContainer
- Parameters:
topicPartition - the topic partition to check
- Returns:
- true if pause for this TopicPartition has been requested
-
pausePartition
public void pausePartition(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Pause this partition before the next poll(). This is a thread-safe operation; the actual pause is processed by the consumer thread.
- Specified by:
pausePartition in interface MessageListenerContainer
- Parameters:
topicPartition - the topicPartition to pause.
-
resumePartition
public void resumePartition(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Resume this partition, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resumePartition in interface MessageListenerContainer
- Parameters:
topicPartition - the topicPartition to resume.
-
isPauseRequested
public boolean isPauseRequested()
Description copied from interface: MessageListenerContainer
Return true if MessageListenerContainer.pause() has been called; the container might not have actually paused yet.
- Specified by:
isPauseRequested in interface MessageListenerContainer
- Returns:
- true if pause has been requested.
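The difference between a pause being requested and a pause taking effect can be pictured with a small plain-Java model. This is only a sketch, not the Spring Kafka implementation: PauseSketch, pollOnce, and isConsumerPaused are hypothetical names, and the assumption is that any thread may set the request flag while a single consumer thread applies it around its poll.

```java
// Hypothetical model of the pause handshake; no Spring Kafka types are used.
class PauseSketch {

    // Written by any thread through pause() and resume().
    private volatile boolean pauseRequested;

    // Written only by the simulated consumer thread in pollOnce().
    private volatile boolean consumerPaused;

    void pause() {
        this.pauseRequested = true;
    }

    void resume() {
        this.pauseRequested = false;
    }

    boolean isPauseRequested() {
        return this.pauseRequested;
    }

    boolean isConsumerPaused() {
        return this.consumerPaused;
    }

    // One iteration of the simulated consumer loop.
    void pollOnce() {
        if (this.pauseRequested && !this.consumerPaused) {
            this.consumerPaused = true;  // pause applied before the poll
        }
        // the poll itself would happen here
        if (!this.pauseRequested && this.consumerPaused) {
            this.consumerPaused = false; // resume applied after the poll
        }
    }

    public static void main(String[] args) {
        PauseSketch container = new PauseSketch();
        container.pause();
        System.out.println(container.isPauseRequested()); // prints true
        System.out.println(container.isConsumerPaused()); // prints false
        container.pollOnce();
        System.out.println(container.isConsumerPaused()); // prints true
    }
}
```

In this model a caller that needs to know whether consumption has really stopped would check the second flag rather than the request flag.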
-
setPhase
public void setPhase(int phase)
-
getPhase
public int getPhase()
- Specified by:
getPhase in interface org.springframework.context.Phased
- Specified by:
getPhase in interface org.springframework.context.SmartLifecycle
-
getAfterRollbackProcessor
public AfterRollbackProcessor<? super K,? super V> getAfterRollbackProcessor()
Return the currently configured AfterRollbackProcessor.
- Returns:
- the after rollback processor.
- Since:
- 2.2.14
-
setAfterRollbackProcessor
public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K,? super V> afterRollbackProcessor)
Set a processor to perform seeks on unprocessed records after a rollback. The default seeks all topics/partitions to their current position, including the failed record.
- Parameters:
afterRollbackProcessor - the processor.
- Since:
- 1.3.5
-
getContainerProperties
public ContainerProperties getContainerProperties()
Description copied from interface: MessageListenerContainer
Return the container properties for this container.
- Specified by:
getContainerProperties in interface MessageListenerContainer
- Returns:
- the properties.
-
getGroupId
@Nullable public java.lang.String getGroupId()
Description copied from interface: MessageListenerContainer
Return the group.id property for this container whether specifically set on the container or via a consumer property on the consumer factory.
- Specified by:
getGroupId in interface MessageListenerContainer
- Returns:
- the group id.
-
getListenerId
@Nullable public java.lang.String getListenerId()
Description copied from interface: MessageListenerContainer
The 'id' attribute of a @KafkaListener or the bean name for spring-managed containers.
- Specified by:
getListenerId in interface MessageListenerContainer
- Returns:
- the id or bean name.
-
setTopicCheckTimeout
public void setTopicCheckTimeout(int topicCheckTimeout)
How long to wait for Admin.describeTopics(Collection) result futures to complete.
- Parameters:
topicCheckTimeout - the timeout in seconds; default 30.
- Since:
- 2.3
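A bounded wait on a result future, with the timeout expressed in seconds, might look like the following plain-Java sketch. It does not call the Kafka Admin API; TopicCheckTimeoutSketch and resultAvailableWithin are hypothetical names, and treating a failed or interrupted wait the same as a timeout is an assumption made for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of waiting on a result future with a timeout in seconds.
class TopicCheckTimeoutSketch {

    // Mirrors the documented default of 30 seconds.
    static final int DEFAULT_TIMEOUT_SECONDS = 30;

    // Returns true only if the future yields a result within the timeout.
    static boolean resultAvailableWithin(Future<?> future, int timeoutSeconds) {
        try {
            future.get(timeoutSeconds, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the result did not arrive in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (ExecutionException e) {
            return false; // the underlying operation failed
        }
    }

    public static void main(String[] args) {
        Future<String> done = CompletableFuture.completedFuture("topic-description");
        System.out.println(resultAvailableWithin(done, DEFAULT_TIMEOUT_SECONDS)); // prints true
    }
}
```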
-
getRecordInterceptor
protected RecordInterceptor<K,V> getRecordInterceptor()
-
setRecordInterceptor
public void setRecordInterceptor(RecordInterceptor<K,V> recordInterceptor)
Set an interceptor to be called before calling the record listener. Does not apply to batch listeners.
- Parameters:
recordInterceptor - the interceptor.
- Since:
- 2.2.7
- See Also:
setInterceptBeforeTx(boolean)
-
getBatchInterceptor
protected BatchInterceptor<K,V> getBatchInterceptor()
-
setBatchInterceptor
public void setBatchInterceptor(BatchInterceptor<K,V> batchInterceptor)
Set an interceptor to be called before calling the batch listener.
- Parameters:
batchInterceptor - the interceptor.
- Since:
- 2.6.6
- See Also:
setInterceptBeforeTx(boolean)
-
isInterceptBeforeTx
protected boolean isInterceptBeforeTx()
-
setInterceptBeforeTx
public void setInterceptBeforeTx(boolean interceptBeforeTx)
When true, invoke the interceptor before the transaction starts.
- Parameters:
interceptBeforeTx - true to intercept before the transaction.
- Since:
- 2.3.4
- See Also:
setRecordInterceptor(RecordInterceptor), setBatchInterceptor(BatchInterceptor)
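The effect of this flag on call ordering can be pictured with a small plain-Java model. This is only a sketch under the assumption that, when the flag is false, the interceptor runs inside the transaction just before the listener; InterceptOrderSketch and the event names are hypothetical, and no Spring Kafka types are used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of where the interceptor call falls relative to the transaction.
class InterceptOrderSketch {

    // Returns the sequence of steps for delivering one record (or batch).
    static List<String> invoke(boolean interceptBeforeTx) {
        List<String> events = new ArrayList<>();
        if (interceptBeforeTx) {
            events.add("intercept"); // outside the transaction
        }
        events.add("beginTransaction");
        if (!interceptBeforeTx) {
            events.add("intercept"); // inside the transaction (assumed)
        }
        events.add("listener");
        events.add("commit");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(invoke(true));  // prints [intercept, beginTransaction, listener, commit]
        System.out.println(invoke(false)); // prints [beginTransaction, intercept, listener, commit]
    }
}
```

Under this model, work done by the interceptor takes part in the transaction only when the flag is false.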
-
setupMessageListener
public void setupMessageListener(java.lang.Object messageListener)
Description copied from interface: MessageListenerContainer
Set up the message listener to use. Throws an IllegalArgumentException if that message listener type is not supported.
- Specified by:
setupMessageListener in interface MessageListenerContainer
- Parameters:
messageListener - the object to be wrapped to the MessageListener.
-
start
public final void start()
- Specified by:
start in interface org.springframework.context.Lifecycle
-
checkTopics
protected void checkTopics()
-
checkGroupId
public void checkGroupId()
-
doStart
protected abstract void doStart()
-
stop
public final void stop()
- Specified by:
stop in interface org.springframework.context.Lifecycle
-
stop
public final void stop(boolean wait)
Stop the container.
- Parameters:
wait - wait for the listener to terminate.
- Since:
- 2.3.8
-
pause
public void pause()
Description copied from interface: MessageListenerContainer
Pause this container before the next poll(). This is a thread-safe operation; the actual pause is processed by the consumer thread.
- Specified by:
pause in interface MessageListenerContainer
- See Also:
KafkaConsumer.pause(Collection)
-
resume
public void resume()
Description copied from interface: MessageListenerContainer
Resume this container, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resume in interface MessageListenerContainer
- See Also:
KafkaConsumer.resume(Collection)
-
stop
public void stop(java.lang.Runnable callback)
- Specified by:
stop in interface org.springframework.context.SmartLifecycle
-
doStop
protected abstract void doStop(java.lang.Runnable callback)
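The final start() and stop() methods together with the abstract doStart() and doStop(Runnable) hooks suggest a template-method arrangement. The following plain-Java sketch models that arrangement under stated assumptions: that the public methods guard the hooks with lifecycleMonitor and the running flag, and that the stop callback runs immediately when the container is not running. LifecycleSketch and RecordingContainer are hypothetical classes, not the Spring Kafka source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the start/stop template; no Spring types are used.
abstract class LifecycleSketch {

    private final Object lifecycleMonitor = new Object();

    private volatile boolean running;

    // Hooks that a concrete container would implement.
    protected abstract void doStart();

    protected abstract void doStop(Runnable callback);

    public final void start() {
        synchronized (this.lifecycleMonitor) {
            if (!this.running) {
                doStart(); // only invoked when not already running
            }
        }
    }

    public final void stop() {
        stop(() -> { });
    }

    public void stop(Runnable callback) {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                doStop(callback);
            } else {
                callback.run(); // nothing to stop (assumed behavior)
            }
        }
    }

    protected void setRunning(boolean running) {
        this.running = running;
    }

    public boolean isRunning() {
        return this.running;
    }
}

// Hypothetical subclass that records which hooks were called.
class RecordingContainer extends LifecycleSketch {

    final List<String> events = new ArrayList<>();

    @Override
    protected void doStart() {
        this.events.add("doStart");
        setRunning(true);
    }

    @Override
    protected void doStop(Runnable callback) {
        this.events.add("doStop");
        setRunning(false);
        callback.run();
    }

    public static void main(String[] args) {
        RecordingContainer container = new RecordingContainer();
        container.start();
        container.start(); // second call is a no-op in this model
        container.stop(() -> System.out.println("stopped"));
        System.out.println(container.events); // prints [doStart, doStop]
    }
}
```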
-
createSimpleLoggingConsumerRebalanceListener
protected final org.apache.kafka.clients.consumer.ConsumerRebalanceListener createSimpleLoggingConsumerRebalanceListener()
Return default implementation of ConsumerRebalanceListener instance.
- Returns:
- the ConsumerRebalanceListener currently assigned to this container.
-
publishContainerStoppedEvent
protected void publishContainerStoppedEvent()
-
parentOrThis
protected AbstractMessageListenerContainer<?,?> parentOrThis()
Return this or a parent container if this has a parent.
- Returns:
- the parent or this.
- Since:
- 2.2.1
-
-