Class AbstractMessageListenerContainer
- All Implemented Interfaces:
MessageListenerContainer,Aware,BeanNameAware,DisposableBean,InitializingBean,ApplicationContextAware,ApplicationEventPublisherAware,Lifecycle,Phased,SmartLifecycle
- Direct Known Subclasses:
DirectMessageListenerContainer,SimpleMessageListenerContainer
- Author:
- Mark Pollack, Mark Fisher, Dave Syer, James Carr, Gary Russell, Alex Panchenko, Johno Crawford, Arnaud Cogoluègnes, Artem Bilan, Mohammad Hewedy, Mat Jaggard
-
Nested Class Summary
Nested Classes -
Field Summary
FieldsModifier and TypeFieldDescriptionprotected final Objectstatic final booleanstatic final intstatic final longThe default recovery interval: 5000 ms = 5 seconds.static final longFields inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
loggerFields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionprotected voidactualInvokeListener(com.rabbitmq.client.Channel channel, Object data) Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.voidaddAfterReceivePostProcessors(MessagePostProcessor... postprocessors) AddMessagePostProcessors that will be applied after message reception, before invoking theMessageListener.voidaddQueueNames(String... queueNames) Add queue(s) to this container's list of queues.voidAdd queue(s) to this container's list of queues.voidDelegates tovalidateConfiguration()andinitialize().protected booleanTraverse the cause chain and, if anImmediateAcknowledgeAmqpExceptionis found before anAmqpRejectAndDontRequeueException, return true.protected voidcheckMessageListener(Object listener) Check the given message listener, throwing an exception if it does not correspond to a supported listener type.protected voidprotected voidvoiddestroy()Callsshutdown()when the BeanFactory destroys the container instance.protected abstract voidRegister any invokers within this container.protected voiddoInvokeListener(MessageListener listener, Object data) Invoke the specified listener as Spring Rabbit MessageListener.protected voiddoInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Object data) Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially with its own transaction) to the listener if demanded.protected final voiddoSetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal) protected abstract voidClose the registered invokers.protected voiddoStart()Start this container, and notify all invoker tasks.protected voiddoStop()This method is invoked when the container is stopping.protected voidexecuteListener(com.rabbitmq.client.Channel channel, Object data) Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).protected Advice[]protected Collection<MessagePostProcessor>protected AmqpAdminprotected final ApplicationContextprotected ApplicationEventPublisherprotected BatchingStrategyprotected final Stringprotected longGet the consumeDelay - a time to wait before consuming in ms.Return the consumer arguments.protected ConsumerTagStrategyReturn the consumer tag strategy to use.protected ConditionalExceptionLoggerprotected longprotected longprotected longGet the time the last message was received - initialized to container start time.The 'id' attribute of the listener.Get the message listener.protected MessagePropertiesConverterintgetPhase()protected intReturn the prefetch count.String[]Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue.protected BackOffprotected RoutingConnectionFactoryReturn the (@link RoutingConnectionFactory} if the connection factory is aRoutingConnectionFactory; null otherwise.protected StringReturn the lookup key if the connection factory is aRoutingConnectionFactory; null otherwise.protected longprotected Executorprotected TransactionAttributeprotected PlatformTransactionManagerprotected voidHandle the given exception that arose during listener execution.voidInitialize this container.protected voidinitializeProxy(Object delegate) protected voidInvoke the registered ErrorHandler, if any.protected voidinvokeListener(com.rabbitmq.client.Channel channel, Object data) final booleanisActive()protected booleanprotected booleanprotected booleanbooleanprotected booleanCheck whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.protected booleanprotected booleanReturn the default requeue rejected.protected booleanReturn whether the consumers should be exclusive.booleanprotected booleanForce close the channel if the consumer threads don't respond to a shutdown.protected booleanprotected booleanprotected booleanprotected booleanprotected booleanReturn whether the consumers should be no-local.booleanprotected booleanfinal booleanDetermine whether this container is currently running, that is, whether it has been started and not stopped yet.protected booleanvoidlazyLoad()Do not check for missing or mismatched queues during startup.protected voidprepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception) A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g.protected voidpublishConsumerFailedEvent(String reason, boolean fatal, Throwable t) protected final voidpublishIdleContainerEvent(long idleTime) protected voidpublishMissingQueueEvent(String queue) protected voidUseAmqpAdmin.initialize()to redeclare everything if necessary.booleanremoveAfterReceivePostProcessor(MessagePostProcessor afterReceivePostProcessor) Remove the providedMessagePostProcessorfrom theafterReceivePostProcessorslist.booleanremoveQueueNames(String... queueNames) Remove queue(s) from this container's list of queues.booleanremoveQueues(Queue... queues) Remove queue(s) from this container's list of queues.final voidsetAcknowledgeMode(AcknowledgeMode acknowledgeMode) Flag controlling the behaviour of the container with respect to message acknowledgement.voidsetAdviceChain(Advice... adviceChain) Public setter for theAdviceto apply to listener executions.voidsetAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) SetMessagePostProcessors that will be applied after message reception, before invoking theMessageListener.voidsetAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback) Set to true to always requeue on transaction rollback with an externalTransactionManager.voidsetAmqpAdmin(AmqpAdmin amqpAdmin) Set theAmqpAdmin, used to declare any auto-delete queues, bindings etc when the container is started.final voidsetApplicationContext(ApplicationContext applicationContext) voidsetApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) voidsetAutoDeclare(boolean autoDeclare) Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start().voidsetAutoStartup(boolean autoStartup) Set whether to automatically start the container after initialization.voidsetBatchingStrategy(BatchingStrategy batchingStrategy) Set a batching strategy to use when de-batching messages.voidsetBeanName(String beanName) voidsetConsumeDelay(long consumeDelay) Set the consumeDelay - a time to wait before consuming in ms.voidsetConsumerArguments(Map<String, Object> args) Set consumer arguments.voidsetConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy) Set the implementation ofConsumerTagStrategyto generate consumer tags.voidsetDeBatchingEnabled(boolean deBatchingEnabled) Determine whether or not the container should de-batch batched messages (true) or call the listener with the batch (false).voidsetDefaultRequeueRejected(boolean defaultRequeueRejected) Set the default behavior when a message is rejected, for example because the listener threw an exception.voidsetErrorHandler(ErrorHandler errorHandler) Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.voidsetErrorHandlerLoggerName(String errorHandlerLoggerName) Set the name (category) of the logger used to log exceptions thrown by the error handler.voidsetExclusive(boolean exclusive) Set to true for an exclusive consumer.voidsetExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger) Set aConditionalExceptionLoggerfor logging exclusive consumer failures.voidsetExposeListenerChannel(boolean exposeListenerChannel) Set whether to expose the listener Rabbit Channel to a registeredChannelAwareMessageListeneras well as toRabbitTemplatecalls.voidsetFailedDeclarationRetryInterval(long failedDeclarationRetryInterval) Set the interval between passive queue declaration attempts in milliseconds.voidsetForceCloseChannel(boolean forceCloseChannel) Set to true to force close the channel if the consumer threads don't respond to a shutdown.voidsetGlobalQos(boolean globalQos) Apply prefetchCount to the entire channel.voidsetIdleEventInterval(long idleEventInterval) How often to emitListenerContainerIdleEvents in milliseconds.voidsetjavaLangErrorHandler(AbstractMessageListenerContainer.JavaLangErrorHandler javaLangErrorHandler) Provide a JavaLangErrorHandler implementation; by default,System.exit(99)is called.voidsetListenerId(String listenerId) Set the listener id.voidsetLookupKeyQualifier(String lookupKeyQualifier) Set a qualifier that will prefix the connection factory lookup key; default none.voidsetMessageListener(MessageListener messageListener) Set theMessageListener.voidsetMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter) Set theMessagePropertiesConverterfor this listener container.voidsetMicrometerEnabled(boolean micrometerEnabled) Set to false to disable micrometer listener timers.voidsetMicrometerTags(Map<String, String> tags) Set additional tags for the Micrometer listener timers.voidsetMismatchedQueuesFatal(boolean mismatchedQueuesFatal) Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc).voidsetMissingQueuesFatal(boolean missingQueuesFatal) If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal.voidsetNoLocal(boolean noLocal) Set to true for an no-local consumer.protected voidvoidsetPhase(int phase) Specify the phase in which this container should be started and stopped.voidsetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal) voidsetPrefetchCount(int prefetchCount) Tell the broker how many messages to send to each consumer in a single request.voidsetQueueNames(String... queueName) Set the name of the queue(s) to receive messages from.final voidSet the name of the queue(s) to receive messages from.voidsetRecoveryBackOff(BackOff recoveryBackOff) Specify theBackOfffor interval between recovery attempts.voidsetRecoveryInterval(long recoveryInterval) Specify the interval between recovery attempts, in milliseconds.voidsetShutdownTimeout(long shutdownTimeout) The time to wait for workers in milliseconds after the container is stopped.voidsetStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId) Set whether a message with a null messageId is fatal for the consumer when using stateful retry.voidsetTaskExecutor(Executor taskExecutor) Set a task executor for the container - used to create the consumers not at runtime.voidsetTransactionAttribute(TransactionAttribute transactionAttribute) Set the transaction attribute to use when using an external transaction manager.voidsetTransactionManager(PlatformTransactionManager transactionManager) Set the transaction manager to use.voidsetupMessageListener(MessageListener messageListener) Setup the message listener to use.voidshutdown()Stop the shared Connection, calldoShutdown(), and close this container.voidstart()Start this container.voidstop()Stop this container.protected voidprotected voidValidate the configuration of this container.protected ListenerExecutionFailedExceptionMethods inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactoryMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.springframework.amqp.rabbit.listener.MessageListenerContainer
isConsumerBatchEnabledMethods inherited from interface org.springframework.context.SmartLifecycle
stop
-
Field Details
-
DEFAULT_DEBATCHING_ENABLED
public static final boolean DEFAULT_DEBATCHING_ENABLED- See Also:
-
DEFAULT_PREFETCH_COUNT
public static final int DEFAULT_PREFETCH_COUNT- See Also:
-
DEFAULT_RECOVERY_INTERVAL
public static final long DEFAULT_RECOVERY_INTERVALThe default recovery interval: 5000 ms = 5 seconds.- See Also:
-
DEFAULT_SHUTDOWN_TIMEOUT
public static final long DEFAULT_SHUTDOWN_TIMEOUT- See Also:
-
consumersMonitor
-
-
Constructor Details
-
AbstractMessageListenerContainer
public AbstractMessageListenerContainer()
-
-
Method Details
-
setApplicationEventPublisher
- Specified by:
setApplicationEventPublisherin interfaceApplicationEventPublisherAware
-
getApplicationEventPublisher
-
setAcknowledgeMode
Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements (so the listener doesn't need to know about the channel or the message).
Set to
AcknowledgeMode.MANUALif the listener will send the acknowledgements itself usingChannel.basicAck(long, boolean). Manual acks are consistent with either a transactional or non-transactional channel, but if you are doing no other work on the channel at the same other than receiving a single message then the transaction is probably unnecessary.Set to
AcknowledgeMode.NONEto tell the broker not to expect any acknowledgements, and it will assume all messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). IfAcknowledgeMode.NONEthen the channel cannot be transactional (so the container will fail on start up if that flag is accidentally set).- Parameters:
acknowledgeMode- the acknowledge mode to set. Defaults toAcknowledgeMode.AUTO- See Also:
-
getAcknowledgeMode
- Returns:
- the acknowledgeMode
-
setQueueNames
Set the name of the queue(s) to receive messages from.- Specified by:
setQueueNamesin interfaceMessageListenerContainer- Parameters:
queueName- the desired queueName(s) (can not benull)
-
setQueues
Set the name of the queue(s) to receive messages from.- Parameters:
queues- the desired queue(s) (can not benull)
-
getQueueNames
- Returns:
- the name of the queues to receive messages from.
-
getQueueNamesAsSet
-
getQueueNamesToQueues
Returns a map of current queue names to the Queue object; allows the determination of a changed broker-named queue.- Returns:
- the map.
- Since:
- 2.1
-
addQueueNames
Add queue(s) to this container's list of queues.- Parameters:
queueNames- The queue(s) to add.
-
addQueues
Add queue(s) to this container's list of queues.- Parameters:
queues- The queue(s) to add.
-
removeQueueNames
Remove queue(s) from this container's list of queues.- Parameters:
queueNames- The queue(s) to remove.- Returns:
- the boolean result of removal on the target
queueNamesList.
-
removeQueues
Remove queue(s) from this container's list of queues.- Parameters:
queues- The queue(s) to remove.- Returns:
- the boolean result of removal on the target
queueNamesList.
-
isExposeListenerChannel
public boolean isExposeListenerChannel()- Returns:
- whether to expose the listener
Channelto a registeredChannelAwareMessageListener.
-
setExposeListenerChannel
public void setExposeListenerChannel(boolean exposeListenerChannel) Set whether to expose the listener Rabbit Channel to a registeredChannelAwareMessageListeneras well as toRabbitTemplatecalls.Default is "true", reusing the listener's
Channel. Turn this off to expose a fresh Rabbit Channel fetched from the same underlying RabbitConnectioninstead.Note that Channels managed by an external transaction manager will always get exposed to
RabbitTemplatecalls. So in terms of RabbitTemplate exposure, this setting only affects locally transacted Channels.- Parameters:
exposeListenerChannel- true to expose the channel.- See Also:
-
setMessageListener
Set theMessageListener.- Parameters:
messageListener- the listener.- Since:
- 2.0
-
checkMessageListener
Check the given message listener, throwing an exception if it does not correspond to a supported listener type.Only a Spring
MessageListenerobject will be accepted.- Parameters:
listener- the message listener object to check- Throws:
IllegalArgumentException- if the supplied listener is not a MessageListener- See Also:
-
getMessageListener
Description copied from interface:MessageListenerContainerGet the message listener.- Specified by:
getMessageListenerin interfaceMessageListenerContainer- Returns:
- The message listener object.
-
setErrorHandler
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default aConditionalRejectingErrorHandlerwith its default list of fatal exceptions will be used.- Parameters:
errorHandler- The error handler.
-
setDeBatchingEnabled
public void setDeBatchingEnabled(boolean deBatchingEnabled) Determine whether or not the container should de-batch batched messages (true) or call the listener with the batch (false). Default: true.- Parameters:
deBatchingEnabled- the deBatchingEnabled to set.- See Also:
-
isDeBatchingEnabled
protected boolean isDeBatchingEnabled() -
setAdviceChain
Public setter for theAdviceto apply to listener executions.If a {code #setTransactionManager(PlatformTransactionManager) transactionManager} is provided as well, then separate advice is created for the transaction and applied first in the chain. In that case the advice chain provided here should not contain a transaction interceptor (otherwise two transactions would be be applied).
- Parameters:
adviceChain- the advice chain to set
-
getAdviceChain
-
setAfterReceivePostProcessors
SetMessagePostProcessors that will be applied after message reception, before invoking theMessageListener. Often used to decompress data. Processors are invoked in order, depending onPriorityOrder,Orderand finally unordered.- Parameters:
afterReceivePostProcessors- the post processor.- Since:
- 1.4.2
- See Also:
-
addAfterReceivePostProcessors
AddMessagePostProcessors that will be applied after message reception, before invoking theMessageListener. Often used to decompress data. Processors are invoked in order, depending onPriorityOrder,Orderand finally unordered.In contrast to
setAfterReceivePostProcessors(MessagePostProcessor...), this method does not override the previously added afterReceivePostProcessors.- Parameters:
postprocessors- the post processor.- Since:
- 2.1.4
-
removeAfterReceivePostProcessor
Remove the providedMessagePostProcessorfrom theafterReceivePostProcessorslist.- Parameters:
afterReceivePostProcessor- the MessagePostProcessor to remove.- Returns:
- the boolean if the provided post processor has been removed.
- Since:
- 2.1.4
- See Also:
-
setAutoStartup
public void setAutoStartup(boolean autoStartup) Set whether to automatically start the container after initialization.Default is "true"; set this to "false" to allow for manual startup through the
start()method.- Specified by:
setAutoStartupin interfaceMessageListenerContainer- Parameters:
autoStartup- true for auto startup.
-
isAutoStartup
public boolean isAutoStartup()- Specified by:
isAutoStartupin interfaceSmartLifecycle
-
setPhase
public void setPhase(int phase) Specify the phase in which this container should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE meaning that this container starts as late as possible and stops as soon as possible.- Parameters:
phase- The phase.
-
getPhase
public int getPhase()- Specified by:
getPhasein interfacePhased- Specified by:
getPhasein interfaceSmartLifecycle- Returns:
- The phase in which this container will be started and stopped.
-
setBeanName
- Specified by:
setBeanNamein interfaceBeanNameAware
-
getBeanName
- Returns:
- The bean name that this listener container has been assigned in its containing bean factory, if any.
-
getApplicationContext
-
setApplicationContext
- Specified by:
setApplicationContextin interfaceApplicationContextAware
-
getConnectionFactory
- Overrides:
getConnectionFactoryin classRabbitAccessor- Returns:
- The ConnectionFactory that this accessor uses for obtaining RabbitMQ
Connections.
-
setLookupKeyQualifier
Set a qualifier that will prefix the connection factory lookup key; default none.- Parameters:
lookupKeyQualifier- the qualifier- Since:
- 1.6.9
- See Also:
-
isForceCloseChannel
protected boolean isForceCloseChannel()Force close the channel if the consumer threads don't respond to a shutdown.- Returns:
- true to force close.
- Since:
- 1.7.4
-
setForceCloseChannel
public void setForceCloseChannel(boolean forceCloseChannel) Set to true to force close the channel if the consumer threads don't respond to a shutdown. Default: true (since 2.0).- Parameters:
forceCloseChannel- true to force close.- Since:
- 1.7.4
-
getRoutingLookupKey
Return the lookup key if the connection factory is aRoutingConnectionFactory; null otherwise. The routing key is the comma-delimited list of queue names with all spaces removed and bracketed by [...], optionally prefixed by a qualifier, e.g. "foo[...]".- Returns:
- the key or null.
- Since:
- 1.6.9
- See Also:
-
getRoutingConnectionFactory
Return the (@link RoutingConnectionFactory} if the connection factory is aRoutingConnectionFactory; null otherwise.- Returns:
- the
RoutingConnectionFactoryor null. - Since:
- 1.6.9
-
getListenerId
The 'id' attribute of the listener.- Returns:
- the id (or the container bean name if no id set).
-
setListenerId
Description copied from interface:MessageListenerContainerSet the listener id.- Specified by:
setListenerIdin interfaceMessageListenerContainer- Parameters:
listenerId- the id.
-
setConsumerTagStrategy
Set the implementation ofConsumerTagStrategyto generate consumer tags. By default, the RabbitMQ server generates consumer tags.- Parameters:
consumerTagStrategy- the consumerTagStrategy to set.- Since:
- 1.4.5
-
getConsumerTagStrategy
Return the consumer tag strategy to use.- Returns:
- the strategy.
- Since:
- 2.0
-
setConsumerArguments
Set consumer arguments.- Parameters:
args- the arguments.- Since:
- 1.3
-
getConsumerArguments
Return the consumer arguments.- Returns:
- the arguments.
- Since:
- 2.0
-
setExclusive
public void setExclusive(boolean exclusive) Set to true for an exclusive consumer.- Parameters:
exclusive- true for an exclusive consumer.
-
isExclusive
protected boolean isExclusive()Return whether the consumers should be exclusive.- Returns:
- true for exclusive consumers.
-
setNoLocal
public void setNoLocal(boolean noLocal) Set to true for an no-local consumer.- Parameters:
noLocal- true for an no-local consumer.
-
isNoLocal
protected boolean isNoLocal()Return whether the consumers should be no-local.- Returns:
- true for no-local consumers.
-
setDefaultRequeueRejected
public void setDefaultRequeueRejected(boolean defaultRequeueRejected) Set the default behavior when a message is rejected, for example because the listener threw an exception. When true, messages will be requeued, when false, they will not. For versions of Rabbit that support dead-lettering, the message must not be requeued in order to be sent to the dead letter exchange. Setting to false causes all rejections to not be requeued. When true, the default can be overridden by the listener throwing anAmqpRejectAndDontRequeueException. Default true.- Parameters:
defaultRequeueRejected- true to reject by default.
-
isDefaultRequeueRejected
protected boolean isDefaultRequeueRejected()Return the default requeue rejected.- Returns:
- the boolean.
- Since:
- 2.0
- See Also:
-
setPrefetchCount
public void setPrefetchCount(int prefetchCount) Tell the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput.- Parameters:
prefetchCount- the prefetch count- See Also:
-
Channel.basicQos(int, boolean)
-
getPrefetchCount
protected int getPrefetchCount()Return the prefetch count.- Returns:
- the count.
- Since:
- 2.0
-
setGlobalQos
public void setGlobalQos(boolean globalQos) Apply prefetchCount to the entire channel.- Parameters:
globalQos- true for a channel-wide prefetch.- Since:
- 2.2.17
- See Also:
-
Channel.basicQos(int, boolean)
-
isGlobalQos
protected boolean isGlobalQos() -
setShutdownTimeout
public void setShutdownTimeout(long shutdownTimeout) The time to wait for workers in milliseconds after the container is stopped. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. Defaults to 5 seconds.- Parameters:
shutdownTimeout- the shutdown timeout to set
-
getShutdownTimeout
protected long getShutdownTimeout() -
setIdleEventInterval
public void setIdleEventInterval(long idleEventInterval) How often to emitListenerContainerIdleEvents in milliseconds.- Parameters:
idleEventInterval- the interval.
-
getIdleEventInterval
protected long getIdleEventInterval() -
getLastReceive
protected long getLastReceive()Get the time the last message was received - initialized to container start time.- Returns:
- the time.
-
setTransactionManager
Set the transaction manager to use.- Parameters:
transactionManager- the transaction manager.
-
getTransactionManager
-
setTransactionAttribute
Set the transaction attribute to use when using an external transaction manager.- Parameters:
transactionAttribute- the transaction attribute to set
-
getTransactionAttribute
-
setTaskExecutor
Set a task executor for the container - used to create the consumers not at runtime.- Parameters:
taskExecutor- the task executor.
-
getTaskExecutor
-
setRecoveryInterval
public void setRecoveryInterval(long recoveryInterval) Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.- Parameters:
recoveryInterval- The recovery interval.
-
setRecoveryBackOff
Specify theBackOfffor interval between recovery attempts. The default is 5000 ms, that is, 5 seconds. With theBackOffyou can supply themaxAttemptsfor recovery before thestop()will be performed.- Parameters:
recoveryBackOff- The BackOff to recover.- Since:
- 1.5
-
getRecoveryBackOff
-
setMessagePropertiesConverter
Set theMessagePropertiesConverterfor this listener container.- Parameters:
messagePropertiesConverter- The properties converter.
-
getMessagePropertiesConverter
-
getAmqpAdmin
-
setAmqpAdmin
Set theAmqpAdmin, used to declare any auto-delete queues, bindings etc when the container is started. Only needed if those queues use conditional declaration (have a 'declared-by' attribute). If not specified, an internal admin will be used which will attempt to declare all elements not having a 'declared-by' attribute.- Parameters:
amqpAdmin- the AmqpAdmin to use- Since:
- 2.1
-
setMissingQueuesFatal
public void setMissingQueuesFatal(boolean missingQueuesFatal) If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal. When true, and the queues are missing during startup, the context refresh() will fail.When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.
- Parameters:
missingQueuesFatal- the missingQueuesFatal to set.- Since:
- 1.3.5
- See Also:
-
isMissingQueuesFatal
protected boolean isMissingQueuesFatal() -
isMissingQueuesFatalSet
protected boolean isMissingQueuesFatalSet() -
setMismatchedQueuesFatal
public void setMismatchedQueuesFatal(boolean mismatchedQueuesFatal) Prevent the container from starting if any of the queues defined in the context have mismatched arguments (TTL etc). Default false.- Parameters:
mismatchedQueuesFatal- true to fail initialization when this condition occurs.- Since:
- 1.6
-
isMismatchedQueuesFatal
protected boolean isMismatchedQueuesFatal() -
setPossibleAuthenticationFailureFatal
public void setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal) -
doSetPossibleAuthenticationFailureFatal
protected final void doSetPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal) -
isPossibleAuthenticationFailureFatal
public boolean isPossibleAuthenticationFailureFatal() -
isPossibleAuthenticationFailureFatalSet
protected boolean isPossibleAuthenticationFailureFatalSet() -
isAsyncReplies
protected boolean isAsyncReplies() -
setAutoDeclare
public void setAutoDeclare(boolean autoDeclare) Set to true to automatically declare elements (queues, exchanges, bindings) in the application context during container start().- Parameters:
autoDeclare- the boolean flag to indicate an declaration operation.- Since:
- 1.4
- See Also:
-
isAutoDeclare
protected boolean isAutoDeclare() -
setFailedDeclarationRetryInterval
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval) Set the interval between passive queue declaration attempts in milliseconds.- Parameters:
failedDeclarationRetryInterval- the interval, default 5000.- Since:
- 1.3.9
-
getFailedDeclarationRetryInterval
protected long getFailedDeclarationRetryInterval() -
isStatefulRetryFatalWithNullMessageId
protected boolean isStatefulRetryFatalWithNullMessageId() -
setStatefulRetryFatalWithNullMessageId
public void setStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWithNullMessageId) Set whether a message with a null messageId is fatal for the consumer when using stateful retry. When false, instead of stopping the consumer, the message is rejected and not requeued - it will be discarded or routed to the dead letter queue, if so configured. Default true.- Parameters:
statefulRetryFatalWithNullMessageId- true for fatal.- Since:
- 2.0
-
setExclusiveConsumerExceptionLogger
public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger) Set aConditionalExceptionLoggerfor logging exclusive consumer failures. The default is to log such failures at WARN level.- Parameters:
exclusiveConsumerExceptionLogger- the conditional exception logger.- Since:
- 1.5
-
getExclusiveConsumerExceptionLogger
-
setAlwaysRequeueWithTxManagerRollback
public void setAlwaysRequeueWithTxManagerRollback(boolean alwaysRequeueWithTxManagerRollback) Set to true to always requeue on transaction rollback with an externalTransactionManager. With earlier releases, when a transaction manager was configured, a transaction rollback always requeued the message. This was inconsistent with local transactions where the normaldefaultRequeueRejectedandAmqpRejectAndDontRequeueExceptionlogic was honored to determine whether the message was requeued. RabbitMQ does not consider the message delivery to be part of the transaction. This boolean was introduced in 1.7.1, set to true by default, to be consistent with previous behavior. Starting with version 2.0, it is false by default.- Parameters:
alwaysRequeueWithTxManagerRollback- true to always requeue on rollback.- Since:
- 1.7.1.
-
isAlwaysRequeueWithTxManagerRollback
protected boolean isAlwaysRequeueWithTxManagerRollback() -
setErrorHandlerLoggerName
Set the name (category) of the logger used to log exceptions thrown by the error handler. It defaults to the container's logger but can be overridden if you want it to log at a different level to the container. Such exceptions are logged at the ERROR level.- Parameters:
errorHandlerLoggerName- the logger name.- Since:
- 2.0.8
-
setBatchingStrategy
Set a batching strategy to use when de-batching messages. Default isSimpleBatchingStrategy.- Parameters:
batchingStrategy- the strategy.- Since:
- 2.2
- See Also:
-
getBatchingStrategy
-
getAfterReceivePostProcessors
-
setMicrometerTags
Set additional tags for the Micrometer listener timers.- Parameters:
tags- the tags.- Since:
- 2.2
-
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled) Set to false to disable micrometer listener timers.- Parameters:
micrometerEnabled- false to disable.- Since:
- 2.2
-
getConsumeDelay
protected long getConsumeDelay()Get the consumeDelay - a time to wait before consuming in ms.- Returns:
- the consume delay.
- Since:
- 2.3
-
setConsumeDelay
public void setConsumeDelay(long consumeDelay) Set the consumeDelay - a time to wait before consuming in ms. This is useful when using the sharding plugin withconcurrency > 1, to avoid uneven distribution of consumers across the shards. See the plugin README for more information.- Parameters:
consumeDelay- the consume delay.- Since:
- 2.3
-
getJavaLangErrorHandler
-
setjavaLangErrorHandler
public void setjavaLangErrorHandler(AbstractMessageListenerContainer.JavaLangErrorHandler javaLangErrorHandler) Provide a JavaLangErrorHandler implementation; by default,System.exit(99)is called.- Parameters:
javaLangErrorHandler- the handler.- Since:
- 2.2.12
-
afterPropertiesSet
public void afterPropertiesSet()Delegates tovalidateConfiguration()andinitialize().- Specified by:
afterPropertiesSetin interfaceInitializingBean- Specified by:
afterPropertiesSetin interfaceMessageListenerContainer- Overrides:
afterPropertiesSetin classRabbitAccessor
-
setupMessageListener
Description copied from interface:MessageListenerContainerSetup the message listener to use. Throws anIllegalArgumentExceptionif that message listener type is not supported.- Specified by:
setupMessageListenerin interfaceMessageListenerContainer- Parameters:
messageListener- theobjectto wrapped to theMessageListener.
-
validateConfiguration
protected void validateConfiguration()Validate the configuration of this container.The default implementation is empty. To be overridden in subclasses.
-
initializeProxy
-
destroy
public void destroy()Callsshutdown()when the BeanFactory destroys the container instance.- Specified by:
destroyin interfaceDisposableBean- See Also:
-
initialize
public void initialize()Initialize this container.Creates a Rabbit Connection and calls
doInitialize(). -
shutdown
public void shutdown()Stop the shared Connection, calldoShutdown(), and close this container. -
setNotRunning
protected void setNotRunning() -
doInitialize
protected abstract void doInitialize()Register any invokers within this container.Subclasses need to implement this method for their specific invoker management process.
-
doShutdown
protected abstract void doShutdown()Close the registered invokers.Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
- See Also:
-
isActive
public final boolean isActive()- Returns:
- Whether this container is currently active, that is, whether it has been set up but not shut down yet.
-
start
public void start()Start this container. -
doStart
protected void doStart()Start this container, and notify all invoker tasks. -
stop
public void stop()Stop this container. -
doStop
protected void doStop()This method is invoked when the container is stopping. -
isRunning
public final boolean isRunning()Determine whether this container is currently running, that is, whether it has been started and not stopped yet. -
invokeErrorHandler
Invoke the registered ErrorHandler, if any. Log at error level otherwise. The default error handler is aConditionalRejectingErrorHandlerwith the defaultFatalExceptionStrategyimplementation.- Parameters:
ex- the uncaught error that arose during Rabbit processing.- See Also:
-
executeListener
Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).- Parameters:
channel- the Rabbit Channel to operate ondata- the received Rabbit Message- See Also:
-
invokeListener
-
actualInvokeListener
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.- Parameters:
channel- the Rabbit Channel to operate ondata- the received Rabbit Message or List of Message.- See Also:
-
doInvokeListener
protected void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Object data) Invoke the specified listener as Spring ChannelAwareMessageListener, exposing a new Rabbit Session (potentially with its own transaction) to the listener if demanded. An exception thrown from the listener will be wrapped in aListenerExecutionFailedException.- Parameters:
listener- the Spring ChannelAwareMessageListener to invokechannel- the Rabbit Channel to operate ondata- the received Rabbit Message or List of Message.- See Also:
-
doInvokeListener
Invoke the specified listener as Spring Rabbit MessageListener.Default implementation performs a plain invocation of the
onMessagemethod.Exception thrown from listener will be wrapped to
ListenerExecutionFailedException.- Parameters:
listener- the Rabbit MessageListener to invokedata- the received Rabbit Message or List of Message.- See Also:
-
isChannelLocallyTransacted
protected boolean isChannelLocallyTransacted()Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.Note:This method is about finding out whether the Channel's transaction is local or externally coordinated.
- Returns:
- whether the given Channel is locally transacted
- See Also:
-
handleListenerException
Handle the given exception that arose during listener execution.The default implementation logs the exception at error level, not propagating it to the Rabbit provider - assuming that all handling of acknowledgment and/or transactions is done by this listener container. This can be overridden in subclasses.
- Parameters:
ex- the exception to handle
-
wrapToListenerExecutionFailedExceptionIfNeeded
protected ListenerExecutionFailedException wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Object data) - Parameters:
e- The Exception.data- The failed message.- Returns:
- If 'e' is of type
ListenerExecutionFailedException- return 'e' as it is, otherwise wrap it toListenerExecutionFailedExceptionand return.
-
publishConsumerFailedEvent
-
publishMissingQueueEvent
-
publishIdleContainerEvent
protected final void publishIdleContainerEvent(long idleTime) -
updateLastReceive
protected void updateLastReceive() -
configureAdminIfNeeded
protected void configureAdminIfNeeded() -
checkMismatchedQueues
protected void checkMismatchedQueues() -
lazyLoad
public void lazyLoad()Description copied from interface:MessageListenerContainerDo not check for missing or mismatched queues during startup. Used for lazily loaded message listener containers to avoid a deadlock when starting such containers. Applications lazily loading containers should verify the queue configuration before loading the container bean.- Specified by:
lazyLoadin interfaceMessageListenerContainer
-
redeclareElementsIfNecessary
protected void redeclareElementsIfNecessary()UseAmqpAdmin.initialize()to redeclare everything if necessary. Since auto deletion of a queue can cause upstream elements (bindings, exchanges) to be deleted too, everything needs to be redeclared if a queue is missing. Declaration is idempotent so, aside from some network chatter, there is no issue, and we only will do it if we detect our queue is gone.In general it makes sense only for the 'auto-delete' or 'expired' queues, but with the server TTL policy we don't have ability to determine 'expiration' option for the queue.
Starting with version 1.6, if
mismatchedQueuesFatalis true, the declarations are always attempted during restart so the listener will fail with a fatal error if mismatches occur. -
causeChainHasImmediateAcknowledgeAmqpException
Traverse the cause chain and, if anImmediateAcknowledgeAmqpExceptionis found before anAmqpRejectAndDontRequeueException, return true. AnErrorwill take precedence.- Parameters:
ex- the exception- Returns:
- true if we should ack immediately.
- Since:
- 1.6.6
-
prepareHolderForRollback
protected void prepareHolderForRollback(RabbitResourceHolder resourceHolder, RuntimeException exception) A null resource holder is rare, but possible if the transaction attribute caused no transaction to be started (e.g.TransactionDefinition.PROPAGATION_NONE). In that case the delivery tags will have been processed manually.- Parameters:
resourceHolder- the bound resource holder (if a transaction is active).exception- the exception.
-
debatch
-