Class AbstractMessageListenerContainer
- java.lang.Object
-
- org.springframework.amqp.rabbit.connection.RabbitAccessor
-
- org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
-
- All Implemented Interfaces:
MessageListenerContainer,Aware,BeanNameAware,DisposableBean,InitializingBean,ApplicationContextAware,ApplicationEventPublisherAware,Lifecycle,Phased,SmartLifecycle
- Direct Known Subclasses:
DirectMessageListenerContainer,SimpleMessageListenerContainer
public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean, ApplicationEventPublisherAware
- Author:
- Mark Pollack, Mark Fisher, Dave Syer, James Carr, Gary Russell, Alex Panchenko, Johno Crawford, Arnaud Cogoluègnes, Artem Bilan, Mohammad Hewedy, Mat Jaggard
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static interfaceAbstractMessageListenerContainer.JavaLangErrorHandlerA handler forErroron the container thread(s).static classAbstractMessageListenerContainer.SharedConnectionNotInitializedExceptionException that indicates that the initial setup of this container's shared Rabbit Connection failed.protected static classAbstractMessageListenerContainer.WrappedTransactionExceptionA runtime exception to wrap aThrowable.
-
Field Summary
Fields Modifier and Type Field Description protected ObjectconsumersMonitorstatic booleanDEFAULT_DEBATCHING_ENABLEDstatic intDEFAULT_PREFETCH_COUNTstatic longDEFAULT_RECOVERY_INTERVALThe default recovery interval: 5000 ms = 5 seconds.static longDEFAULT_SHUTDOWN_TIMEOUT-
Fields inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
logger
-
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
-
Constructor Summary
Constructors Constructor Description AbstractMessageListenerContainer()
-
Method Summary
All Methods Instance Methods Abstract Methods Concrete Methods Modifier and Type Method Description protected voidactualInvokeListener(com.rabbitmq.client.Channel channel, Object data)Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.voidaddAfterReceivePostProcessors(MessagePostProcessor... postprocessors)AddMessagePostProcessors that will be applied after message reception, before invoking theMessageListener.voidaddQueueNames(String... queueNames)Add queue(s) to this container's list of queues.voidaddQueues(Queue... queues)Add queue(s) to this container's list of queues.voidafterPropertiesSet()Delegates tovalidateConfiguration()andinitialize().protected booleancauseChainHasImmediateAcknowledgeAmqpException(Throwable ex)Traverse the cause chain and, if anImmediateAcknowledgeAmqpExceptionis found before anAmqpRejectAndDontRequeueException, return true.protected voidcheckMessageListener(Object listener)Check the given message listener, throwing an exception if it does not correspond to a supported listener type.protected voidcheckMismatchedQueues()protected voidconfigureAdminIfNeeded()protected List<Message>debatch(Message message)voiddestroy()Callsshutdown()when the BeanFactory destroys the container instance.protected abstract voiddoInitialize()Register any invokers within this container.protected voiddoInvokeListener(MessageListener listener, Object data)Invoke the specified listener as Spring Rabbit MessageListener.protected voiddoInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Object data)Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially with its own transaction) to the listener if demanded.protected voiddoSetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)protected abstract voiddoShutdown()Close the registered invokers.protected voiddoStart()Start this container, and notify all invoker tasks.protected voiddoStop()This method is invoked when the container is stopping.protected voidexecuteListener(com.rabbitmq.client.Channel channel, Object data)Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).AcknowledgeModegetAcknowledgeMode()protected Advice[]getAdviceChain()protected Collection<MessagePostProcessor>getAfterReceivePostProcessors()protected AmqpAdmingetAmqpAdmin()protected ApplicationContextgetApplicationContext()protected ApplicationEventPublishergetApplicationEventPublisher()protected BatchingStrategygetBatchingStrategy()protected StringgetBeanName()ConnectionFactorygetConnectionFactory()protected longgetConsumeDelay()Get the consumeDelay - a time to wait before consuming in ms.Map<String,Object>getConsumerArguments()Return the consumer arguments.protected ConsumerTagStrategygetConsumerTagStrategy()Return the consumer tag strategy to use.protected ConditionalExceptionLoggergetExclusiveConsumerExceptionLogger()protected longgetFailedDeclarationRetryInterval()protected longgetIdleEventInterval()protected AbstractMessageListenerContainer.JavaLangErrorHandlergetJavaLangErrorHandler()protected longgetLastReceive()Get the time the last message was received - initialized to container start time.StringgetListenerId()The 'id' attribute of the listener.ObjectgetMessageListener()Get the message listener.protected MessagePropertiesConvertergetMessagePropertiesConverter()intgetPhase()protected intgetPrefetchCount()Return the prefetch count.String[]getQueueNames()protected Set<String>getQueueNamesAsSet()protected Map<String,Queue>getQueueNamesToQueues()Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue.protected BackOffgetRecoveryBackOff()protected RoutingConnectionFactorygetRoutingConnectionFactory()Return the (@link RoutingConnectionFactory} if the connection factory is aRoutingConnectionFactory; null otherwise.protected StringgetRoutingLookupKey()Return the lookup key if the connection factory is aRoutingConnectionFactory; null otherwise.protected longgetShutdownTimeout()protected ExecutorgetTaskExecutor()protected TransactionAttributegetTransactionAttribute()protected PlatformTransactionManagergetTransactionManager()protected voidhandleListenerException(Throwable ex)Handle the given exception that arose during listener execution.voidinitialize()Initialize this container.protected voidinitializeProxy(Object delegate)protected voidinvokeErrorHandler(Throwable ex)Invoke the registered ErrorHandler, if any.protected voidinvokeListener(com.rabbitmq.client.Channel channel, Object data)booleanisActive()protected booleanisAlwaysRequeueWithTxManagerRollback()protected booleanisAsyncReplies()protected booleanisAutoDeclare()booleanisAutoStartup()protected booleanisChannelLocallyTransacted()Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.protected booleanisDeBatchingEnabled()protected booleanisDefaultRequeueRejected()Return the default requeue rejected.protected booleanisExclusive()Return whether the consumers should be exclusive.booleanisExposeListenerChannel()protected booleanisForceCloseChannel()Force close the channel if the consumer threads don't respond to a shutdown.protected booleanisGlobalQos()protected booleanisMismatchedQueuesFatal()protected booleanisMissingQueuesFatal()protected booleanisMissingQueuesFatalSet()protected booleanisNoLocal()Return whether the consumers should be no-local.booleanisPossibleAuthenticationFailureFatal()protected booleanisPossibleAuthenticationFailureFatalSet()booleanisRunning()Determine whether this container is currently running, that is, whether it has been started and not stopped yet.protected booleanisStatefulRetryFatalWithNullMessageId()voidlazyLoad()Do not check for missing or mismatched queues during startup.protected voidprepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception)A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g.protected voidpublishConsumerFailedEvent(String reason, boolean fatal, Throwable t)protected voidpublishIdleContainerEvent(long idleTime)protected voidpublishMissingQueueEvent(String queue)protected voidredeclareElementsIfNecessary()UseAmqpAdmin.initialize()to redeclare everything if necessary.booleanremoveAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor)Remove the providedMessagePostProcessorfrom theafterReceivePostProcessorslist.booleanremoveQueueNames(String... queueNames)Remove queue(s) from this container's list of queues.booleanremoveQueues(Queue... queues)Remove queue(s) from this container's list of queues.voidsetAcknowledgeMode(AcknowledgeMode acknowledgeMode)Flag controlling the behaviour of the container with respect to message acknowledgement.voidsetAdviceChain(Advice... adviceChain)Public setter for theAdviceto apply to listener executions.voidsetAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)SetMessagePostProcessors that will be applied after message reception, before invoking theMessageListener.voidsetAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback)Set to true to always requeue on transaction rollback with an externalTransactionManager.voidsetAmqpAdmin(AmqpAdmin amqpAdmin)Set theAmqpAdmin, used to declare any auto-delete queues, bindings etc when the container is started.voidsetApplicationContext(ApplicationContext applicationContext)voidsetApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)voidsetAutoDeclare(boolean autoDeclare)Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start().voidsetAutoStartup(boolean autoStartup)Set whether to automatically start the container after initialization.voidsetBatchingStrategy(BatchingStrategy batchingStrategy)Set a batching strategy to use when de-batching messages.voidsetBeanName(String beanName)voidsetConsumeDelay(long consumeDelay)Set the consumeDelay - a time to wait before consuming in ms.voidsetConsumerArguments(Map<String,Object> args)Set consumer arguments.voidsetConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy)Set the implementation ofConsumerTagStrategyto generate consumer tags.voidsetDeBatchingEnabled(boolean deBatchingEnabled)Determine whether or not the container should de-batch batched messages (true) or call the listener with the batch (false).voidsetDefaultRequeueRejected(boolean defaultRequeueRejected)Set the default behavior when a message is rejected, for example because the listener threw an exception.voidsetErrorHandler(ErrorHandler errorHandler)Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.voidsetErrorHandlerLoggerName(String errorHandlerLoggerName)Set the name (category) of the logger used to log exceptions thrown by the error handler.voidsetExclusive(boolean exclusive)Set to true for an exclusive consumer.voidsetExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger)Set aConditionalExceptionLoggerfor logging exclusive consumer failures.voidsetExposeListenerChannel(boolean exposeListenerChannel)Set whether to expose the listener Rabbit Channel to a registeredChannelAwareMessageListeneras well as toRabbitTemplatecalls.voidsetFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)Set the interval between passive queue declaration attempts in milliseconds.voidsetForceCloseChannel(boolean forceCloseChannel)Set to true to force close the channel if the consumer threads don't respond to a shutdown.voidsetGlobalQos(boolean globalQos)Apply prefetchCount to the entire channel.voidsetIdleEventInterval(long idleEventInterval)How often to emitListenerContainerIdleEvents in milliseconds.voidsetjavaLangErrorHandler(AbstractMessageListenerContainer.JavaLangErrorHandler javaLangErrorHandler)Provide a JavaLangErrorHandler implementation; by default,System.exit(99)is called.voidsetListenerId(String listenerId)Set the listener id.voidsetLookupKeyQualifier(String lookupKeyQualifier)Set a qualifier that will prefix the connection factory lookup key; default none.voidsetMessageListener(MessageListener messageListener)Set theMessageListener.voidsetMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)Set theMessagePropertiesConverterfor this listener container.voidsetMicrometerEnabled(boolean micrometerEnabled)Set to false to disable micrometer listener timers.voidsetMicrometerTags(Map<String,String> tags)Set additional tags for the Micrometer listener timers.voidsetMismatchedQueuesFatal(boolean mismatchedQueuesFatal)Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc).voidsetMissingQueuesFatal(boolean missingQueuesFatal)If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal.voidsetNoLocal(boolean noLocal)Set to true for an no-local consumer.protected voidsetNotRunning()voidsetPhase(int phase)Specify the phase in which this container should be started and stopped.voidsetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)voidsetPrefetchCount(int prefetchCount)Tell the broker how many messages to send to each consumer in a single request.voidsetQueueNames(String... queueName)Set the name of the queue(s) to receive messages from.voidsetQueues(Queue... queues)Set the name of the queue(s) to receive messages from.voidsetRecoveryBackOff(BackOff recoveryBackOff)Specify theBackOfffor interval between recovery attempts.voidsetRecoveryInterval(long recoveryInterval)Specify the interval between recovery attempts, in milliseconds.voidsetShutdownTimeout(long shutdownTimeout)The time to wait for workers in milliseconds after the container is stopped.voidsetStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId)Set whether a message with a null messageId is fatal for the consumer when using stateful retry.voidsetTaskExecutor(Executor taskExecutor)Set a task executor for the container - used to create the consumers not at runtime.voidsetTransactionAttribute(TransactionAttribute transactionAttribute)Set the transaction attribute to use when using an external transaction manager.voidsetTransactionManager(PlatformTransactionManager transactionManager)Set the transaction manager to use.voidsetupMessageListener(MessageListener messageListener)Setup the message listener to use.voidshutdown()Stop the shared Connection, calldoShutdown(), and close this container.voidstart()Start this container.voidstop()Stop this container.protected voidupdateLastReceive()protected voidvalidateConfiguration()Validate the configuration of this container.protected ListenerExecutionFailedExceptionwrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Object data)-
Methods inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.amqp.rabbit.listener.MessageListenerContainer
isConsumerBatchEnabled
-
Methods inherited from interface org.springframework.context.SmartLifecycle
stop
-
-
-
-
Field Detail
-
DEFAULT_DEBATCHING_ENABLED
public static final boolean DEFAULT_DEBATCHING_ENABLED
- See Also:
- Constant Field Values
-
DEFAULT_PREFETCH_COUNT
public static final int DEFAULT_PREFETCH_COUNT
- See Also:
- Constant Field Values
-
DEFAULT_RECOVERY_INTERVAL
public static final long DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds.- See Also:
- Constant Field Values
-
DEFAULT_SHUTDOWN_TIMEOUT
public static final long DEFAULT_SHUTDOWN_TIMEOUT
- See Also:
- Constant Field Values
-
consumersMonitor
protected final Object consumersMonitor
-
-
Method Detail
-
setApplicationEventPublisher
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
- Specified by:
setApplicationEventPublisherin interfaceApplicationEventPublisherAware
-
getApplicationEventPublisher
protected ApplicationEventPublisher getApplicationEventPublisher()
-
setAcknowledgeMode
public final void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements (so the listener doesn't need to know about the channel or the message).
Set to
AcknowledgeMode.MANUALif the listener will send the acknowledgements itself usingChannel.basicAck(long, boolean). Manual acks are consistent with either a transactional or non-transactional channel, but if you are doing no other work on the channel at the same other than receiving a single message then the transaction is probably unnecessary.Set to
AcknowledgeMode.NONEto tell the broker not to expect any acknowledgements, and it will assume all messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). IfAcknowledgeMode.NONEthen the channel cannot be transactional (so the container will fail on start up if that flag is accidentally set).- Parameters:
acknowledgeMode- the acknowledge mode to set. Defaults toAcknowledgeMode.AUTO- See Also:
AcknowledgeMode
-
getAcknowledgeMode
public AcknowledgeMode getAcknowledgeMode()
- Returns:
- the acknowledgeMode
-
setQueueNames
public void setQueueNames(String... queueName)
Set the name of the queue(s) to receive messages from.- Specified by:
setQueueNamesin interfaceMessageListenerContainer- Parameters:
queueName- the desired queueName(s) (can not benull)
-
setQueues
public final void setQueues(Queue... queues)
Set the name of the queue(s) to receive messages from.- Parameters:
queues- the desired queue(s) (can not benull)
-
getQueueNames
public String[] getQueueNames()
- Returns:
- the name of the queues to receive messages from.
-
getQueueNamesToQueues
protected Map<String,Queue> getQueueNamesToQueues()
Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue.- Returns:
- the map.
- Since:
- 2.1
-
addQueueNames
public void addQueueNames(String... queueNames)
Add queue(s) to this container's list of queues.- Parameters:
queueNames- The queue(s) to add.
-
addQueues
public void addQueues(Queue... queues)
Add queue(s) to this container's list of queues.- Parameters:
queues- The queue(s) to add.
-
removeQueueNames
public boolean removeQueueNames(String... queueNames)
Remove queue(s) from this container's list of queues.- Parameters:
queueNames- The queue(s) to remove.- Returns:
- the boolean result of removal on the target
queueNamesList.
-
removeQueues
public boolean removeQueues(Queue... queues)
Remove queue(s) from this container's list of queues.- Parameters:
queues- The queue(s) to remove.- Returns:
- the boolean result of removal on the target
queueNamesList.
-
isExposeListenerChannel
public boolean isExposeListenerChannel()
- Returns:
- whether to expose the listener
Channelto a registeredChannelAwareMessageListener.
-
setExposeListenerChannel
public void setExposeListenerChannel(boolean exposeListenerChannel)
Set whether to expose the listener Rabbit Channel to a registeredChannelAwareMessageListeneras well as toRabbitTemplatecalls.Default is "true", reusing the listener's
Channel. Turn this off to expose a fresh Rabbit Channel fetched from the same underlying RabbitConnectioninstead.Note that Channels managed by an external transaction manager will always get exposed to
RabbitTemplatecalls. So in terms of RabbitTemplate exposure, this setting only affects locally transacted Channels.- Parameters:
exposeListenerChannel- true to expose the channel.- See Also:
ChannelAwareMessageListener
-
setMessageListener
public void setMessageListener(MessageListener messageListener)
Set theMessageListener.- Parameters:
messageListener- the listener.- Since:
- 2.0
-
checkMessageListener
protected void checkMessageListener(Object listener)
Check the given message listener, throwing an exception if it does not correspond to a supported listener type.Only a Spring
MessageListenerobject will be accepted.- Parameters:
listener- the message listener object to check- Throws:
IllegalArgumentException- if the supplied listener is not a MessageListener- See Also:
MessageListener
-
getMessageListener
@Nullable public Object getMessageListener()
Description copied from interface:MessageListenerContainerGet the message listener.- Specified by:
getMessageListenerin interfaceMessageListenerContainer- Returns:
- The message listener object.
-
setErrorHandler
public void setErrorHandler(ErrorHandler errorHandler)
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default aConditionalRejectingErrorHandlerwith its default list of fatal exceptions will be used.- Parameters:
errorHandler- The error handler.
-
setDeBatchingEnabled
public void setDeBatchingEnabled(boolean deBatchingEnabled)
Determine whether or not the container should de-batch batched messages (true) or call the listener with the batch (false). Default: true.- Parameters:
deBatchingEnabled- the deBatchingEnabled to set.- See Also:
setBatchingStrategy(BatchingStrategy)
-
isDeBatchingEnabled
protected boolean isDeBatchingEnabled()
-
setAdviceChain
public void setAdviceChain(Advice... adviceChain)
Public setter for theAdviceto apply to listener executions.If a {code #setTransactionManager(PlatformTransactionManager) transactionManager} is provided as well, then separate advice is created for the transaction and applied first in the chain. In that case the advice chain provided here should not contain a transaction interceptor (otherwise two transactions would be be applied).
- Parameters:
adviceChain- the advice chain to set
-
getAdviceChain
protected Advice[] getAdviceChain()
-
setAfterReceivePostProcessors
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors)
SetMessagePostProcessors that will be applied after message reception, before invoking theMessageListener. Often used to decompress data. Processors are invoked in order, depending onPriorityOrder,Orderand finally unordered.- Parameters:
afterReceivePostProcessors- the post processor.- Since:
- 1.4.2
- See Also:
addAfterReceivePostProcessors(MessagePostProcessor...)
-
addAfterReceivePostProcessors
public void addAfterReceivePostProcessors(MessagePostProcessor... postprocessors)
AddMessagePostProcessors that will be applied after message reception, before invoking theMessageListener. Often used to decompress data. Processors are invoked in order, depending onPriorityOrder,Orderand finally unordered.In contrast to
setAfterReceivePostProcessors(MessagePostProcessor...), this method does not override the previously added afterReceivePostProcessors.- Parameters:
postprocessors- the post processor.- Since:
- 2.1.4
-
removeAfterReceivePostProcessor
public boolean removeAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor)
Remove the providedMessagePostProcessorfrom theafterReceivePostProcessorslist.- Parameters:
afterReceivePostProcessor- the MessagePostProcessor to remove.- Returns:
- the boolean if the provided post processor has been removed.
- Since:
- 2.1.4
- See Also:
addAfterReceivePostProcessors(MessagePostProcessor...)
-
setAutoStartup
public void setAutoStartup(boolean autoStartup)
Set whether to automatically start the container after initialization.Default is "true"; set this to "false" to allow for manual startup through the
start()method.- Specified by:
setAutoStartupin interfaceMessageListenerContainer- Parameters:
autoStartup- true for auto startup.
-
isAutoStartup
public boolean isAutoStartup()
- Specified by:
isAutoStartupin interfaceSmartLifecycle
-
setPhase
public void setPhase(int phase)
Specify the phase in which this container should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE meaning that this container starts as late as possible and stops as soon as possible.- Parameters:
phase- The phase.
-
getPhase
public int getPhase()
- Specified by:
getPhasein interfacePhased- Specified by:
getPhasein interfaceSmartLifecycle- Returns:
- The phase in which this container will be started and stopped.
-
setBeanName
public void setBeanName(String beanName)
- Specified by:
setBeanNamein interfaceBeanNameAware
-
getBeanName
@Nullable protected final String getBeanName()
- Returns:
- The bean name that this listener container has been assigned in its containing bean factory, if any.
-
getApplicationContext
protected final ApplicationContext getApplicationContext()
-
setApplicationContext
public final void setApplicationContext(ApplicationContext applicationContext)
- Specified by:
setApplicationContextin interfaceApplicationContextAware
-
getConnectionFactory
public ConnectionFactory getConnectionFactory()
- Overrides:
getConnectionFactoryin classRabbitAccessor- Returns:
- The ConnectionFactory that this accessor uses for obtaining RabbitMQ
Connections.
-
setLookupKeyQualifier
public void setLookupKeyQualifier(String lookupKeyQualifier)
Set a qualifier that will prefix the connection factory lookup key; default none.- Parameters:
lookupKeyQualifier- the qualifier- Since:
- 1.6.9
- See Also:
getRoutingLookupKey()
-
isForceCloseChannel
protected boolean isForceCloseChannel()
Force close the channel if the consumer threads don't respond to a shutdown.- Returns:
- true to force close.
- Since:
- 1.7.4
-
setForceCloseChannel
public void setForceCloseChannel(boolean forceCloseChannel)
Set to true to force close the channel if the consumer threads don't respond to a shutdown. Default: true (since 2.0).- Parameters:
forceCloseChannel- true to force close.- Since:
- 1.7.4
-
getRoutingLookupKey
@Nullable protected String getRoutingLookupKey()
Return the lookup key if the connection factory is aRoutingConnectionFactory; null otherwise. The routing key is the comma-delimited list of queue names with all spaces removed and bracketed by [...], optionally prefixed by a qualifier, e.g. "foo[...]".- Returns:
- the key or null.
- Since:
- 1.6.9
- See Also:
setLookupKeyQualifier(String)
-
getRoutingConnectionFactory
@Nullable protected RoutingConnectionFactory getRoutingConnectionFactory()
Return the (@link RoutingConnectionFactory} if the connection factory is aRoutingConnectionFactory; null otherwise.- Returns:
- the
RoutingConnectionFactoryor null. - Since:
- 1.6.9
-
getListenerId
@Nullable public String getListenerId()
The 'id' attribute of the listener.- Returns:
- the id (or the container bean name if no id set).
-
setListenerId
public void setListenerId(String listenerId)
Description copied from interface:MessageListenerContainerSet the listener id.- Specified by:
setListenerIdin interfaceMessageListenerContainer- Parameters:
listenerId- the id.
-
setConsumerTagStrategy
public void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy)
Set the implementation ofConsumerTagStrategyto generate consumer tags. By default, the RabbitMQ server generates consumer tags.- Parameters:
consumerTagStrategy- the consumerTagStrategy to set.- Since:
- 1.4.5
-
getConsumerTagStrategy
@Nullable protected ConsumerTagStrategy getConsumerTagStrategy()
Return the consumer tag strategy to use.- Returns:
- the strategy.
- Since:
- 2.0
-
setConsumerArguments
public void setConsumerArguments(Map<String,Object> args)
Set consumer arguments.- Parameters:
args- the arguments.- Since:
- 1.3
-
getConsumerArguments
public Map<String,Object> getConsumerArguments()
Return the consumer arguments.- Returns:
- the arguments.
- Since:
- 2.0
-
setExclusive
public void setExclusive(boolean exclusive)
Set to true for an exclusive consumer.- Parameters:
exclusive- true for an exclusive consumer.
-
isExclusive
protected boolean isExclusive()
Return whether the consumers should be exclusive.- Returns:
- true for exclusive consumers.
-
setNoLocal
public void setNoLocal(boolean noLocal)
Set to true for an no-local consumer.- Parameters:
noLocal- true for an no-local consumer.
-
isNoLocal
protected boolean isNoLocal()
Return whether the consumers should be no-local.- Returns:
- true for no-local consumers.
-
setDefaultRequeueRejected
public void setDefaultRequeueRejected(boolean defaultRequeueRejected)
Set the default behavior when a message is rejected, for example because the listener threw an exception. When true, messages will be requeued, when false, they will not. For versions of Rabbit that support dead-lettering, the message must not be requeued in order to be sent to the dead letter exchange. Setting to false causes all rejections to not be requeued. When true, the default can be overridden by the listener throwing anAmqpRejectAndDontRequeueException. Default true.- Parameters:
defaultRequeueRejected- true to reject by default.
-
isDefaultRequeueRejected
protected boolean isDefaultRequeueRejected()
Return the default requeue rejected.- Returns:
- the boolean.
- Since:
- 2.0
- See Also:
setDefaultRequeueRejected(boolean)
-
setPrefetchCount
public void setPrefetchCount(int prefetchCount)
Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput.- Parameters:
prefetchCount- the prefetch count- See Also:
Channel.basicQos(int, boolean)
-
getPrefetchCount
protected int getPrefetchCount()
Return the prefetch count.- Returns:
- the count.
- Since:
- 2.0
-
setGlobalQos
public void setGlobalQos(boolean globalQos)
Apply prefetchCount to the entire channel.- Parameters:
globalQos- true for a channel-wide prefetch.- Since:
- 2.2.17
- See Also:
Channel.basicQos(int, boolean)
-
isGlobalQos
protected boolean isGlobalQos()
-
setShutdownTimeout
public void setShutdownTimeout(long shutdownTimeout)
The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. Defaults to 5 seconds.- Parameters:
shutdownTimeout- the shutdown timeout to set
-
getShutdownTimeout
protected long getShutdownTimeout()
-
setIdleEventInterval
public void setIdleEventInterval(long idleEventInterval)
How often to emitListenerContainerIdleEvents in milliseconds.- Parameters:
idleEventInterval- the interval.
-
getIdleEventInterval
protected long getIdleEventInterval()
-
getLastReceive
protected long getLastReceive()
Get the time the last message was received - initialized to container start time.- Returns:
- the time.
-
setTransactionManager
public void setTransactionManager(PlatformTransactionManager transactionManager)
Set the transaction manager to use.- Parameters:
transactionManager- the transaction manager.
-
getTransactionManager
@Nullable protected PlatformTransactionManager getTransactionManager()
-
setTransactionAttribute
public void setTransactionAttribute(TransactionAttribute transactionAttribute)
Set the transaction attribute to use when using an external transaction manager.- Parameters:
transactionAttribute- the transaction attribute to set
-
getTransactionAttribute
protected TransactionAttribute getTransactionAttribute()
-
setTaskExecutor
public void setTaskExecutor(Executor taskExecutor)
Set a task executor for the container - used to create the consumers not at runtime.- Parameters:
taskExecutor- the task executor.
-
getTaskExecutor
protected Executor getTaskExecutor()
-
setRecoveryInterval
public void setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.- Parameters:
recoveryInterval- The recovery interval.
-
setRecoveryBackOff
public void setRecoveryBackOff(BackOff recoveryBackOff)
Specify theBackOfffor interval between recovery attempts. The default is 5000 ms, that is, 5 seconds. With theBackOffyou can supply themaxAttemptsfor recovery before thestop()will be performed.- Parameters:
recoveryBackOff- The BackOff to recover.- Since:
- 1.5
-
getRecoveryBackOff
protected BackOff getRecoveryBackOff()
-
setMessagePropertiesConverter
public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set theMessagePropertiesConverterfor this listener container.- Parameters:
messagePropertiesConverter- The properties converter.
-
getMessagePropertiesConverter
protected MessagePropertiesConverter getMessagePropertiesConverter()
-
setAmqpAdmin
public void setAmqpAdmin(AmqpAdmin amqpAdmin)
Set theAmqpAdmin, used to declare any auto-delete queues, bindings etc when the container is started. Only needed if those queues use conditional declaration (have a 'declared-by' attribute). If not specified, an internal admin will be used which will attempt to declare all elements not having a 'declared-by' attribute.- Parameters:
amqpAdmin- the AmqpAdmin to use- Since:
- 2.1
-
setMissingQueuesFatal
public void setMissingQueuesFatal(boolean missingQueuesFatal)
If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal. When true, and the queues are missing during startup, the context refresh() will fail.When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.
- Parameters:
missingQueuesFatal- the missingQueuesFatal to set.- Since:
- 1.3.5
- See Also:
setAutoDeclare(boolean)
-
isMissingQueuesFatal
protected boolean isMissingQueuesFatal()
-
isMissingQueuesFatalSet
protected boolean isMissingQueuesFatalSet()
-
setMismatchedQueuesFatal
public void setMismatchedQueuesFatal(boolean mismatchedQueuesFatal)
Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc). Default false.- Parameters:
mismatchedQueuesFatal- true to fail initialization when this condition occurs.- Since:
- 1.6
-
isMismatchedQueuesFatal
protected boolean isMismatchedQueuesFatal()
-
setPossibleAuthenticationFailureFatal
public void setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
-
doSetPossibleAuthenticationFailureFatal
protected final void doSetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal)
-
isPossibleAuthenticationFailureFatal
public boolean isPossibleAuthenticationFailureFatal()
-
isPossibleAuthenticationFailureFatalSet
protected boolean isPossibleAuthenticationFailureFatalSet()
-
isAsyncReplies
protected boolean isAsyncReplies()
-
setAutoDeclare
public void setAutoDeclare(boolean autoDeclare)
Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start().- Parameters:
autoDeclare- the boolean flag to indicate an declaration operation.- Since:
- 1.4
- See Also:
redeclareElementsIfNecessary()
-
isAutoDeclare
protected boolean isAutoDeclare()
-
setFailedDeclarationRetryInterval
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.- Parameters:
failedDeclarationRetryInterval- the interval, default 5000.- Since:
- 1.3.9
-
getFailedDeclarationRetryInterval
protected long getFailedDeclarationRetryInterval()
-
isStatefulRetryFatalWithNullMessageId
protected boolean isStatefulRetryFatalWithNullMessageId()
-
setStatefulRetryFatalWithNullMessageId
public void setStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId)
Set whether a message with a null messageId is fatal for the consumer when using stateful retry. When false, instead of stopping the consumer, the message is rejected and not requeued - it will be discarded or routed to the dead letter queue, if so configured. Default true.- Parameters:
statefulRetryFatalWithNullMessageId- true for fatal.- Since:
- 2.0
-
setExclusiveConsumerExceptionLogger
public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger)
Set aConditionalExceptionLoggerfor logging exclusive consumer failures. The default is to log such failures at WARN level.- Parameters:
exclusiveConsumerExceptionLogger- the conditional exception logger.- Since:
- 1.5
-
getExclusiveConsumerExceptionLogger
protected ConditionalExceptionLogger getExclusiveConsumerExceptionLogger()
-
setAlwaysRequeueWithTxManagerRollback
public void setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback)
Set to true to always requeue on transaction rollback with an externalTransactionManager. With earlier releases, when a transaction manager was configured, a transaction rollback always requeued the message. This was inconsistent with local transactions where the normaldefaultRequeueRejectedandAmqpRejectAndDontRequeueExceptionlogic was honored to determine whether the message was requeued. RabbitMQ does not consider the message delivery to be part of the transaction. This boolean was introduced in 1.7.1, set to true by default, to be consistent with previous behavior. Starting with version 2.0, it is false by default.- Parameters:
alwaysRequeueWithTxManagerRollback- true to always requeue on rollback.- Since:
- 1.7.1.
-
isAlwaysRequeueWithTxManagerRollback
protected boolean isAlwaysRequeueWithTxManagerRollback()
-
setErrorHandlerLoggerName
public void setErrorHandlerLoggerName(String errorHandlerLoggerName)
Set the name (category) of the logger used to log exceptions thrown by the error handler. It defaults to the container's logger but can be overridden if you want it to log at a different level to the container. Such exceptions are logged at the ERROR level.- Parameters:
errorHandlerLoggerName- the logger name.- Since:
- 2.0.8
-
setBatchingStrategy
public void setBatchingStrategy(BatchingStrategy batchingStrategy)
Set a batching strategy to use when de-batching messages. Default isSimpleBatchingStrategy.- Parameters:
batchingStrategy- the strategy.- Since:
- 2.2
- See Also:
setDeBatchingEnabled(boolean)
-
getBatchingStrategy
protected BatchingStrategy getBatchingStrategy()
-
getAfterReceivePostProcessors
protected Collection<MessagePostProcessor> getAfterReceivePostProcessors()
-
setMicrometerTags
public void setMicrometerTags(Map<String,String> tags)
Set additional tags for the Micrometer listener timers.- Parameters:
tags- the tags.- Since:
- 2.2
-
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable micrometer listener timers.- Parameters:
micrometerEnabled- false to disable.- Since:
- 2.2
-
getConsumeDelay
protected long getConsumeDelay()
Get the consumeDelay - a time to wait before consuming in ms.- Returns:
- the consume delay.
- Since:
- 2.3
-
setConsumeDelay
public void setConsumeDelay(long consumeDelay)
Set the consumeDelay - a time to wait before consuming in ms. This is useful when using the sharding plugin withconcurrency > 1, to avoid uneven distribution of consumers across the shards. See the plugin README for more information.- Parameters:
consumeDelay- the consume delay.- Since:
- 2.3
-
getJavaLangErrorHandler
protected AbstractMessageListenerContainer.JavaLangErrorHandler getJavaLangErrorHandler()
-
setjavaLangErrorHandler
public void setjavaLangErrorHandler(AbstractMessageListenerContainer.JavaLangErrorHandler javaLangErrorHandler)
Provide a JavaLangErrorHandler implementation; by default,System.exit(99)is called.- Parameters:
javaLangErrorHandler- the handler.- Since:
- 2.2.12
-
afterPropertiesSet
public void afterPropertiesSet()
Delegates tovalidateConfiguration()andinitialize().- Specified by:
afterPropertiesSetin interfaceInitializingBean- Specified by:
afterPropertiesSetin interfaceMessageListenerContainer- Overrides:
afterPropertiesSetin classRabbitAccessor
-
setupMessageListener
public void setupMessageListener(MessageListener messageListener)
Description copied from interface:MessageListenerContainerSetup the message listener to use. Throws anIllegalArgumentExceptionif that message listener type is not supported.- Specified by:
setupMessageListenerin interfaceMessageListenerContainer- Parameters:
messageListener- theobjectto wrapped to theMessageListener.
-
validateConfiguration
protected void validateConfiguration()
Validate the configuration of this container.The default implementation is empty. To be overridden in subclasses.
-
initializeProxy
protected void initializeProxy(Object delegate)
-
destroy
public void destroy()
Callsshutdown()when the BeanFactory destroys the container instance.- Specified by:
destroyin interfaceDisposableBean- See Also:
shutdown()
-
initialize
public void initialize()
Initialize this container.Creates a Rabbit Connection and calls
doInitialize().
-
shutdown
public void shutdown()
Stop the shared Connection, calldoShutdown(), and close this container.
-
setNotRunning
protected void setNotRunning()
-
doInitialize
protected abstract void doInitialize()
Register any invokers within this container.Subclasses need to implement this method for their specific invoker management process.
-
doShutdown
protected abstract void doShutdown()
Close the registered invokers.Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
- See Also:
shutdown()
-
isActive
public final boolean isActive()
- Returns:
- Whether this container is currently active, that is, whether it has been set up but not shut down yet.
-
start
public void start()
Start this container.
-
doStart
protected void doStart()
Start this container, and notify all invoker tasks.
-
stop
public void stop()
Stop this container.
-
doStop
protected void doStop()
This method is invoked when the container is stopping.
-
isRunning
public final boolean isRunning()
Determine whether this container is currently running, that is, whether it has been started and not stopped yet.
-
invokeErrorHandler
protected void invokeErrorHandler(Throwable ex)
Invoke the registered ErrorHandler, if any. Log at error level otherwise. The default error handler is aConditionalRejectingErrorHandlerwith the defaultFatalExceptionStrategyimplementation.- Parameters:
ex- the uncaught error that arose during Rabbit processing.- See Also:
setErrorHandler(org.springframework.util.ErrorHandler)
-
executeListener
protected void executeListener(com.rabbitmq.client.Channel channel, Object data)Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).- Parameters:
channel- the Rabbit Channel to operate ondata- the received Rabbit Message- See Also:
invokeListener(com.rabbitmq.client.Channel, java.lang.Object),handleListenerException(java.lang.Throwable)
-
invokeListener
protected void invokeListener(com.rabbitmq.client.Channel channel, Object data)
-
actualInvokeListener
protected void actualInvokeListener(com.rabbitmq.client.Channel channel, Object data)Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.- Parameters:
channel- the Rabbit Channel to operate ondata- the received Rabbit Message or List of Message.- See Also:
setMessageListener(MessageListener)
-
doInvokeListener
protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Object data)
Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially with its own transaction) to the listener if demanded. An exception thrown from the listener will be wrapped in aListenerExecutionFailedException.- Parameters:
listener- the Spring ChannelAwareMessageListener to invokechannel- the Rabbit Channel to operate ondata- the received Rabbit Message or List of Message.- See Also:
ChannelAwareMessageListener,setExposeListenerChannel(boolean)
-
doInvokeListener
protected void doInvokeListener(MessageListener listener, Object data)
Invoke the specified listener as Spring Rabbit MessageListener.Default implementation performs a plain invocation of the
onMessagemethod.Exception thrown from listener will be wrapped to
ListenerExecutionFailedException.- Parameters:
listener- the Rabbit MessageListener to invokedata- the received Rabbit Message or List of Message.- See Also:
MessageListener.onMessage(org.springframework.amqp.core.Message)
-
isChannelLocallyTransacted
protected boolean isChannelLocallyTransacted()
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.Note:This method is about finding out whether the Channel's transaction is local or externally coordinated.
- Returns:
- whether the given Channel is locally transacted
- See Also:
RabbitAccessor.isChannelTransacted()
-
handleListenerException
protected void handleListenerException(Throwable ex)
Handle the given exception that arose during listener execution.The default implementation logs the exception at error level, not propagating it to the Rabbit provider - assuming that all handling of acknowledgment and/or transactions is done by this listener container. This can be overridden in subclasses.
- Parameters:
ex- the exception to handle
-
wrapToListenerExecutionFailedExceptionIfNeeded
protected ListenerExecutionFailedException wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Object data)
- Parameters:
e- The Exception.data- The failed message.- Returns:
- If 'e' is of type
ListenerExecutionFailedException- return 'e' as it is, otherwise wrap it toListenerExecutionFailedExceptionand return.
-
publishConsumerFailedEvent
protected void publishConsumerFailedEvent(String reason, boolean fatal, @Nullable Throwable t)
-
publishMissingQueueEvent
protected void publishMissingQueueEvent(String queue)
-
publishIdleContainerEvent
protected final void publishIdleContainerEvent(long idleTime)
-
updateLastReceive
protected void updateLastReceive()
-
configureAdminIfNeeded
protected void configureAdminIfNeeded()
-
checkMismatchedQueues
protected void checkMismatchedQueues()
-
lazyLoad
public void lazyLoad()
Description copied from interface:MessageListenerContainerDo not check for missing or mismatched queues during startup. Used for lazily loaded message listener containers to avoid a deadlock when starting such containers. Applications lazily loading containers should verify the queue configuration before loading the container bean.- Specified by:
lazyLoadin interfaceMessageListenerContainer
-
redeclareElementsIfNecessary
protected void redeclareElementsIfNecessary()
UseAmqpAdmin.initialize()to redeclare everything if necessary. Since auto deletion of a queue can cause upstream elements (bindings, exchanges) to be deleted too, everything needs to be redeclared if a queue is missing. Declaration is idempotent so, aside from some network chatter, there is no issue, and we only will do it if we detect our queue is gone.In general it makes sense only for the 'auto-delete' or 'expired' queues, but with the server TTL policy we don't have ability to determine 'expiration' option for the queue.
Starting with version 1.6, if
mismatchedQueuesFatalis true, the declarations are always attempted during restart so the listener will fail with a fatal error if mismatches occur.
-
causeChainHasImmediateAcknowledgeAmqpException
protected boolean causeChainHasImmediateAcknowledgeAmqpException(Throwable ex)
Traverse the cause chain and, if anImmediateAcknowledgeAmqpExceptionis found before anAmqpRejectAndDontRequeueException, return true. AnErrorwill take precedence.- Parameters:
ex- the exception- Returns:
- true if we should ack immediately.
- Since:
- 1.6.6
-
prepareHolderForRollback
protected void prepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception)
A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g.TransactionDefinition.PROPAGATION_NONE). In that case the delivery tags will have been processed manually.- Parameters:
resourceHolder- the bound resource holder (if a transaction is active).exception- the exception.
-
-