Spring AMQP

org.springframework.amqp.rabbit.listener
Class SimpleMessageListenerContainer

java.lang.Object
  extended by org.springframework.amqp.rabbit.connection.RabbitAccessor
      extended by org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
          extended by org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
All Implemented Interfaces:
BeanNameAware, DisposableBean, InitializingBean, Lifecycle, Phased, SmartLifecycle

public class SimpleMessageListenerContainer
extends AbstractMessageListenerContainer

Since:
1.0
Author:
Mark Pollack, Mark Fisher, Dave Syer

Nested Class Summary
static interface SimpleMessageListenerContainer.ContainerDelegate
           
 
Nested classes/interfaces inherited from class org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
 
Field Summary
static int DEFAULT_PREFETCH_COUNT
           
static long DEFAULT_RECEIVE_TIMEOUT
           
static long DEFAULT_RECOVERY_INTERVAL
          The default recovery interval: 5000 ms = 5 seconds.
static long DEFAULT_SHUTDOWN_TIMEOUT
           
 
Fields inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
logger
 
Constructor Summary
SimpleMessageListenerContainer()
          Default constructor for convenient dependency injection via setters.
SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
          Create a listener container from the connection factory (mandatory).
 
Method Summary
protected  BlockingQueueConsumer createBlockingQueueConsumer()
           
protected  void doInitialize()
          Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated MessageConsumer.
protected  void doShutdown()
          Close the registered invokers.
protected  void doStart()
          Re-initializes this container's Rabbit message consumers, if not initialized already.
protected  void doStop()
          This method is invoked when the container is stopping.
 int getActiveConsumerCount()
           
protected  void handleStartupFailure(Throwable t)
          Wait for a period determined by the recoveryInterval to give the container a chance to recover from consumer startup failure, e.g. if the broker is down.
protected  void initializeConsumers()
           
protected  void invokeListener(com.rabbitmq.client.Channel channel, Message message)
          Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
protected  boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
          Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.
 void setAdviceChain(org.aopalliance.aop.Advice[] adviceChain)
           Public setter for the Advice to apply to listener executions.
 void setConcurrentConsumers(int concurrentConsumers)
          Specify the number of concurrent consumers to create.
 void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
          Set the MessagePropertiesConverter for this listener container.
 void setPrefetchCount(int prefetchCount)
          Tells the broker how many messages to send to each consumer in a single request.
 void setReceiveTimeout(long receiveTimeout)
           
 void setRecoveryInterval(long recoveryInterval)
          Specify the interval between recovery attempts, in milliseconds.
 void setShutdownTimeout(long shutdownTimeout)
          The time to wait for workers in milliseconds after the container is stopped, and before the connection is forced closed.
 void setTaskExecutor(Executor taskExecutor)
           
 void setTransactionAttribute(TransactionAttribute transactionAttribute)
           
 void setTransactionManager(PlatformTransactionManager transactionManager)
           
 void setTxSize(int txSize)
          Tells the container how many messages to process in a single transaction (if the channel is transactional).
protected  boolean sharedConnectionEnabled()
          Always use a shared Rabbit Connection.
protected  void validateConfiguration()
          Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.
 
Methods inherited from class org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
afterPropertiesSet, checkMessageListener, destroy, doInvokeListener, doInvokeListener, executeListener, getAcknowledgeMode, getBeanName, getMessageListener, getPhase, getQueueNames, getRequiredQueueNames, handleListenerException, initialize, invokeErrorHandler, isActive, isAutoStartup, isExposeListenerChannel, isRunning, setAcknowledgeMode, setAutoStartup, setBeanName, setErrorHandler, setExposeListenerChannel, setMessageListener, setPhase, setQueueNames, setQueues, shutdown, start, stop, stop, wrapToListenerExecutionFailedExceptionIfNeeded
 
Methods inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
convertRabbitAccessException, createConnection, getChannel, getConnection, getConnectionFactory, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_RECEIVE_TIMEOUT

public static final long DEFAULT_RECEIVE_TIMEOUT
See Also:
Constant Field Values

DEFAULT_PREFETCH_COUNT

public static final int DEFAULT_PREFETCH_COUNT
See Also:
Constant Field Values

DEFAULT_SHUTDOWN_TIMEOUT

public static final long DEFAULT_SHUTDOWN_TIMEOUT
See Also:
Constant Field Values

DEFAULT_RECOVERY_INTERVAL

public static final long DEFAULT_RECOVERY_INTERVAL
The default recovery interval: 5000 ms = 5 seconds.

See Also:
Constant Field Values
Constructor Detail

SimpleMessageListenerContainer

public SimpleMessageListenerContainer()
Default constructor for convenient dependency injection via setters.


SimpleMessageListenerContainer

public SimpleMessageListenerContainer(ConnectionFactory connectionFactory)
Create a listener container from the connection factory (mandatory).

Parameters:
connectionFactory - the ConnectionFactory
Method Detail

setAdviceChain

public void setAdviceChain(org.aopalliance.aop.Advice[] adviceChain)

Public setter for the Advice to apply to listener executions. If txSize>1 then multiple listener executions will all be wrapped in the same advice up to that limit.

If a transactionManager is provided as well, then separate advice is created for the transaction and applied first in the chain. In that case the advice chain provided here should not contain a transaction interceptor (otherwise two transactions would be be applied).

Parameters:
adviceChain - the advice chain to set

setRecoveryInterval

public void setRecoveryInterval(long recoveryInterval)
Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.


setConcurrentConsumers

public void setConcurrentConsumers(int concurrentConsumers)
Specify the number of concurrent consumers to create. Default is 1.

Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues.


setReceiveTimeout

public void setReceiveTimeout(long receiveTimeout)

setShutdownTimeout

public void setShutdownTimeout(long shutdownTimeout)
The time to wait for workers in milliseconds after the container is stopped, and before the connection is forced closed. If any workers are active when the shutdown signal comes they will be allowed to finish processing as long as they can finish within this timeout. Otherwise the connection is closed and messages remain unacked (if the channel is transactional). Defaults to 5 seconds.

Parameters:
shutdownTimeout - the shutdown timeout to set

setTaskExecutor

public void setTaskExecutor(Executor taskExecutor)

setPrefetchCount

public void setPrefetchCount(int prefetchCount)
Tells the broker how many messages to send to each consumer in a single request. Often this can be set quite high to improve throughput. It should be greater than or equal to the transaction size.

Parameters:
prefetchCount - the prefetch count

setTxSize

public void setTxSize(int txSize)
Tells the container how many messages to process in a single transaction (if the channel is transactional). For best results it should be less than or equal to the prefetch count.

Parameters:
txSize - the transaction size

setTransactionManager

public void setTransactionManager(PlatformTransactionManager transactionManager)

setTransactionAttribute

public void setTransactionAttribute(TransactionAttribute transactionAttribute)
Parameters:
transactionAttribute - the transaction attribute to set

setMessagePropertiesConverter

public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter)
Set the MessagePropertiesConverter for this listener container.


validateConfiguration

protected void validateConfiguration()
Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent consumers.

Overrides:
validateConfiguration in class AbstractMessageListenerContainer

sharedConnectionEnabled

protected final boolean sharedConnectionEnabled()
Always use a shared Rabbit Connection.


doInitialize

protected void doInitialize()
                     throws Exception
Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated MessageConsumer.

Specified by:
doInitialize in class AbstractMessageListenerContainer
Throws:
Exception
AbstractMessageListenerContainer.SharedConnectionNotInitializedException

getActiveConsumerCount

@ManagedMetric(metricType=GAUGE)
public int getActiveConsumerCount()

doStart

protected void doStart()
                throws Exception
Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer to this container's task executor.

Overrides:
doStart in class AbstractMessageListenerContainer
Throws:
Exception
AbstractMessageListenerContainer.SharedConnectionNotInitializedException - if thrown by Rabbit API methods

doStop

protected void doStop()
Description copied from class: AbstractMessageListenerContainer
This method is invoked when the container is stopping. The default implementation does nothing, but subclasses may override.

Overrides:
doStop in class AbstractMessageListenerContainer

doShutdown

protected void doShutdown()
Description copied from class: AbstractMessageListenerContainer
Close the registered invokers.

Subclasses need to implement this method for their specific invoker management process.

A shared Rabbit Connection, if any, will automatically be closed afterwards.

Specified by:
doShutdown in class AbstractMessageListenerContainer
See Also:
AbstractMessageListenerContainer.shutdown()

initializeConsumers

protected void initializeConsumers()

isChannelLocallyTransacted

protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Description copied from class: AbstractMessageListenerContainer
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.

Note:This method is about finding out whether the Channel's transaction is local or externally coordinated.

Overrides:
isChannelLocallyTransacted in class AbstractMessageListenerContainer
Parameters:
channel - the Channel to check
Returns:
whether the given Channel is locally transacted
See Also:
RabbitAccessor.isChannelTransacted()

createBlockingQueueConsumer

protected BlockingQueueConsumer createBlockingQueueConsumer()

invokeListener

protected void invokeListener(com.rabbitmq.client.Channel channel,
                              Message message)
                       throws Exception
Description copied from class: AbstractMessageListenerContainer
Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.

Overrides:
invokeListener in class AbstractMessageListenerContainer
Parameters:
channel - the Rabbit Channel to operate on
message - the received Rabbit Message
Throws:
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
Exception
See Also:
AbstractMessageListenerContainer.setMessageListener(java.lang.Object)

handleStartupFailure

protected void handleStartupFailure(Throwable t)
                             throws Exception
Wait for a period determined by the recoveryInterval to give the container a chance to recover from consumer startup failure, e.g. if the broker is down.

Parameters:
t - the exception that stopped the startup
Throws:
Exception - if the shared connection still can't be established

Spring AMQP

Copyright © 2011. All Rights Reserved.