public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageHandler implements MessageProducer, DisposableBean, IntegrationEvaluationContextAware, ApplicationEventPublisherAware
Abstract Message handler that holds a buffer of correlated messages in a
MessageStore. This class takes care of correlated groups of messages
that can be completed in batches. It is useful for custom implementation of MessageHandlers that require correlation
and is used as a base class for Aggregator - AggregatingMessageHandler and
Resequencer - ResequencingMessageHandler,
or custom implementations requiring correlation.
To customize this handler inject CorrelationStrategy,
ReleaseStrategy, and MessageGroupProcessor implementations as
you require.
By default the CorrelationStrategy will be a
HeaderAttributeCorrelationStrategy and the ReleaseStrategy will be a
SequenceSizeReleaseStrategy.
| Modifier and Type | Class and Description |
|---|---|
protected static class |
AbstractCorrelatingMessageHandler.SequenceAwareMessageGroup |
| Modifier and Type | Field and Description |
|---|---|
static long |
DEFAULT_SEND_TIMEOUT |
protected MessageGroupStore |
messageStore |
Fields inherited from interface Ordered: HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
| Constructor and Description |
|---|
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor) |
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor,
MessageGroupStore store) |
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor,
MessageGroupStore store,
CorrelationStrategy correlationStrategy,
ReleaseStrategy releaseStrategy) |
| Modifier and Type | Method and Description |
|---|---|
protected abstract void |
afterRelease(MessageGroup group,
java.util.Collection<Message<?>> completedMessages)
Allows you to provide additional logic that needs to be performed after the MessageGroup was released.
|
protected java.util.Collection<Message<?>> |
completeGroup(Message<?> message,
java.lang.Object correlationKey,
MessageGroup group) |
protected void |
completeGroup(java.lang.Object correlationKey,
MessageGroup group) |
void |
destroy() |
protected void |
expireGroup(java.lang.Object correlationKey,
MessageGroup group) |
protected int |
findLastReleasedSequenceNumber(java.lang.Object groupId,
java.util.Collection<Message<?>> partialSequence) |
java.lang.String |
getComponentType()
Subclasses may implement this method to provide component type information.
|
protected CorrelationStrategy |
getCorrelationStrategy() |
protected MessageChannel |
getDiscardChannel() |
protected java.lang.String |
getDiscardChannelName() |
protected EvaluationContext |
getEvaluationContext() |
protected java.util.Map<java.util.UUID,java.util.concurrent.ScheduledFuture<?>> |
getExpireGroupScheduledFutures() |
protected Expression |
getGroupTimeoutExpression() |
protected LockRegistry |
getLockRegistry() |
protected MessageGroupStore |
getMessageStore() |
protected MessagingTemplate |
getMessagingTemplate() |
protected long |
getMinimumTimeoutForEmptyGroups() |
protected MessageChannel |
getOutputChannel() |
protected java.lang.String |
getOutputChannelName() |
protected MessageGroupProcessor |
getOutputProcessor() |
protected ReleaseStrategy |
getReleaseStrategy() |
protected void |
handleMessageInternal(Message<?> message) |
protected boolean |
isLockRegistrySet() |
protected boolean |
isReleasePartialSequences() |
protected boolean |
isSendPartialResultOnExpiry() |
protected boolean |
isSequenceAware() |
protected java.lang.Long |
obtainGroupTimeout(MessageGroup group) |
protected void |
onInit()
Subclasses may implement this for initialization logic.
|
protected void |
sendReplies(java.lang.Object processorResult,
Message message) |
protected void |
sendReplyMessage(java.lang.Object reply,
java.lang.Object replyChannel) |
void |
setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) |
void |
setCorrelationStrategy(CorrelationStrategy correlationStrategy) |
void |
setDiscardChannel(MessageChannel discardChannel) |
void |
setDiscardChannelName(java.lang.String discardChannelName) |
void |
setGroupTimeoutExpression(Expression groupTimeoutExpression) |
void |
setIntegrationEvaluationContext(EvaluationContext evaluationContext) |
void |
setLockRegistry(LockRegistry lockRegistry) |
void |
setMessageStore(MessageGroupStore store) |
void |
setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
By default, when a MessageGroupStoreReaper is configured to expire partial
groups, empty groups are also removed.
|
void |
setOutputChannel(MessageChannel outputChannel)
Specify the MessageChannel to which produced Messages should be sent.
|
void |
setOutputChannelName(java.lang.String outputChannelName) |
void |
setReleasePartialSequences(boolean releasePartialSequences) |
void |
setReleaseStrategy(ReleaseStrategy releaseStrategy) |
void |
setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) |
void |
setSendTimeout(long sendTimeout) |
void |
setTaskScheduler(TaskScheduler taskScheduler) |
protected boolean |
shouldSendMultipleReplies(java.lang.Iterable<?> iter) |
protected MessageGroup |
store(java.lang.Object correlationKey,
Message<?> message) |
protected void |
verifyResultCollectionConsistsOfMessages(java.util.Collection<?> elements) |
Methods inherited from class AbstractMessageHandler: getOrder, handleMessage, setOrder, setShouldTrack
Methods inherited from class IntegrationObjectSupport: afterPropertiesSet, getApplicationContextId, getBeanFactory, getComponentName, getConversionService, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, setApplicationContext, setBeanFactory, setBeanName, setComponentName, setConversionService, setMessageBuilderFactory, toString
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface NamedComponent: getComponentName
public static final long DEFAULT_SEND_TIMEOUT
protected volatile MessageGroupStore messageStore
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy)
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store)
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor)
public void setLockRegistry(LockRegistry lockRegistry)
public void setMessageStore(MessageGroupStore store)
public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
public void setReleaseStrategy(ReleaseStrategy releaseStrategy)
public void setOutputChannel(MessageChannel outputChannel)
Description copied from interface: MessageProducer
Specified by: setOutputChannel in interface MessageProducer
Parameters: outputChannel - The output channel.
public void setOutputChannelName(java.lang.String outputChannelName)
public void setGroupTimeoutExpression(Expression groupTimeoutExpression)
public void setIntegrationEvaluationContext(EvaluationContext evaluationContext)
Specified by: setIntegrationEvaluationContext in interface IntegrationEvaluationContextAware
public void setTaskScheduler(TaskScheduler taskScheduler)
Overrides: setTaskScheduler in class IntegrationObjectSupport
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
Specified by: setApplicationEventPublisher in interface ApplicationEventPublisherAware
protected void onInit()
throws java.lang.Exception
Description copied from class: IntegrationObjectSupport
Overrides: onInit in class IntegrationObjectSupport
Throws: java.lang.Exception - Any exception.
public void setDiscardChannel(MessageChannel discardChannel)
public void setDiscardChannelName(java.lang.String discardChannelName)
public void setSendTimeout(long sendTimeout)
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
Parameters: minimumTimeoutForEmptyGroups - The minimum timeout.
public void setReleasePartialSequences(boolean releasePartialSequences)
public java.lang.String getComponentType()
Description copied from class: IntegrationObjectSupport
Specified by: getComponentType in interface NamedComponent
Overrides: getComponentType in class AbstractMessageHandler
protected MessageGroupStore getMessageStore()
protected java.util.Map<java.util.UUID,java.util.concurrent.ScheduledFuture<?>> getExpireGroupScheduledFutures()
protected MessageGroupProcessor getOutputProcessor()
protected CorrelationStrategy getCorrelationStrategy()
protected ReleaseStrategy getReleaseStrategy()
protected MessageChannel getOutputChannel()
protected java.lang.String getOutputChannelName()
protected MessagingTemplate getMessagingTemplate()
protected MessageChannel getDiscardChannel()
protected java.lang.String getDiscardChannelName()
protected boolean isSendPartialResultOnExpiry()
protected boolean isSequenceAware()
protected LockRegistry getLockRegistry()
protected boolean isLockRegistrySet()
protected long getMinimumTimeoutForEmptyGroups()
protected boolean isReleasePartialSequences()
protected Expression getGroupTimeoutExpression()
protected EvaluationContext getEvaluationContext()
protected void handleMessageInternal(Message<?> message) throws java.lang.Exception
Specified by: handleMessageInternal in class AbstractMessageHandler
Throws: java.lang.Exception
protected abstract void afterRelease(MessageGroup group, java.util.Collection<Message<?>> completedMessages)
Parameters: group - The group.
completedMessages - The completed messages.
protected int findLastReleasedSequenceNumber(java.lang.Object groupId,
java.util.Collection<Message<?>> partialSequence)
protected MessageGroup store(java.lang.Object correlationKey, Message<?> message)
protected void expireGroup(java.lang.Object correlationKey,
MessageGroup group)
protected void completeGroup(java.lang.Object correlationKey,
MessageGroup group)
protected java.util.Collection<Message<?>> completeGroup(Message<?> message, java.lang.Object correlationKey, MessageGroup group)
protected void verifyResultCollectionConsistsOfMessages(java.util.Collection<?> elements)
protected void sendReplies(java.lang.Object processorResult,
Message message)
protected void sendReplyMessage(java.lang.Object reply,
java.lang.Object replyChannel)
protected boolean shouldSendMultipleReplies(java.lang.Iterable<?> iter)
protected java.lang.Long obtainGroupTimeout(MessageGroup group)
public void destroy()
throws java.lang.Exception
Specified by: destroy in interface DisposableBean
Throws: java.lang.Exception