Spring AMQP

org.springframework.amqp.rabbit.listener
Class AbstractMessageListenerContainer

java.lang.Object
  extended by org.springframework.amqp.rabbit.connection.RabbitAccessor
      extended by org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
All Implemented Interfaces:
BeanNameAware, DisposableBean, InitializingBean, Lifecycle, Phased, SmartLifecycle
Direct Known Subclasses:
SimpleMessageListenerContainer

public abstract class AbstractMessageListenerContainer
extends RabbitAccessor
implements BeanNameAware, DisposableBean, SmartLifecycle

Author:
Mark Pollack, Mark Fisher, Dave Syer

Nested Class Summary
static class AbstractMessageListenerContainer.SharedConnectionNotInitializedException
          Exception that indicates that the initial setup of this container's shared Rabbit Connection failed.
 
Field Summary
 
Fields inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
logger
 
Constructor Summary
AbstractMessageListenerContainer()
           
 
Method Summary
 void afterPropertiesSet()
          Delegates to validateConfiguration() and initialize().
protected  void checkMessageListener(Object messageListener)
          Check the given message listener, throwing an exception if it does not correspond to a supported listener type.
 void destroy()
          Calls shutdown() when the BeanFactory destroys the container instance.
protected abstract  void doInitialize()
          Register any invokers within this container.
protected  void doInvokeListener(ChannelAwareMessageListener listener, com.rabbitmq.client.Channel channel, Message message)
          Invoke the specified listener as a Spring ChannelAwareMessageListener, exposing a new Rabbit Channel (potentially with its own transaction) to the listener if demanded.
protected  void doInvokeListener(MessageListener listener, Message message)
          Invoke the specified listener as Spring Rabbit MessageListener.
protected abstract  void doShutdown()
          Close the registered invokers.
protected  void doStart()
          Start this container, and notify all invoker tasks.
protected  void doStop()
          This method is invoked when the container is stopping.
protected  void executeListener(com.rabbitmq.client.Channel channel, Message message)
          Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
 AcknowledgeMode getAcknowledgeMode()
           
protected  String getBeanName()
          Return the bean name that this listener container has been assigned in its containing bean factory, if any.
 Object getMessageListener()
          Return the message listener object to register.
 int getPhase()
          Return the phase in which this container will be started and stopped.
 String[] getQueueNames()
          Return the names of the queues to receive messages from.
protected  String[] getRequiredQueueNames()
           
protected  void handleListenerException(Throwable ex)
          Handle the given exception that arose during listener execution.
 void initialize()
          Initialize this container.
protected  void invokeErrorHandler(Throwable ex)
          Invoke the registered ErrorHandler, if any.
protected  void invokeListener(com.rabbitmq.client.Channel channel, Message message)
          Invoke the specified listener: either as a standard MessageListener or (preferably) as a ChannelAwareMessageListener.
 boolean isActive()
          Return whether this container is currently active, that is, whether it has been set up but not shut down yet.
 boolean isAutoStartup()
           
protected  boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
          Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.
 boolean isExposeListenerChannel()
          Return whether to expose the listener Channel to a registered ChannelAwareMessageListener.
 boolean isRunning()
          Determine whether this container is currently running, that is, whether it has been started and not stopped yet.
 void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)
           Flag controlling the behaviour of the container with respect to message acknowledgement.
 void setAutoStartup(boolean autoStartup)
          Set whether to automatically start the container after initialization.
 void setBeanName(String beanName)
           
 void setErrorHandler(ErrorHandler errorHandler)
          Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message.
 void setExposeListenerChannel(boolean exposeListenerChannel)
          Set whether to expose the listener Rabbit Channel to a registered ChannelAwareMessageListener as well as to RabbitTemplate calls.
 void setMessageListener(Object messageListener)
          Set the message listener implementation to register.
 void setPhase(int phase)
          Specify the phase in which this container should be started and stopped.
 void setQueueNames(String... queueName)
          Set the names of the queues to receive messages from.
 void setQueues(Queue... queues)
           
 void shutdown()
          Stop the shared Connection, call doShutdown(), and close this container.
 void start()
          Start this container.
 void stop()
          Stop this container.
 void stop(Runnable callback)
           
protected  void validateConfiguration()
          Validate the configuration of this container.
protected  Exception wrapToListenerExecutionFailedExceptionIfNeeded(Exception e)
           
 
Methods inherited from class org.springframework.amqp.rabbit.connection.RabbitAccessor
convertRabbitAccessException, createConnection, getChannel, getConnection, getConnectionFactory, getTransactionalResourceHolder, isChannelTransacted, setChannelTransacted, setConnectionFactory
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

AbstractMessageListenerContainer

public AbstractMessageListenerContainer()
Method Detail

setAcknowledgeMode

public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode)

Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements (so the listener doesn't need to know about the channel or the message).

Set to AcknowledgeMode.MANUAL if the listener will send the acknowledgements itself using Channel.basicAck(long, boolean). Manual acks are compatible with either a transactional or a non-transactional channel, but if you are doing no other work on the channel at the same time, other than receiving a single message, then the transaction is probably unnecessary.

Set to AcknowledgeMode.NONE to tell the broker not to expect any acknowledgements; it will then assume that all messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). With AcknowledgeMode.NONE the channel cannot be transactional, so the container will fail on startup if that flag is accidentally set.

Parameters:
acknowledgeMode - the acknowledge mode to set. Defaults to AcknowledgeMode.AUTO
See Also:
AcknowledgeMode
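
The one combination ruled out above (AcknowledgeMode.NONE on a transactional channel) can be sketched as a small validation step. This is a simplified illustration, not the container's code: the Mode enum, the validate and listenerMustAck helpers, and the choice of IllegalStateException are all assumptions made for the example, and no Spring AMQP classes are used.

```java
/** Simplified stand-in; not the Spring AMQP implementation. */
class AckModeSketch {

    /** Hypothetical mirror of the three acknowledge modes described above. */
    enum Mode { NONE, MANUAL, AUTO }

    /**
     * Rejects the combination the text rules out: NONE together with a
     * transactional channel. The exception type is an assumption.
     */
    static void validate(Mode mode, boolean channelTransacted) {
        if (mode == Mode.NONE && channelTransacted) {
            throw new IllegalStateException(
                    "Acknowledge mode NONE cannot be combined with a transactional channel");
        }
    }

    /** True only when the listener itself has to acknowledge each message. */
    static boolean listenerMustAck(Mode mode) {
        return mode == Mode.MANUAL;
    }

    public static void main(String[] args) {
        validate(Mode.AUTO, true);    // allowed
        validate(Mode.MANUAL, false); // allowed
        try {
            validate(Mode.NONE, true); // ruled out
        } catch (IllegalStateException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

Failing early, before any consumer is started, matches the description that the container "will fail on startup" rather than at the first delivery.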

getAcknowledgeMode

public AcknowledgeMode getAcknowledgeMode()
Returns:
the acknowledgeMode

setQueueNames

public void setQueueNames(String... queueName)
Set the names of the queues to receive messages from.

Parameters:
queueName - the desired queue names (cannot be null)

setQueues

public void setQueues(Queue... queues)

getQueueNames

public String[] getQueueNames()
Return the names of the queues to receive messages from.


getRequiredQueueNames

protected String[] getRequiredQueueNames()

isExposeListenerChannel

public boolean isExposeListenerChannel()
Return whether to expose the listener Channel to a registered ChannelAwareMessageListener.


setExposeListenerChannel

public void setExposeListenerChannel(boolean exposeListenerChannel)
Set whether to expose the listener Rabbit Channel to a registered ChannelAwareMessageListener as well as to RabbitTemplate calls.

Default is "true", reusing the listener's Channel. Turn this off to expose a fresh Rabbit Channel fetched from the same underlying Rabbit Connection instead.

Note that Channels managed by an external transaction manager will always get exposed to RabbitTemplate calls. So in terms of RabbitTemplate exposure, this setting only affects locally transacted Channels.

See Also:
ChannelAwareMessageListener

setMessageListener

public void setMessageListener(Object messageListener)
Set the message listener implementation to register. This can be either a Spring MessageListener object or a Spring ChannelAwareMessageListener object.

Throws:
IllegalArgumentException - if the supplied listener is not a MessageListener or a ChannelAwareMessageListener
See Also:
MessageListener, ChannelAwareMessageListener

checkMessageListener

protected void checkMessageListener(Object messageListener)
Check the given message listener, throwing an exception if it does not correspond to a supported listener type.

By default, only a Spring MessageListener object or a Spring ChannelAwareMessageListener object will be accepted.

Parameters:
messageListener - the message listener object to check
Throws:
IllegalArgumentException - if the supplied listener is not a MessageListener or a ChannelAwareMessageListener
See Also:
MessageListener, ChannelAwareMessageListener
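
The type check described above can be sketched as follows. The two nested interfaces are hypothetical stand-ins for MessageListener and ChannelAwareMessageListener (with a String in place of Message and an Object in place of the Rabbit Channel), and the invoke helper, including the order in which it tests the two types, is an illustrative assumption rather than the container's actual dispatch code.

```java
/** Simplified stand-in; not the Spring AMQP implementation. */
class ListenerCheckSketch {

    /** Hypothetical stand-in for MessageListener. */
    interface PlainListener {
        void onMessage(String message);
    }

    /** Hypothetical stand-in for ChannelAwareMessageListener. */
    interface ChannelAwareListener {
        void onMessage(String message, Object channel);
    }

    /** Throws IllegalArgumentException for anything that is not a supported listener. */
    static void check(Object listener) {
        if (!(listener instanceof PlainListener || listener instanceof ChannelAwareListener)) {
            throw new IllegalArgumentException("Unsupported listener type: "
                    + (listener == null ? "null" : listener.getClass().getName()));
        }
    }

    /** Dispatches to the channel-aware variant when available; returns which path was taken. */
    static String invoke(Object listener, Object channel, String message) {
        check(listener);
        if (listener instanceof ChannelAwareListener) {
            ((ChannelAwareListener) listener).onMessage(message, channel);
            return "channel-aware";
        }
        ((PlainListener) listener).onMessage(message);
        return "plain";
    }

    public static void main(String[] args) {
        PlainListener plain = new PlainListener() {
            public void onMessage(String message) {
                System.out.println("received " + message);
            }
        };
        System.out.println(invoke(plain, null, "hello"));
    }
}
```

Because the setter accepts a plain Object, a check of this kind is the only point at which an unsupported listener can be rejected before messages start arriving.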

getMessageListener

public Object getMessageListener()
Return the message listener object to register.


setErrorHandler

public void setErrorHandler(ErrorHandler errorHandler)
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default there will be no ErrorHandler so that error-level logging is the only result.


setAutoStartup

public void setAutoStartup(boolean autoStartup)
Set whether to automatically start the container after initialization.

Default is "true"; set this to "false" to allow for manual startup through the start() method.


isAutoStartup

public boolean isAutoStartup()
Specified by:
isAutoStartup in interface SmartLifecycle

setPhase

public void setPhase(int phase)
Specify the phase in which this container should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE, meaning that this container starts as late as possible and stops as soon as possible.
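
The ordering rule above can be illustrated with a small, self-contained sketch. The Component class and the bean names are hypothetical; the code only models "lowest phase starts first, shutdown is the reverse" and is not how the Spring lifecycle processor is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Simplified stand-in; not the Spring lifecycle implementation. */
class PhaseOrderSketch {

    /** Hypothetical component that reports a phase. */
    static final class Component {
        final String name;
        final int phase;

        Component(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    /** Names in startup order: lowest phase first. */
    static List<String> startupOrder(List<Component> components) {
        List<Component> sorted = new ArrayList<Component>(components);
        Collections.sort(sorted, new Comparator<Component>() {
            public int compare(Component a, Component b) {
                return Integer.compare(a.phase, b.phase);
            }
        });
        List<String> names = new ArrayList<String>();
        for (Component c : sorted) {
            names.add(c.name);
        }
        return names;
    }

    /** Names in shutdown order: the exact reverse of the startup order. */
    static List<String> shutdownOrder(List<Component> components) {
        List<String> names = startupOrder(components);
        Collections.reverse(names);
        return names;
    }

    public static void main(String[] args) {
        List<Component> components = new ArrayList<Component>();
        components.add(new Component("listenerContainer", Integer.MAX_VALUE));
        components.add(new Component("dataSource", 0));
        components.add(new Component("cache", 10));
        System.out.println(startupOrder(components));  // [dataSource, cache, listenerContainer]
        System.out.println(shutdownOrder(components)); // [listenerContainer, cache, dataSource]
    }
}
```

With the default phase of Integer.MAX_VALUE, the container sorts last on startup and first on shutdown, so it stops consuming messages before the components it depends on are stopped.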


getPhase

public int getPhase()
Return the phase in which this container will be started and stopped.

Specified by:
getPhase in interface Phased

setBeanName

public void setBeanName(String beanName)
Specified by:
setBeanName in interface BeanNameAware

getBeanName

protected final String getBeanName()
Return the bean name that this listener container has been assigned in its containing bean factory, if any.


afterPropertiesSet

public final void afterPropertiesSet()
Delegates to validateConfiguration() and initialize().

Specified by:
afterPropertiesSet in interface InitializingBean
Overrides:
afterPropertiesSet in class RabbitAccessor

validateConfiguration

protected void validateConfiguration()
Validate the configuration of this container.

The default implementation is empty. To be overridden in subclasses.


destroy

public void destroy()
Calls shutdown() when the BeanFactory destroys the container instance.

Specified by:
destroy in interface DisposableBean
See Also:
shutdown()

initialize

public void initialize()
Initialize this container.

Creates a Rabbit Connection and calls doInitialize().


shutdown

public void shutdown()
Stop the shared Connection, call doShutdown(), and close this container.


doInitialize

protected abstract void doInitialize()
                              throws Exception
Register any invokers within this container.

Subclasses need to implement this method for their specific invoker management process.

Throws:
AbstractMessageListenerContainer.SharedConnectionNotInitializedException
Exception

doShutdown

protected abstract void doShutdown()
Close the registered invokers.

Subclasses need to implement this method for their specific invoker management process.

A shared Rabbit Connection, if any, will automatically be closed afterwards.

See Also:
shutdown()

isActive

public final boolean isActive()
Return whether this container is currently active, that is, whether it has been set up but not shut down yet.


start

public void start()
Start this container.

Specified by:
start in interface Lifecycle
See Also:
doStart()

doStart

protected void doStart()
                throws Exception
Start this container, and notify all invoker tasks.

Throws:
AbstractMessageListenerContainer.SharedConnectionNotInitializedException - if thrown by Rabbit API methods
Exception

stop

public void stop()
Stop this container.

Specified by:
stop in interface Lifecycle
See Also:
doStop()

stop

public void stop(Runnable callback)
Specified by:
stop in interface SmartLifecycle

doStop

protected void doStop()
This method is invoked when the container is stopping. The default implementation does nothing, but subclasses may override.


isRunning

public final boolean isRunning()
Determine whether this container is currently running, that is, whether it has been started and not stopped yet.

Specified by:
isRunning in interface Lifecycle
See Also:
start(), stop()
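
The difference between isActive() (set up but not shut down) and isRunning() (started and not stopped) can be sketched with two flags. This is a simplified model under stated assumptions: the class is hypothetical, and having start() initialize lazily is a choice made for the example, not a statement about the container.

```java
/** Simplified stand-in; not the Spring AMQP implementation. */
class LifecycleStateSketch {

    private boolean active;  // set up and not yet shut down
    private boolean running; // started and not yet stopped

    void initialize() {
        active = true;
    }

    void start() {
        if (!active) {
            initialize(); // assumption for this sketch: start() initializes lazily
        }
        running = true;
    }

    void stop() {
        running = false; // the container stays active and can be started again
    }

    void shutdown() {
        running = false;
        active = false;
    }

    boolean isActive() {
        return active;
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        LifecycleStateSketch state = new LifecycleStateSketch();
        state.initialize();
        state.start();
        state.stop();
        System.out.println("active=" + state.isActive() + ", running=" + state.isRunning());
    }
}
```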

invokeErrorHandler

protected void invokeErrorHandler(Throwable ex)
Invoke the registered ErrorHandler, if any. Log at error level otherwise.

Parameters:
ex - the uncaught error that arose during Rabbit processing.
See Also:
setErrorHandler(org.springframework.util.ErrorHandler)
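
The "invoke the handler if one is registered, otherwise log at error level" behaviour can be sketched as below. The Handler interface is a hypothetical stand-in for org.springframework.util.ErrorHandler, java.util.logging replaces the container's own logger, and the boolean return value exists only to make the example observable.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Simplified stand-in; not the Spring AMQP implementation. */
class ErrorHandlerSketch {

    /** Hypothetical stand-in for org.springframework.util.ErrorHandler. */
    interface Handler {
        void handleError(Throwable t);
    }

    private static final Logger LOG = Logger.getLogger(ErrorHandlerSketch.class.getName());

    private Handler handler; // null by default, so errors are only logged

    void setErrorHandler(Handler handler) {
        this.handler = handler;
    }

    /** Returns true if a handler was invoked, false if the error was only logged. */
    boolean invokeErrorHandler(Throwable ex) {
        if (handler != null) {
            handler.handleError(ex);
            return true;
        }
        LOG.log(Level.SEVERE, "Listener execution failed and no error handler is set", ex);
        return false;
    }

    public static void main(String[] args) {
        ErrorHandlerSketch sketch = new ErrorHandlerSketch();
        sketch.setErrorHandler(new Handler() {
            public void handleError(Throwable t) {
                System.out.println("handled: " + t.getMessage());
            }
        });
        sketch.invokeErrorHandler(new IllegalStateException("boom"));
    }
}
```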

executeListener

protected void executeListener(com.rabbitmq.client.Channel channel,
                               Message message)
                        throws Throwable
Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).

Parameters:
channel - the Rabbit Channel to operate on
message - the received Rabbit Message
Throws:
Throwable
See Also:
invokeListener(com.rabbitmq.client.Channel, org.springframework.amqp.core.Message), handleListenerException(java.lang.Throwable)

invokeListener

protected void invokeListener(com.rabbitmq.client.Channel channel,
                              Message message)
                       throws Exception
Invoke the specified listener: either as a standard MessageListener or (preferably) as a ChannelAwareMessageListener.

Parameters:
channel - the Rabbit Channel to operate on
message - the received Rabbit Message
Throws:
AbstractMessageListenerContainer.SharedConnectionNotInitializedException - if thrown by Rabbit API methods
Exception
See Also:
setMessageListener(java.lang.Object)

doInvokeListener

protected void doInvokeListener(ChannelAwareMessageListener listener,
                                com.rabbitmq.client.Channel channel,
                                Message message)
                         throws Exception
Invoke the specified listener as a Spring ChannelAwareMessageListener, exposing a new Rabbit Channel (potentially with its own transaction) to the listener if demanded.

Parameters:
listener - the Spring ChannelAwareMessageListener to invoke
channel - the Rabbit Channel to operate on
message - the received Rabbit Message
Throws:
AbstractMessageListenerContainer.SharedConnectionNotInitializedException - if thrown by Rabbit API methods or by the listener itself. Any exception thrown by the listener is wrapped in a ListenerExecutionFailedException.
Exception
See Also:
ChannelAwareMessageListener, setExposeListenerChannel(boolean)

doInvokeListener

protected void doInvokeListener(MessageListener listener,
                                Message message)
                         throws Exception
Invoke the specified listener as Spring Rabbit MessageListener.

Default implementation performs a plain invocation of the onMessage method.

Any exception thrown by the listener is wrapped in a ListenerExecutionFailedException.

Parameters:
listener - the Rabbit MessageListener to invoke
message - the received Rabbit Message
Throws:
Exception
See Also:
MessageListener.onMessage(org.springframework.amqp.core.Message)

isChannelLocallyTransacted

protected boolean isChannelLocallyTransacted(com.rabbitmq.client.Channel channel)
Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this listener container's Channel handling and not by an external transaction coordinator.

Note: This method is about finding out whether the Channel's transaction is local or externally coordinated.

Parameters:
channel - the Channel to check
Returns:
whether the given Channel is locally transacted
See Also:
RabbitAccessor.isChannelTransacted()

handleListenerException

protected void handleListenerException(Throwable ex)
Handle the given exception that arose during listener execution.

The default implementation logs the exception at error level, not propagating it to the Rabbit provider - assuming that all handling of acknowledgment and/or transactions is done by this listener container. This can be overridden in subclasses.

Parameters:
ex - the exception to handle

wrapToListenerExecutionFailedExceptionIfNeeded

protected Exception wrapToListenerExecutionFailedExceptionIfNeeded(Exception e)
Parameters:
e - the exception to wrap, if needed
Returns:
'e' itself if it is already a ListenerExecutionFailedException; otherwise 'e' wrapped in a new ListenerExecutionFailedException.
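
The wrap-if-needed contract can be sketched as follows. ListenerFailure is a hypothetical stand-in for ListenerExecutionFailedException, and the message text is invented for the example; only the "return as-is or wrap" logic is taken from the description above.

```java
/** Simplified stand-in; not the Spring AMQP implementation. */
class WrapIfNeededSketch {

    /** Hypothetical stand-in for ListenerExecutionFailedException. */
    static class ListenerFailure extends RuntimeException {
        ListenerFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Returns e unchanged if it is already a ListenerFailure, otherwise wraps it. */
    static Exception wrapIfNeeded(Exception e) {
        if (e instanceof ListenerFailure) {
            return e; // already the right type: same instance, no double wrapping
        }
        return new ListenerFailure("Listener threw exception", e);
    }

    public static void main(String[] args) {
        Exception original = new IllegalStateException("boom");
        Exception wrapped = wrapIfNeeded(original);
        System.out.println(wrapped.getCause() == original);       // true
        System.out.println(wrapIfNeeded(wrapped) == wrapped);     // true
    }
}
```

Returning the same instance for an already wrapped exception keeps the cause chain one level deep, so callers can always reach the listener's original exception through getCause().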


Copyright © 2011. All Rights Reserved.