Spring AMQP

org.springframework.amqp.rabbit.listener
Class SimpleMessageListenerContainer

java.lang.Object
  extended by org.springframework.amqp.rabbit.support.RabbitAccessor
      extended by org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
          extended by org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
All Implemented Interfaces:
BeanNameAware, DisposableBean, InitializingBean, Lifecycle, Phased, SmartLifecycle

public class SimpleMessageListenerContainer
extends AbstractMessageListenerContainer

Author:
Mark Pollack, Mark Fisher, Dave Syer

Nested Class Summary
static interface SimpleMessageListenerContainer.ContainerDelegate
           
 
Nested classes/interfaces inherited from class org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
 
Field Summary
static long DEFAULT_RECEIVE_TIMEOUT
           
 
Fields inherited from class org.springframework.amqp.rabbit.support.RabbitAccessor
logger
 
Constructor Summary
SimpleMessageListenerContainer()
           
SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
           
 
Method Summary
protected  BlockingQueueConsumer createBlockingQueueConsumer(com.rabbitmq.client.Channel channel)
           
protected  void doInitialize()
          Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated MessageConsumer.
protected  void doShutdown()
          Close the registered invokers.
protected  void doStart()
          Re-initializes this container's Rabbit message consumers, if not initialized already.
protected  void doStop()
          Notify all invoker tasks and stop the shared Connection, if any.
 int getActiveConsumerCount()
           
protected  void initializeConsumers()
           
 void initializeProxy()
           
protected  boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
          Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.
 void setAdviceChain(org.aopalliance.aop.Advice[] advices)
           Public setter for the Advice to apply to listener executions.
 void setConcurrentConsumers(int concurrentConsumers)
          Specify the number of concurrent consumers to create.
 void setPrefetchCount(int prefetchCount)
          Tells the broker how many messages to send to each consumer in a single request.
 void setReceiveTimeout(long receiveTimeout)
           
 void setShutdownTimeout(long shutdownTimeout)
          The time to wait for workers in milliseconds after the container is stopped, and before the connection is forced closed.
 void setTaskExecutor(Executor taskExecutor)
           
 void setTransactionAttribute(TransactionAttribute transactionAttribute)
           
 void setTransactionManager(PlatformTransactionManager transactionManager)
           
 void setTxSize(int txSize)
          Tells the container how many messages to process in a single transaction (if the channel is transactional).
protected  boolean sharedConnectionEnabled()
          Always use a shared Rabbit Connection.
protected  void validateConfiguration()
          Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.
 
Methods inherited from class org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
afterPropertiesSet, checkMessageListener, commitIfNecessary, createSharedConnection, destroy, doExecuteListener, doInvokeListener, doInvokeListener, establishSharedConnection, executeListener, getAcknowledgeMode, getBeanName, getMessageListener, getPhase, getQueueName, getRequiredQueueName, getSharedConnection, handleListenerException, initialize, invokeErrorHandler, invokeListener, isActive, isAutoStartup, isExposeListenerChannel, isRunning, prepareSharedConnection, refreshSharedConnection, rollbackIfNecessary, rollbackOnExceptionIfNecessary, setAcknowledgeMode, setAutoStartup, setBeanName, setErrorHandler, setExposeListenerChannel, setMessageListener, setPhase, setQueueName, setQueues, shutdown, start, stop, stop, stopSharedConnection, wrapToListenerExecutionFailedExceptionIfNeeded
 
Methods inherited from class org.springframework.amqp.rabbit.support.RabbitAccessor
convertRabbitAccessException, createConnection, getChannel, getConnection, getConnectionFactory, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_RECEIVE_TIMEOUT

public static final long DEFAULT_RECEIVE_TIMEOUT
See Also:
Constant Field Values
Constructor Detail

SimpleMessageListenerContainer

public SimpleMessageListenerContainer()

SimpleMessageListenerContainer

public SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
Method Detail

setAdviceChain

public void setAdviceChain(org.aopalliance.aop.Advice[] advices)

Public setter for the Advice to apply to listener executions. If txSize>1 then multiple listener executions will all be wrapped in the same advice up to that limit.

If a transactionManager is provided as well, then separate advice is created for the transaction and applied first in the chain. In that case the advice chain provided here should not contain a transaction interceptor (otherwise two transactions would be be applied).

Parameters:
advices - the advice chain to set

setConcurrentConsumers

public void setConcurrentConsumers(int concurrentConsumers)
Specify the number of concurrent consumers to create. Default is 1.

Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues.


setReceiveTimeout

public void setReceiveTimeout(long receiveTimeout)

setShutdownTimeout

public void setShutdownTimeout(long shutdownTimeout)
The time to wait for workers in milliseconds after the container is stopped, and before the connection is forced closed. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. Otherwise the connection is closed and messages remain unacked (if the channel is transactional). Defaults to 5 seconds.

Parameters:
shutdownTimeout - the shutdown timeout to set

setTaskExecutor

public void setTaskExecutor(Executor taskExecutor)

setPrefetchCount

public void setPrefetchCount(int prefetchCount)
Tells the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput. It should be greater than or equal to the transaction size.

Parameters:
prefetchCount - the prefetch count

setTxSize

public void setTxSize(int txSize)
Tells the container how many messages to process in a single transaction (if the channel is transactional). For best results it should be less than or equal to the prefetch count.

Parameters:
prefetchCount - the prefetch count

setTransactionManager

public void setTransactionManager(PlatformTransactionManager transactionManager)

setTransactionAttribute

public void setTransactionAttribute(TransactionAttribute transactionAttribute)
Parameters:
transactionAttribute - the transaction attribute to set

validateConfiguration

protected void validateConfiguration()
Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.

Overrides:
validateConfiguration in class AbstractMessageListenerContainer

initializeProxy

public void initializeProxy()

sharedConnectionEnabled

protected final boolean sharedConnectionEnabled()
Always use a shared Rabbit Connection.

Specified by:
sharedConnectionEnabled in class AbstractMessageListenerContainer
See Also:
AbstractMessageListenerContainer.getSharedConnection()

doInitialize

protected void doInitialize()
                     throws Exception
Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated MessageConsumer.

Specified by:
doInitialize in class AbstractMessageListenerContainer
Throws:
Exception
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
See Also:
AbstractMessageListenerContainer.getSharedConnection()

getActiveConsumerCount

public int getActiveConsumerCount()

doStart

protected void doStart()
                throws Exception
Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer to this container's task executor.

Overrides:
doStart in class AbstractMessageListenerContainer
Throws:
Exception
AbstractMessageListenerContainer.SharedConnectionNotInitializedException - if thrown by Rabbit API methods
See Also:
AbstractMessageListenerContainer.establishSharedConnection()

doStop

protected void doStop()
Description copied from class: AbstractMessageListenerContainer
Notify all invoker tasks and stop the shared Connection, if any.

Overrides:
doStop in class AbstractMessageListenerContainer
See Also:
AbstractMessageListenerContainer.stopSharedConnection()

doShutdown

protected void doShutdown()
Description copied from class: AbstractMessageListenerContainer
Close the registered invokers.

Subclasses need to implement this method for their specific invoker management process.

A shared Rabbit Connection, if any, will automatically be closed afterwards.

Specified by:
doShutdown in class AbstractMessageListenerContainer
See Also:
AbstractMessageListenerContainer.shutdown()

initializeConsumers

protected void initializeConsumers()
                            throws IOException
Throws:
IOException

isChannelLocallyTransacted

protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Description copied from class: AbstractMessageListenerContainer
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.

Note:This method is about finding out whether the Channel's transaction is local or externally coordinated.

Overrides:
isChannelLocallyTransacted in class AbstractMessageListenerContainer
Parameters:
channel - the Channel to check
Returns:
whether the given Channel is locally transacted
See Also:
RabbitAccessor.isChannelTransacted()

createBlockingQueueConsumer

protected BlockingQueueConsumer createBlockingQueueConsumer(com.rabbitmq.client.Channel channel)

Spring AMQP

Copyright © 2011. All Rights Reserved.