Class SimpleMessageListenerContainer
- All Implemented Interfaces:
MessageListenerContainer,Aware,BeanNameAware,DisposableBean,InitializingBean,ApplicationContextAware,ApplicationEventPublisherAware,Lifecycle,Phased,SmartLifecycle
- Since:
- 1.0
- Author:
- Mark Pollack, Mark Fisher, Dave Syer, Gary Russell, Artem Bilan, Alex Panchenko, Mat Jaggard
-
Nested Class Summary
Nested classes/interfaces inherited from class org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
AbstractMessageListenerContainer.JavaLangErrorHandler, AbstractMessageListenerContainer.SharedConnectionNotInitializedException, AbstractMessageListenerContainer.WrappedTransactionException -
Field Summary
FieldsFields inherited from class org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
consumersMonitor, DEFAULT_DEBATCHING_ENABLED, DEFAULT_PREFETCH_COUNT, DEFAULT_RECOVERY_INTERVAL, DEFAULT_SHUTDOWN_TIMEOUTFields inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
loggerFields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE -
Constructor Summary
ConstructorsConstructorDescriptionDefault constructor for convenient dependency injection via setters.SimpleMessageListenerContainer(ConnectionFactory connectionFactory) Create a listener container from the connection factory (mandatory). -
Method Summary
Modifier and TypeMethodDescriptionprotected voidaddAndStartConsumers(int delta) Start up to delta consumers, limited bysetMaxConcurrentConsumers(int).voidaddQueueNames(String... queueName) Add queue(s) to this container's list of queues.voidAdd queue(s) to this container's list of queues.protected voidadjustConsumers(int deltaArg) Adjust consumers depending on delta.protected BlockingQueueConsumerprotected voidRegister any invokers within this container.protected voidClose the registered invokers.protected voiddoStart()Re-initializes this container's Rabbit message consumers, if not initialized already.intprotected voidhandleStartupFailure(BackOffExecution backOffExecution) protected intbooleanReturn true if this container is capable of (and configured to) create batches of consumed messages.protected voidpublishConsumerFailedEvent(String reason, boolean fatal, Throwable t) booleanremoveQueueNames(String... queueName) Remove queues from this container's list of queues.booleanremoveQueues(Queue... queue) Remove queues from this container's list of queues.voidsetBatchSize(int batchSize) This property has several functions.voidsetConcurrency(String concurrency) Specify concurrency limits via a "lower-upper" String, e.g.voidsetConcurrentConsumers(int concurrentConsumers) Specify the number of concurrent consumers to create.final voidsetConsecutiveActiveTrigger(int consecutiveActiveTrigger) IfmaxConcurrentConsumersis greater thenconcurrentConsumers, andmaxConcurrentConsumershas not been reached, specifies the number of consecutive cycles when a single consumer was active, in order to consider starting a new consumer.final voidsetConsecutiveIdleTrigger(int consecutiveIdleTrigger) IfmaxConcurrentConsumersis greater thenconcurrentConsumers, and the number of consumers exceedsconcurrentConsumers, specifies the number of consecutive receive attempts that return no data; after which we consider stopping a consumer.voidsetConsumerBatchEnabled(boolean consumerBatchEnabled) Set to true to present a list of messages based on thesetBatchSize(int), if the listener supports it.voidsetConsumerStartTimeout(long consumerStartTimeout) When starting a consumer, if this time (ms) elapses before the consumer starts, an error log is written; one possible cause would be if thetaskExecutorhas insufficient threads to support the container concurrency.voidsetDeclarationRetries(int declarationRetries) Set the number of retries after passive queue declaration fails.final voidsetExclusive(boolean exclusive) Set to true for an exclusive consumer - if true, the concurrency must be 1.voidsetMaxConcurrentConsumers(int maxConcurrentConsumers) Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'.voidsetMissingQueuesFatal(boolean missingQueuesFatal) If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal.voidsetQueueNames(String... queueName) Set the name of the queue(s) to receive messages from.voidsetReceiveTimeout(long receiveTimeout) The time (in milliseconds) that a consumer should wait for data.voidsetRetryDeclarationInterval(long retryDeclarationInterval) When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).final voidsetStartConsumerMinInterval(long startConsumerMinInterval) IfmaxConcurrentConsumersis greater thenconcurrentConsumers, andmaxConcurrentConsumershas not been reached, specifies the minimum time (milliseconds) between starting new consumers on demand.final voidsetStopConsumerMinInterval(long stopConsumerMinInterval) IfmaxConcurrentConsumersis greater thenconcurrentConsumers, and the number of consumers exceedsconcurrentConsumers, specifies the minimum time (milliseconds) between stopping idle consumers.protected final booleanAlways use a shared Rabbit Connection.voidtoString()protected voidAvoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.Methods inherited from class org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
actualInvokeListener, addAfterReceivePostProcessors, afterPropertiesSet, causeChainHasImmediateAcknowledgeAmqpException, checkMessageListener, checkMismatchedQueues, configureAdminIfNeeded, debatch, destroy, doInvokeListener, doInvokeListener, doSetPossibleAuthenticationFailureFatal, doStop, executeListener, getAcknowledgeMode, getAdviceChain, getAfterReceivePostProcessors, getAmqpAdmin, getApplicationContext, getApplicationEventPublisher, getBatchingStrategy, getBeanName, getConnectionFactory, getConsumeDelay, getConsumerArguments, getConsumerTagStrategy, getExclusiveConsumerExceptionLogger, getFailedDeclarationRetryInterval, getIdleEventInterval, getJavaLangErrorHandler, getLastReceive, getListenerId, getMessageListener, getMessagePropertiesConverter, getPhase, getPrefetchCount, getQueueNames, getQueueNamesAsSet, getQueueNamesToQueues, getRecoveryBackOff, getRoutingConnectionFactory, getRoutingLookupKey, getShutdownTimeout, getTaskExecutor, getTransactionAttribute, getTransactionManager, handleListenerException, initialize, initializeProxy, invokeErrorHandler, invokeListener, isActive, isAlwaysRequeueWithTxManagerRollback, isAsyncReplies, isAutoDeclare, isAutoStartup, isChannelLocallyTransacted, isDeBatchingEnabled, isDefaultRequeueRejected, isExclusive, isExposeListenerChannel, isForceCloseChannel, isGlobalQos, isMismatchedQueuesFatal, isMissingQueuesFatal, isMissingQueuesFatalSet, isNoLocal, isPossibleAuthenticationFailureFatal, isPossibleAuthenticationFailureFatalSet, isRunning, isStatefulRetryFatalWithNullMessageId, lazyLoad, prepareHolderForRollback, publishIdleContainerEvent, publishMissingQueueEvent, redeclareElementsIfNecessary, removeAfterReceivePostProcessor, setAcknowledgeMode, setAdviceChain, setAfterReceivePostProcessors, setAlwaysRequeueWithTxManagerRollback, setAmqpAdmin, setApplicationContext, setApplicationEventPublisher, setAutoDeclare, setAutoStartup, setBatchingStrategy, setBeanName, setConsumeDelay, setConsumerArguments, setConsumerTagStrategy, setDeBatchingEnabled, setDefaultRequeueRejected, setErrorHandler, setErrorHandlerLoggerName, setExclusiveConsumerExceptionLogger, setExposeListenerChannel, setFailedDeclarationRetryInterval, setForceCloseChannel, setGlobalQos, setIdleEventInterval, setjavaLangErrorHandler, setListenerId, setLookupKeyQualifier, setMessageListener, setMessagePropertiesConverter, setMicrometerEnabled, setMicrometerTags, setMismatchedQueuesFatal, setNoLocal, setNotRunning, setPhase, setPossibleAuthenticationFailureFatal, setPrefetchCount, setQueues, setRecoveryBackOff, setRecoveryInterval, setShutdownTimeout, setStatefulRetryFatalWithNullMessageId, setTaskExecutor, setTransactionAttribute, setTransactionManager, setupMessageListener, shutdown, start, stop, updateLastReceive, wrapToListenerExecutionFailedExceptionIfNeededMethods inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
-
Field Details
-
DEFAULT_RECEIVE_TIMEOUT
public static final long DEFAULT_RECEIVE_TIMEOUT- See Also:
-
-
Constructor Details
-
SimpleMessageListenerContainer
public SimpleMessageListenerContainer()Default constructor for convenient dependency injection via setters. -
SimpleMessageListenerContainer
Create a listener container from the connection factory (mandatory).- Parameters:
connectionFactory- theConnectionFactory
-
-
Method Details
-
setConcurrentConsumers
public void setConcurrentConsumers(int concurrentConsumers) Specify the number of concurrent consumers to create. Default is 1.Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues. Cannot be more than
maxConcurrentConsumers(if set).- Parameters:
concurrentConsumers- the minimum number of consumers to create.- See Also:
-
setMaxConcurrentConsumers
public void setMaxConcurrentConsumers(int maxConcurrentConsumers) Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'. Consumers will be added on demand. Cannot be less thanconcurrentConsumers.- Parameters:
maxConcurrentConsumers- the maximum number of consumers.- See Also:
-
setConcurrency
Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple upper limit String, e.g. "10" (a fixed number of consumers).This listener container will always hold on to the minimum number of consumers (
setConcurrentConsumers(int)) and will slowly scale up to the maximum number of consumerssetMaxConcurrentConsumers(int)in case of increasing load.- Parameters:
concurrency- the concurrency.- Since:
- 2.0
-
setExclusive
public final void setExclusive(boolean exclusive) Set to true for an exclusive consumer - if true, the concurrency must be 1.- Overrides:
setExclusivein classAbstractMessageListenerContainer- Parameters:
exclusive- true for an exclusive consumer.
-
setStartConsumerMinInterval
public final void setStartConsumerMinInterval(long startConsumerMinInterval) IfmaxConcurrentConsumersis greater thenconcurrentConsumers, andmaxConcurrentConsumershas not been reached, specifies the minimum time (milliseconds) between starting new consumers on demand. Default is 10000 (10 seconds).- Parameters:
startConsumerMinInterval- The minimum interval between new consumer starts.- See Also:
-
setStopConsumerMinInterval
public final void setStopConsumerMinInterval(long stopConsumerMinInterval) IfmaxConcurrentConsumersis greater thenconcurrentConsumers, and the number of consumers exceedsconcurrentConsumers, specifies the minimum time (milliseconds) between stopping idle consumers. Default is 60000 (1 minute).- Parameters:
stopConsumerMinInterval- The minimum interval between consumer stops.- See Also:
-
setConsecutiveActiveTrigger
public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger) IfmaxConcurrentConsumersis greater thenconcurrentConsumers, andmaxConcurrentConsumershas not been reached, specifies the number of consecutive cycles when a single consumer was active, in order to consider starting a new consumer. If the consumer goes idle for one cycle, the counter is reset. This is impacted by thebatchSize. Default is 10 consecutive messages.- Parameters:
consecutiveActiveTrigger- The number of consecutive receives to trigger a new consumer.- See Also:
-
setConsecutiveIdleTrigger
public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger) IfmaxConcurrentConsumersis greater thenconcurrentConsumers, and the number of consumers exceedsconcurrentConsumers, specifies the number of consecutive receive attempts that return no data; after which we consider stopping a consumer. The idle time is effectivelyreceiveTimeout*batchSize* this value because the consumer thread waits for a message for up toreceiveTimeoutup tobatchSizetimes. Default is 10 consecutive idles.- Parameters:
consecutiveIdleTrigger- The number of consecutive timeouts to trigger stopping a consumer.- See Also:
-
setReceiveTimeout
public void setReceiveTimeout(long receiveTimeout) The time (in milliseconds) that a consumer should wait for data. Default 1000 (1 second).- Parameters:
receiveTimeout- the timeout.- See Also:
-
setBatchSize
public void setBatchSize(int batchSize) This property has several functions.When the channel is transacted, it determines how many messages to process in a single transaction. It should be less than or equal to
the prefetch count.It also affects how often acks are sent when using
AcknowledgeMode.AUTO- one ack per BatchSize.Finally, when
setConsumerBatchEnabled(boolean)is true, it determines how many records to include in the batch as long as sufficient messages arrive withinsetReceiveTimeout(long).IMPORTANT The batch size represents the number of physical messages received. If
AbstractMessageListenerContainer.setDeBatchingEnabled(boolean)is true and a message is a batch created by a producer, the actual number of messages received by the listener will be larger than this batch size.Default is 1.
- Parameters:
batchSize- the batch size- Since:
- 2.2
- See Also:
-
setConsumerBatchEnabled
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) Set to true to present a list of messages based on thesetBatchSize(int), if the listener supports it. This will coercedeBatchingEnabledto true as well.- Parameters:
consumerBatchEnabled- true to create message batches in the container.- Since:
- 2.2
- See Also:
-
isConsumerBatchEnabled
public boolean isConsumerBatchEnabled()Description copied from interface:MessageListenerContainerReturn true if this container is capable of (and configured to) create batches of consumed messages.- Returns:
- true if enabled.
-
setMissingQueuesFatal
public void setMissingQueuesFatal(boolean missingQueuesFatal) If all of the configured queue(s) are not available on the broker, this setting determines whether the condition is fatal. When true, and the queues are missing during startup, the context refresh() will fail.When false, the condition is not considered fatal and the container will continue to attempt to start the consumers.
When true, if the queues are removed while the container is running, the container is stopped.
Defaults to true for this container.
- Overrides:
setMissingQueuesFatalin classAbstractMessageListenerContainer- Parameters:
missingQueuesFatal- the missingQueuesFatal to set.- See Also:
-
setQueueNames
Description copied from class:AbstractMessageListenerContainerSet the name of the queue(s) to receive messages from.- Specified by:
setQueueNamesin interfaceMessageListenerContainer- Overrides:
setQueueNamesin classAbstractMessageListenerContainer- Parameters:
queueName- the desired queueName(s) (can not benull)
-
addQueueNames
Add queue(s) to this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. The queue must exist to avoid problems when restarting the consumers.- Overrides:
addQueueNamesin classAbstractMessageListenerContainer- Parameters:
queueName- The queue to add.
-
removeQueueNames
Remove queues from this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. At least one queue must remain.- Overrides:
removeQueueNamesin classAbstractMessageListenerContainer- Parameters:
queueName- The queue to remove.- Returns:
- the boolean result of removal on the target
queueNamesList.
-
addQueues
Add queue(s) to this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. The queue must exist to avoid problems when restarting the consumers.- Overrides:
addQueuesin classAbstractMessageListenerContainer- Parameters:
queue- The queue to add.
-
removeQueues
Remove queues from this container's list of queues. The existing consumers will be cancelled after they have processed any pre-fetched messages and new consumers will be created. At least one queue must remain.- Overrides:
removeQueuesin classAbstractMessageListenerContainer- Parameters:
queue- The queue to remove.- Returns:
- the boolean result of removal on the target
queueNamesList.
-
setDeclarationRetries
public void setDeclarationRetries(int declarationRetries) Set the number of retries after passive queue declaration fails.- Parameters:
declarationRetries- The number of retries, default 3.- Since:
- 1.3.9
- See Also:
-
setRetryDeclarationInterval
public void setRetryDeclarationInterval(long retryDeclarationInterval) When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).- Parameters:
retryDeclarationInterval- the interval, default 60000.- Since:
- 1.3.9
-
setConsumerStartTimeout
public void setConsumerStartTimeout(long consumerStartTimeout) When starting a consumer, if this time (ms) elapses before the consumer starts, an error log is written; one possible cause would be if thetaskExecutorhas insufficient threads to support the container concurrency. Default 60000.- Parameters:
consumerStartTimeout- the timeout.- Since:
- 1.7.5
-
validateConfiguration
protected void validateConfiguration()Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.- Overrides:
validateConfigurationin classAbstractMessageListenerContainer
-
doInitialize
protected void doInitialize()Description copied from class:AbstractMessageListenerContainerRegister any invokers within this container.Subclasses need to implement this method for their specific invoker management process.
- Specified by:
doInitializein classAbstractMessageListenerContainer
-
getActiveConsumerCount
-
doStart
protected void doStart()Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer to this container's task executor.- Overrides:
doStartin classAbstractMessageListenerContainer
-
doShutdown
protected void doShutdown()Description copied from class:AbstractMessageListenerContainerClose the registered invokers.Subclasses need to implement this method for their specific invoker management process.
A shared Rabbit Connection, if any, will automatically be closed afterwards.
- Specified by:
doShutdownin classAbstractMessageListenerContainer- See Also:
-
stop
-
initializeConsumers
protected int initializeConsumers() -
adjustConsumers
protected void adjustConsumers(int deltaArg) Adjust consumers depending on delta.- Parameters:
deltaArg- a negative value increases, positive decreases.- Since:
- 1.7.8
-
addAndStartConsumers
protected void addAndStartConsumers(int delta) Start up to delta consumers, limited bysetMaxConcurrentConsumers(int).- Parameters:
delta- the consumers to add.
-
createBlockingQueueConsumer
-
handleStartupFailure
-
publishConsumerFailedEvent
- Overrides:
publishConsumerFailedEventin classAbstractMessageListenerContainer
-
toString
-