public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
| Modifier and Type | Class and Description |
|---|---|
| static interface | SimpleMessageListenerContainer.ContainerDelegate |

Nested classes/interfaces inherited from class AbstractMessageListenerContainer: AbstractMessageListenerContainer.SharedConnectionNotInitializedException

| Modifier and Type | Field and Description |
|---|---|
| static int | DEFAULT_PREFETCH_COUNT |
| static long | DEFAULT_RECEIVE_TIMEOUT |
| static long | DEFAULT_RECOVERY_INTERVAL: The default recovery interval: 5000 ms = 5 seconds. |
| static long | DEFAULT_SHUTDOWN_TIMEOUT |

Inherited fields: DEFAULT_DEBATCHING_ENABLED, logger

| Constructor and Description |
|---|
| SimpleMessageListenerContainer(): Default constructor for convenient dependency injection via setters. |
| SimpleMessageListenerContainer(ConnectionFactory connectionFactory): Create a listener container from the connection factory (mandatory). |

| Modifier and Type | Method and Description |
|---|---|
| protected void | addAndStartConsumers(int delta) |
| void | addQueueNames(String... queueName): Add queue(s) to this container's list of queues. |
| void | addQueues(Queue... queue): Add queue(s) to this container's list of queues. |
| protected BlockingQueueConsumer | createBlockingQueueConsumer() |
| protected void | doInitialize(): Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated MessageConsumer. |
| protected void | doShutdown(): Close the registered invokers. |
| protected void | doStart(): Re-initializes this container's Rabbit message consumers, if not initialized already. |
| protected void | doStop(): This method is invoked when the container is stopping. |
| int | getActiveConsumerCount() |
| protected RabbitAdmin | getRabbitAdmin() |
| protected void | handleStartupFailure(BackOffExecution backOffExecution): Wait for a period determined by the recoveryInterval or setRecoveryBackOff(BackOff) to give the container a chance to recover from consumer startup failure, e.g. if the broker is down. |
| protected int | initializeConsumers() |
| protected void | invokeListener(com.rabbitmq.client.Channel channel, Message message): Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener. |
| protected boolean | isChannelLocallyTransacted(com.rabbitmq.client.Channel channel): Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator. |
| boolean | removeQueueNames(String... queueName): Remove queue(s) from this container's list of queues. |
| boolean | removeQueues(Queue... queue): Remove queue(s) from this container's list of queues. |
| void | setAdviceChain(org.aopalliance.aop.Advice[] adviceChain): Public setter for the Advice to apply to listener executions. |
| void | setAutoDeclare(boolean autoDeclare) |
| void | setConcurrentConsumers(int concurrentConsumers): Specify the number of concurrent consumers to create. |
| void | setConsecutiveActiveTrigger(int consecutiveActiveTrigger): If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the number of consecutive cycles when a single consumer was active, in order to consider starting a new consumer. |
| void | setConsecutiveIdleTrigger(int consecutiveIdleTrigger): If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the number of consecutive receive attempts that return no data, after which we consider stopping a consumer. |
| void | setConsumerArguments(Map<String,Object> args) |
| void | setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy): Set the implementation of ConsumerTagStrategy to generate consumer tags. |
| void | setDeclarationRetries(int declarationRetries): Set the number of retries after passive queue declaration fails. |
| void | setDefaultRequeueRejected(boolean defaultRequeueRejected): Determines the default behavior when a message is rejected, for example because the listener threw an exception. |
| void | setExclusive(boolean exclusive): Set to true for an exclusive consumer - if true, the concurrency must be 1. |
| void | setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval): Set the interval between passive queue declaration attempts in milliseconds. |
| void | setMaxConcurrentConsumers(int maxConcurrentConsumers): Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'. |
| void | setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter): Set the MessagePropertiesConverter for this listener container. |
| void | setMissingQueuesFatal(boolean missingQueuesFatal): If none of the configured queue(s) is available on the broker, this setting determines whether the condition is fatal (default true). |
| void | setPrefetchCount(int prefetchCount): Tells the broker how many messages to send to each consumer in a single request. |
| void | setQueueNames(String... queueName): Set the name of the queue(s) to receive messages from. |
| void | setQueues(Queue... queues): Set the name of the queue(s) to receive messages from. |
| void | setRabbitAdmin(RabbitAdmin rabbitAdmin): Set the RabbitAdmin, used to declare any auto-delete queues, bindings, etc. when the container is started. |
| void | setReceiveTimeout(long receiveTimeout): The time (in milliseconds) that a consumer should wait for data. |
| void | setRecoveryBackOff(BackOff recoveryBackOff): Specify the BackOff for the interval between recovery attempts. |
| void | setRecoveryInterval(long recoveryInterval): Specify the interval between recovery attempts, in milliseconds. |
| void | setRetryDeclarationInterval(long retryDeclarationInterval): When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues was available (milliseconds). |
| void | setShutdownTimeout(long shutdownTimeout): The time to wait for workers in milliseconds after the container is stopped, and before the connection is forced closed. |
| void | setStartConsumerMinInterval(long startConsumerMinInterval): If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the minimum time (milliseconds) between starting new consumers on demand. |
| void | setStopConsumerMinInterval(long stopConsumerMinInterval): If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the minimum time (milliseconds) between stopping idle consumers. |
| void | setTaskExecutor(Executor taskExecutor) |
| void | setTransactionAttribute(TransactionAttribute transactionAttribute) |
| void | setTransactionManager(PlatformTransactionManager transactionManager) |
| void | setTxSize(int txSize): Tells the container how many messages to process in a single transaction (if the channel is transactional). |
| protected boolean | sharedConnectionEnabled(): Always use a shared Rabbit Connection. |
| protected void | validateConfiguration(): Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers. |
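
The on-demand scaling setters summarized in the table above (setMaxConcurrentConsumers, setConsecutiveActiveTrigger, setStartConsumerMinInterval) can be pictured with a simplified model. The sketch below is not Spring AMQP code: the class ConsumerScalingSketch and its methods are hypothetical, time is passed in explicitly, and the exact comparisons used by the real container may differ. It only illustrates the documented idea that a new consumer is considered after a run of consecutive active cycles, that an idle cycle resets the counter, and that starts are spaced by a minimum interval and capped by the maximum.

```java
/** Hypothetical, simplified model of on-demand consumer starts; not Spring AMQP code. */
final class ConsumerScalingSketch {

    private final int maxConcurrentConsumers;
    private final int consecutiveActiveTrigger;
    private final long startConsumerMinIntervalMillis;

    private int consumers;
    private int consecutiveActive;
    private long lastStartMillis;

    ConsumerScalingSketch(int concurrentConsumers, int maxConcurrentConsumers,
            int consecutiveActiveTrigger, long startConsumerMinIntervalMillis, long nowMillis) {
        if (concurrentConsumers < 1 || maxConcurrentConsumers < concurrentConsumers) {
            throw new IllegalArgumentException("need 1 <= concurrentConsumers <= maxConcurrentConsumers");
        }
        this.maxConcurrentConsumers = maxConcurrentConsumers;
        this.consecutiveActiveTrigger = consecutiveActiveTrigger;
        this.startConsumerMinIntervalMillis = startConsumerMinIntervalMillis;
        this.consumers = concurrentConsumers;
        this.lastStartMillis = nowMillis;
    }

    /** Records one receive cycle and returns true if this cycle caused a consumer to be added. */
    boolean onCycle(boolean receivedMessage, long nowMillis) {
        if (!receivedMessage) {
            // An idle cycle resets the run of consecutive active cycles.
            consecutiveActive = 0;
            return false;
        }
        consecutiveActive++;
        if (consecutiveActive < consecutiveActiveTrigger) {
            return false;
        }
        // Trigger reached: start counting again, then check the cap and the minimum interval.
        consecutiveActive = 0;
        boolean belowMax = consumers < maxConcurrentConsumers;
        boolean intervalElapsed = nowMillis - lastStartMillis >= startConsumerMinIntervalMillis;
        if (belowMax && intervalElapsed) {
            consumers++;
            lastStartMillis = nowMillis;
            return true;
        }
        return false;
    }

    int consumers() {
        return consumers;
    }

    public static void main(String[] args) {
        // 1 initial consumer, at most 3, trigger of 10 cycles, 10000 ms between starts.
        ConsumerScalingSketch model = new ConsumerScalingSketch(1, 3, 10, 10_000L, 0L);
        for (int i = 0; i < 10; i++) {
            model.onCycle(true, 20_000L);
        }
        System.out.println("consumers after 10 active cycles: " + model.consumers());
    }
}
```

In this model a busy consumer that reaches the trigger too soon after the previous start simply resets its counter and has to accumulate another full run of active cycles before a start is considered again.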

Methods inherited from class AbstractMessageListenerContainer: afterPropertiesSet, checkMessageListener, destroy, doInvokeListener, doInvokeListener, executeListener, getAcknowledgeMode, getApplicationContext, getBeanName, getConnectionFactory, getMessageConverter, getMessageListener, getPhase, getQueueNames, getQueueNamesAsSet, getRequiredQueueNames, handleListenerException, initialize, invokeErrorHandler, isActive, isAutoStartup, isExposeListenerChannel, isRunning, setAcknowledgeMode, setAfterReceivePostProcessors, setApplicationContext, setAutoStartup, setBeanName, setDeBatchingEnabled, setErrorHandler, setExposeListenerChannel, setMessageConverter, setMessageListener, setPhase, setupMessageListener, shutdown, start, stop, stop, wrapToListenerExecutionFailedExceptionIfNeeded

Methods inherited from class RabbitAccessor: convertRabbitAccessException, createConnection, getChannel, getConnection, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory

public static final long DEFAULT_RECEIVE_TIMEOUT
public static final int DEFAULT_PREFETCH_COUNT
public static final long DEFAULT_SHUTDOWN_TIMEOUT
public static final long DEFAULT_RECOVERY_INTERVAL
public SimpleMessageListenerContainer()
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
Parameters:
connectionFactory - the ConnectionFactory

public void setAdviceChain(org.aopalliance.aop.Advice[] adviceChain)
Public setter for the Advice to apply to listener executions. If txSize>1 then multiple listener executions will all be wrapped in the same advice up to that limit.
If a transactionManager is provided as well, then separate advice is created for the transaction and applied first in the chain. In that case the advice chain provided here should not contain a transaction interceptor (otherwise two transactions would be applied).
Parameters:
adviceChain - the advice chain to set

public void setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds.
Parameters:
recoveryInterval - The recovery interval.

public void setRecoveryBackOff(BackOff recoveryBackOff)
Specify the BackOff for the interval between recovery attempts. The default is 5000 ms, that is, 5 seconds. With the BackOff you can supply the maxAttempts for recovery before AbstractMessageListenerContainer.stop() is performed.
Parameters:
recoveryBackOff - The BackOff to recover.

public void setConcurrentConsumers(int concurrentConsumers)
Specify the number of concurrent consumers to create.
Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues. Cannot be more than maxConcurrentConsumers (if set).
Parameters:
concurrentConsumers - the minimum number of consumers to create.
See Also:
setMaxConcurrentConsumers(int)

public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'. Cannot be less than concurrentConsumers.
Parameters:
maxConcurrentConsumers - the maximum number of consumers.
See Also:
setConcurrentConsumers(int), setStartConsumerMinInterval(long), setStopConsumerMinInterval(long), setConsecutiveActiveTrigger(int), setConsecutiveIdleTrigger(int)

public final void setExclusive(boolean exclusive)
Set to true for an exclusive consumer - if true, the concurrency must be 1.
Parameters:
exclusive - true for an exclusive consumer.

public final void setStartConsumerMinInterval(long startConsumerMinInterval)
If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the minimum time (milliseconds) between starting new consumers on demand. Default is 10000 (10 seconds).
Parameters:
startConsumerMinInterval - The minimum interval between new consumer starts.
See Also:
setMaxConcurrentConsumers(int), setStartConsumerMinInterval(long)

public final void setStopConsumerMinInterval(long stopConsumerMinInterval)
If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the minimum time (milliseconds) between stopping idle consumers. Default is 60000 (1 minute).
Parameters:
stopConsumerMinInterval - The minimum interval between consumer stops.
See Also:
setMaxConcurrentConsumers(int), setStopConsumerMinInterval(long)

public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger)
If maxConcurrentConsumers is greater than concurrentConsumers, and maxConcurrentConsumers has not been reached, specifies the number of consecutive cycles when a single consumer was active, in order to consider starting a new consumer. If the consumer goes idle for one cycle, the counter is reset. This is impacted by the txSize. Default is 10 consecutive messages.
Parameters:
consecutiveActiveTrigger - The number of consecutive receives to trigger a new consumer.
See Also:
setMaxConcurrentConsumers(int), setStartConsumerMinInterval(long), setTxSize(int)

public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger)
If maxConcurrentConsumers is greater than concurrentConsumers, and the number of consumers exceeds concurrentConsumers, specifies the number of consecutive receive attempts that return no data, after which we consider stopping a consumer. The idle time is effectively receiveTimeout * txSize * this value because the consumer thread waits for a message for up to receiveTimeout up to txSize times. Default is 10 consecutive idles.
Parameters:
consecutiveIdleTrigger - The number of consecutive timeouts to trigger stopping a consumer.
See Also:
setMaxConcurrentConsumers(int), setStopConsumerMinInterval(long), setReceiveTimeout(long), setTxSize(int)

public void setReceiveTimeout(long receiveTimeout)
The time (in milliseconds) that a consumer should wait for data.
Parameters:
receiveTimeout - the timeout.
See Also:
setConsecutiveIdleTrigger(int)

public void setShutdownTimeout(long shutdownTimeout)
The time to wait for workers in milliseconds after the container is stopped, and before the connection is forced closed.
Parameters:
shutdownTimeout - the shutdown timeout to set

public void setTaskExecutor(Executor taskExecutor)
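
As a worked example of the idle-time formula given for setConsecutiveIdleTrigger(int) (receiveTimeout * txSize * consecutiveIdleTrigger), the following plain-Java sketch computes how long a surplus consumer has to stay idle before it is considered for stopping. The class IdleTimeSketch and its method are hypothetical and not part of Spring AMQP. The receiveTimeout of 1000 ms used in main is an assumed value for illustration, since this page does not state the default; the txSize of 1 and the trigger of 10 are the documented defaults.

```java
import java.time.Duration;

/** Hypothetical helper illustrating the documented idle-time formula; not part of Spring AMQP. */
final class IdleTimeSketch {

    private IdleTimeSketch() {
    }

    /** Returns receiveTimeout * txSize * consecutiveIdleTrigger as a Duration. */
    static Duration effectiveIdleTime(long receiveTimeoutMillis, int txSize, int consecutiveIdleTrigger) {
        if (receiveTimeoutMillis < 0 || txSize < 1 || consecutiveIdleTrigger < 1) {
            throw new IllegalArgumentException("timeout must be >= 0; txSize and trigger must be >= 1");
        }
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        long perCycle = Math.multiplyExact(receiveTimeoutMillis, (long) txSize);
        return Duration.ofMillis(Math.multiplyExact(perCycle, (long) consecutiveIdleTrigger));
    }

    public static void main(String[] args) {
        // Assumed receiveTimeout of 1000 ms with the documented defaults txSize = 1, trigger = 10.
        System.out.println(effectiveIdleTime(1000L, 1, 10).toMillis() + " ms");
        // Raising txSize to 5 lengthens each cycle, and therefore the effective idle time.
        System.out.println(effectiveIdleTime(1000L, 5, 10).toMillis() + " ms");
    }
}
```

Note that reaching this idle time only makes a consumer a candidate for stopping; stops are additionally spaced by setStopConsumerMinInterval(long).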
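
public void setPrefetchCount(int prefetchCount)
Tells the broker how many messages to send to each consumer in a single request. It should be greater than or equal to the transaction size (see setTxSize(int)).
Parameters:
prefetchCount - the prefetch count

public void setTxSize(int txSize)
Tells the container how many messages to process in a single transaction (if the channel is transactional). For best results it should be less than or equal to the prefetch count (see setPrefetchCount(int)). Also affects how often acks are sent when using AcknowledgeMode.AUTO - one ack per txSize. Default is 1.
Parameters:
txSize - the transaction size

public void setTransactionManager(PlatformTransactionManager transactionManager)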

public void setTransactionAttribute(TransactionAttribute transactionAttribute)
Parameters:
transactionAttribute - the transaction attribute to set

public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set the MessagePropertiesConverter for this listener container.
Parameters:
messagePropertiesConverter - The properties converter.

public void setDefaultRequeueRejected(boolean defaultRequeueRejected)
Determines the default behavior when a message is rejected, for example because the listener threw an exception. When true, the default can be overridden by the listener throwing an AmqpRejectAndDontRequeueException. Default true.
Parameters:
defaultRequeueRejected - true to requeue rejected messages by default.

protected RabbitAdmin getRabbitAdmin()
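
public void setRabbitAdmin(RabbitAdmin rabbitAdmin)
Set the RabbitAdmin, used to declare any auto-delete queues, bindings, etc. when the container is started. Only needed if those queues use conditional declaration (have a 'declared-by' attribute). If not specified, an internal admin will be used which will attempt to declare all elements not having a 'declared-by' attribute.
Parameters:
rabbitAdmin - The admin.

public void setMissingQueuesFatal(boolean missingQueuesFatal)
If none of the configured queue(s) is available on the broker, this setting determines whether the condition is fatal (default true).
When false, the condition is not considered fatal and the container will continue to attempt to start the consumers according to the setRecoveryInterval(long). Note that each consumer will make 3 attempts (at 5 second intervals) on each recovery attempt.
Parameters:
missingQueuesFatal - the missingQueuesFatal to set.

public void setQueueNames(String... queueName)
Description copied from class: AbstractMessageListenerContainer
Set the name of the queue(s) to receive messages from.
Overrides:
setQueueNames in class AbstractMessageListenerContainer
Parameters:
queueName - the desired queueName(s) (can not be null)

public void setQueues(Queue... queues)
Description copied from class: AbstractMessageListenerContainer
Set the name of the queue(s) to receive messages from.
Overrides:
setQueues in class AbstractMessageListenerContainer
Parameters:
queues - the desired queue(s) (can not be null)

public void setAutoDeclare(boolean autoDeclare)
Parameters:
autoDeclare - the boolean flag to indicate a redeclaration operation.
See Also:
redeclareElementsIfNecessary()

public void addQueueNames(String... queueName)
Add queue(s) to this container's list of queues.
Overrides:
addQueueNames in class AbstractMessageListenerContainer
Parameters:
queueName - The queue to add.

public void addQueues(Queue... queue)
Add queue(s) to this container's list of queues.
Overrides:
addQueues in class AbstractMessageListenerContainer
Parameters:
queue - The queue to add.

public boolean removeQueueNames(String... queueName)
Remove queue(s) from this container's list of queues.
Overrides:
removeQueueNames in class AbstractMessageListenerContainer
Parameters:
queueName - The queue to remove.
Returns:
the result of the removal on the queueNames List.

public boolean removeQueues(Queue... queue)
Remove queue(s) from this container's list of queues.
Overrides:
removeQueues in class AbstractMessageListenerContainer
Parameters:
queue - The queue to remove.
Returns:
the result of the removal on the queueNames List.

public void setDeclarationRetries(int declarationRetries)
Set the number of retries after passive queue declaration fails.
Parameters:
declarationRetries - The number of retries, default 3.
See Also:
setFailedDeclarationRetryInterval(long)

public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.
Parameters:
failedDeclarationRetryInterval - the interval, default 5000.
See Also:
setDeclarationRetries(int)

public void setRetryDeclarationInterval(long retryDeclarationInterval)
When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues was available (milliseconds).
Parameters:
retryDeclarationInterval - the interval, default 60000.

public void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy)
Set the implementation of ConsumerTagStrategy to generate consumer tags. By default, the RabbitMQ server generates consumer tags.
Parameters:
consumerTagStrategy - the consumerTagStrategy to set.

protected void validateConfiguration()
Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.
Overrides:
validateConfiguration in class AbstractMessageListenerContainer

protected final boolean sharedConnectionEnabled()
Always use a shared Rabbit Connection.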
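
protected void doInitialize() throws Exception
Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated MessageConsumer.
Specified by:
doInitialize in class AbstractMessageListenerContainer
Throws:
Exception - Any Exception.

@ManagedMetric(metricType=GAUGE)
public int getActiveConsumerCount()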
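
protected void doStart() throws Exception
Re-initializes this container's Rabbit message consumers, if not initialized already.
Overrides:
doStart in class AbstractMessageListenerContainer
Throws:
Exception - Any Exception.

protected void doStop()
Description copied from class: AbstractMessageListenerContainer
This method is invoked when the container is stopping.
Overrides:
doStop in class AbstractMessageListenerContainer

protected void doShutdown()
Description copied from class: AbstractMessageListenerContainer
Close the registered invokers. Subclasses need to implement this method for their specific invoker management process. A shared Rabbit Connection, if any, will automatically be closed afterwards.
Specified by:
doShutdown in class AbstractMessageListenerContainer
See Also:
AbstractMessageListenerContainer.shutdown()

protected int initializeConsumers()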
protected void addAndStartConsumers(int delta)
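
protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Description copied from class: AbstractMessageListenerContainer
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.
Note: This method is about finding out whether the Channel's transaction is local or externally coordinated.
Overrides:
isChannelLocallyTransacted in class AbstractMessageListenerContainer
Parameters:
channel - the Channel to check
See Also:
RabbitAccessor.isChannelTransacted()

protected BlockingQueueConsumer createBlockingQueueConsumer()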

protected void invokeListener(com.rabbitmq.client.Channel channel, Message message) throws Exception
Description copied from class: AbstractMessageListenerContainer
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
Overrides:
invokeListener in class AbstractMessageListenerContainer
Parameters:
channel - the Rabbit Channel to operate on
message - the received Rabbit Message
Throws:
Exception - if thrown by Rabbit API methods
See Also:
AbstractMessageListenerContainer.setMessageListener(java.lang.Object)

protected void handleStartupFailure(BackOffExecution backOffExecution) throws Exception
Wait for a period determined by the recoveryInterval or setRecoveryBackOff(BackOff) to give the container a chance to recover from consumer startup failure, e.g. if the broker is down.
Parameters:
backOffExecution - the BackOffExecution to get the recoveryInterval
Throws:
Exception - if the shared connection still can't be established
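
The recovery behavior described for setRecoveryBackOff(BackOff) and handleStartupFailure(BackOffExecution), that is, waiting between attempts and giving up after a maximum number of attempts, can be sketched in plain Java. Everything below is a local stand-in written for this illustration: RecoveryBackOffSketch, FixedAttempts, the STOP sentinel and recover are hypothetical names, not the Spring BackOff or BackOffExecution types, and the real container's control flow may differ. The sleeper is passed in so the sketch can be exercised without actually waiting.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for a fixed-interval recovery back-off; not Spring AMQP code. */
final class RecoveryBackOffSketch {

    /** Sentinel meaning "no further attempts" in this sketch. */
    static final long STOP = -1L;

    /** Hands out the same interval until maxAttempts waits have been granted. */
    static final class FixedAttempts {
        private final long intervalMillis;
        private final int maxAttempts;
        private int attempts;

        FixedAttempts(long intervalMillis, int maxAttempts) {
            this.intervalMillis = intervalMillis;
            this.maxAttempts = maxAttempts;
        }

        long nextBackOff() {
            if (attempts >= maxAttempts) {
                return STOP;
            }
            attempts++;
            return intervalMillis;
        }
    }

    private RecoveryBackOffSketch() {
    }

    /**
     * Retries tryStart until it succeeds or the back-off is exhausted.
     * Returns false when the caller should give up (for a container, stop).
     */
    static boolean recover(FixedAttempts backOff, BooleanSupplier tryStart, LongConsumer sleeper) {
        while (!tryStart.getAsBoolean()) {
            long wait = backOff.nextBackOff();
            if (wait == STOP) {
                return false;
            }
            sleeper.accept(wait);
        }
        return true;
    }

    public static void main(String[] args) {
        long[] slept = {0L};
        // Startup never succeeds here, so the back-off runs out after 3 waits of 5000 ms.
        boolean started = recover(new FixedAttempts(5000L, 3), () -> false, ms -> slept[0] += ms);
        System.out.println("started=" + started + ", waited=" + slept[0] + " ms");
    }
}
```

With the documented default interval of 5000 ms and a limit of 3 attempts, this model waits at most 15 seconds in total before reporting that recovery should stop.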