Class BlockingQueueConsumer
java.lang.Object
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer
Specialized consumer encapsulating knowledge of the broker
connections and having its own lifecycle (start and stop).
-
Constructor Summary
ConstructorsConstructorDescriptionBlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, org.springframework.amqp.core.AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues) Create a consumer.BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, org.springframework.amqp.core.AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) Create a consumer.BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, org.springframework.amqp.core.AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, boolean exclusive, String... queues) Create a consumer.BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, org.springframework.amqp.core.AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, String... queues) Create a consumer.BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, org.springframework.amqp.core.AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, String... queues) Create a consumer. -
Method Summary
Modifier and TypeMethodDescriptionprotected voidprotected voidbasicCancel(boolean expected) protected booleanvoidClear the delivery tags when rolling back with an external transaction manager.booleancommitIfNecessary(boolean localTx) Perform a commit or message acknowledgement, as appropriate.voidorg.springframework.util.backoff.BackOffExecutioncom.rabbitmq.client.Channelprotected booleanbooleanReturn true if cancellation is expected.org.springframework.amqp.core.MessageMain application-side API: wait for the next message delivery and return it.org.springframework.amqp.core.MessagenextMessage(long timeout) Main application-side API: wait for the next message delivery and return it.voidPerform a rollback, handling rollback exceptions properly.voidrollbackOnExceptionIfNecessary(Throwable ex, long tag) Perform a rollback, handling rollback exceptions properly.voidsetApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher) voidsetBackOffExecution(org.springframework.util.backoff.BackOffExecution backOffExecution) Set theBackOffExecutionto use for the recovery in theSimpleMessageListenerContainer.voidsetConsumeDelay(long consumeDelay) Set the consumeDelay - a time to wait before consuming in ms.voidsetDeclarationRetries(int declarationRetries) Set the number of retries after passive queue declaration fails.voidsetFailedDeclarationRetryInterval(long failedDeclarationRetryInterval) Set the interval between passive queue declaration attempts in milliseconds.voidsetGlobalQos(boolean globalQos) Apply prefetch to the entire channel.voidsetLocallyTransacted(boolean locallyTransacted) True if the channel is locally transacted.voidsetMessageAckListener(MessageAckListener messageAckListener) Set aMessageAckListenerto use when ack a message(messages) inAcknowledgeMode.AUTOmode.voidsetMissingQueuePublisher(Consumer<String> missingQueuePublisher) Set the publisher for a missing queue event.voidsetRetryDeclarationInterval(long retryDeclarationInterval) When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).voidsetShutdownTimeout(long shutdownTimeout) voidsetTagStrategy(org.springframework.amqp.support.ConsumerTagStrategy tagStrategy) Set theConsumerTagStrategyto use when generating consumer tags.voidstart()voidstop()toString()
-
Constructor Details
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, org.springframework.amqp.core.AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, String... queues) Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started. RequeueRejected defaults to true.- Parameters:
connectionFactory- The connection factory.messagePropertiesConverter- The properties converter.activeObjectCounter- The active object counter; used during shutdown.acknowledgeMode- The acknowledgemode.transactional- Whether the channel is transactional.prefetchCount- The prefetch count.queues- The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, org.springframework.amqp.core.AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues) Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.- Parameters:
connectionFactory- The connection factory.messagePropertiesConverter- The properties converter.activeObjectCounter- The active object counter; used during shutdown.acknowledgeMode- The acknowledge mode.transactional- Whether the channel is transactional.prefetchCount- The prefetch count.defaultRequeueRejected- true to reject requeued messages.queues- The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, org.springframework.amqp.core.AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String, Object> consumerArgs, String... queues) Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.- Parameters:
connectionFactory- The connection factory.messagePropertiesConverter- The properties converter.activeObjectCounter- The active object counter; used during shutdown.acknowledgeMode- The acknowledge mode.transactional- Whether the channel is transactional.prefetchCount- The prefetch count.defaultRequeueRejected- true to reject requeued messages.consumerArgs- The consumer arguments (e.g. x-priority).queues- The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, org.springframework.amqp.core.AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String, Object> consumerArgs, boolean exclusive, String... queues) Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.- Parameters:
connectionFactory- The connection factory.messagePropertiesConverter- The properties converter.activeObjectCounter- The active object counter; used during shutdown.acknowledgeMode- The acknowledge mode.transactional- Whether the channel is transactional.prefetchCount- The prefetch count.defaultRequeueRejected- true to reject requeued messages.consumerArgs- The consumer arguments (e.g. x-priority).exclusive- true if the consumer is to be exclusive.queues- The queues.
-
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, org.springframework.amqp.core.AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker until it is started.- Parameters:
connectionFactory- The connection factory.messagePropertiesConverter- The properties converter.activeObjectCounter- The active object counter; used during shutdown.acknowledgeMode- The acknowledge mode.transactional- Whether the channel is transactional.prefetchCount- The prefetch count.defaultRequeueRejected- true to reject requeued messages.consumerArgs- The consumer arguments (e.g. x-priority).noLocal- true if the consumer is to be no-local.exclusive- true if the consumer is to be exclusive.queues- The queues.- Since:
- 1.7.4
-
-
Method Details
-
getChannel
public com.rabbitmq.client.Channel getChannel() -
getConsumerTags
-
setShutdownTimeout
public void setShutdownTimeout(long shutdownTimeout) -
setDeclarationRetries
public void setDeclarationRetries(int declarationRetries) Set the number of retries after passive queue declaration fails.- Parameters:
declarationRetries- The number of retries, default 3.- Since:
- 1.3.9
- See Also:
-
setFailedDeclarationRetryInterval
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval) Set the interval between passive queue declaration attempts in milliseconds.- Parameters:
failedDeclarationRetryInterval- the interval, default 5000.- Since:
- 1.3.9
- See Also:
-
setRetryDeclarationInterval
public void setRetryDeclarationInterval(long retryDeclarationInterval) When consuming multiple queues, set the interval between declaration attempts when only a subset of the queues were available (milliseconds).- Parameters:
retryDeclarationInterval- the interval, default 60000.- Since:
- 1.3.9
-
setTagStrategy
public void setTagStrategy(org.springframework.amqp.support.ConsumerTagStrategy tagStrategy) Set theConsumerTagStrategyto use when generating consumer tags.- Parameters:
tagStrategy- the tagStrategy to set- Since:
- 1.4.5
-
setBackOffExecution
public void setBackOffExecution(org.springframework.util.backoff.BackOffExecution backOffExecution) Set theBackOffExecutionto use for the recovery in theSimpleMessageListenerContainer.- Parameters:
backOffExecution- the backOffExecution.- Since:
- 1.5
-
getBackOffExecution
public org.springframework.util.backoff.BackOffExecution getBackOffExecution() -
setLocallyTransacted
public void setLocallyTransacted(boolean locallyTransacted) True if the channel is locally transacted.- Parameters:
locallyTransacted- the locally transacted to set.- Since:
- 1.6.6
-
setApplicationEventPublisher
public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher) -
setMissingQueuePublisher
Set the publisher for a missing queue event.- Parameters:
missingQueuePublisher- the publisher.- Since:
- 2.1.18
-
setConsumeDelay
public void setConsumeDelay(long consumeDelay) Set the consumeDelay - a time to wait before consuming in ms. This is useful when using the sharding plugin withconcurrency > 1, to avoid uneven distribution of consumers across the shards. See the plugin README for more information.- Parameters:
consumeDelay- the consume delay.- Since:
- 2.3
-
setMessageAckListener
Set aMessageAckListenerto use when ack a message(messages) inAcknowledgeMode.AUTOmode.- Parameters:
messageAckListener- the messageAckListener.- Since:
- 2.4.6
-
clearDeliveryTags
public void clearDeliveryTags()Clear the delivery tags when rolling back with an external transaction manager.- Since:
- 1.6.6
-
setGlobalQos
public void setGlobalQos(boolean globalQos) Apply prefetch to the entire channel.- Parameters:
globalQos- true for a channel-wide prefetch.- Since:
- 2.2.17
- See Also:
-
Channel.basicQos(int, boolean)
-
isNormalCancel
public boolean isNormalCancel()Return true if cancellation is expected.- Returns:
- true if expected.
-
basicCancel
protected void basicCancel() -
basicCancel
protected void basicCancel(boolean expected) -
hasDelivery
protected boolean hasDelivery() -
cancelled
protected boolean cancelled() -
nextMessage
@Nullable public org.springframework.amqp.core.Message nextMessage() throws InterruptedException, com.rabbitmq.client.ShutdownSignalExceptionMain application-side API: wait for the next message delivery and return it.- Returns:
- the next message
- Throws:
InterruptedException- if an interrupt is received while waitingcom.rabbitmq.client.ShutdownSignalException- if the connection is shut down while waiting
-
nextMessage
@Nullable public org.springframework.amqp.core.Message nextMessage(long timeout) throws InterruptedException, com.rabbitmq.client.ShutdownSignalException Main application-side API: wait for the next message delivery and return it.- Parameters:
timeout- timeout in millisecond- Returns:
- the next message or null if timed out
- Throws:
InterruptedException- if an interrupt is received while waitingcom.rabbitmq.client.ShutdownSignalException- if the connection is shut down while waiting
-
start
public void start() throws org.springframework.amqp.AmqpException- Throws:
org.springframework.amqp.AmqpException
-
stop
public void stop() -
forceCloseAndClearQueue
public void forceCloseAndClearQueue() -
rollbackOnExceptionIfNecessary
Perform a rollback, handling rollback exceptions properly.- Parameters:
ex- the thrown application exception or error
-
rollbackOnExceptionIfNecessary
Perform a rollback, handling rollback exceptions properly.- Parameters:
ex- the thrown application exception or errortag- delivery tag; when specified (greater than or equal to 0) only that message is nacked.- Since:
- 2.2.21.
-
commitIfNecessary
Perform a commit or message acknowledgement, as appropriate.- Parameters:
localTx- Whether the channel is locally transacted.- Returns:
- true if at least one delivery tag exists.
- Throws:
IOException- Any IOException.
-
toString
-