public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageProducingHandler implements DiscardingMessageHandler, org.springframework.context.ApplicationEventPublisherAware, ManageableLifecycle
Abstract Message handler that holds a buffer of correlated messages in a MessageStore.
This class takes care of correlated groups of messages
that can be completed in batches. It is useful for custom implementations of
MessageHandlers that require correlation, and it is the base class for the
Aggregator (AggregatingMessageHandler) and the Resequencer (ResequencingMessageHandler).
To customize this handler, inject the CorrelationStrategy,
ReleaseStrategy, and MessageGroupProcessor implementations
you require.
By default the CorrelationStrategy will be a
HeaderAttributeCorrelationStrategy and the ReleaseStrategy will be a
SequenceSizeReleaseStrategy.
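The correlate-then-release flow described above can be modeled in plain Java. This is a minimal sketch under stated assumptions, not the framework's implementation: `SketchMessage` and `CorrelatingSketch` are hypothetical names, the store is an in-memory map, and the header names `correlationId` and `sequenceSize` are assumed for illustration. Every message is expected to carry both headers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a message: a payload plus string-keyed headers.
class SketchMessage {
    final String payload;
    final Map<String, Object> headers;

    SketchMessage(String payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

// Hypothetical model of correlate-then-release; not the framework implementation.
class CorrelatingSketch {
    // Stand-in for the message store: correlation key -> buffered messages.
    private final Map<Object, List<SketchMessage>> groups = new HashMap<>();

    // Buffers the message and returns the whole group once it is complete,
    // or an empty list while the group is still waiting for more messages.
    List<SketchMessage> handle(SketchMessage message) {
        // Correlation step: group by the value of the assumed "correlationId" header.
        Object key = message.headers.get("correlationId");
        List<SketchMessage> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);

        // Release step: complete once the group holds "sequenceSize" messages.
        int sequenceSize = (Integer) message.headers.get("sequenceSize");
        if (group.size() >= sequenceSize) {
            groups.remove(key);
            return group;
        }
        return Collections.emptyList();
    }
}
```

Swapping the two commented steps for other logic corresponds, roughly, to injecting a different CorrelationStrategy or ReleaseStrategy.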
Use a suitable CorrelationStrategy when the same
MessageStore is shared
by multiple handlers, to ensure that message groups are unique across handlers.
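One way to keep groups apart in a shared store is to fold a per-handler scope into the correlation key. The sketch below only illustrates that idea; `ScopedCorrelationSketch`, the scope prefix, and the header name passed in are all hypothetical and not part of the framework API.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper; the scope prefix and header name are illustrative only.
class ScopedCorrelationSketch {
    // Returns a key function that combines a per-handler scope with a header value,
    // so two handlers reading the same header never produce the same group key.
    static Function<Map<String, Object>, Object> scopedBy(String handlerScope, String headerName) {
        return headers -> handlerScope + ":" + headers.get(headerName);
    }
}
```

With this helper, `scopedBy("aggregator", "correlationId")` and `scopedBy("resequencer", "correlationId")` map the same header value to two distinct keys.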
When the expireTimeout is greater than 0, groups older than this timeout
are purged from the store on startup (or when purgeOrphanedGroups() is called).
If expireDuration is provided, a task is scheduled to perform
purgeOrphanedGroups() periodically.
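The purge behavior can be pictured with a small plain-Java model. This is a sketch under assumptions, not the framework's code: `OrphanPurgeSketch` is a hypothetical class, the store is reduced to a map of group creation times, and the fixed-delay scheduling is an assumed choice for the periodic task.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical model of age-based purging; not the framework implementation.
class OrphanPurgeSketch {
    // Stand-in for the store: group id -> creation time in epoch milliseconds.
    private final Map<String, Long> createdAt = new ConcurrentHashMap<>();
    private final long expireTimeoutMillis;

    OrphanPurgeSketch(long expireTimeoutMillis) {
        this.expireTimeoutMillis = expireTimeoutMillis;
    }

    void register(String groupId, long nowMillis) {
        createdAt.put(groupId, nowMillis);
    }

    int size() {
        return createdAt.size();
    }

    // Removes groups older than the timeout and returns how many were removed.
    // Does nothing when the timeout is not greater than 0.
    int purgeOrphanedGroups(long nowMillis) {
        if (expireTimeoutMillis <= 0) {
            return 0;
        }
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = createdAt.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > expireTimeoutMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // Assumed scheduling style: run the purge repeatedly with a fixed delay.
    ScheduledFuture<?> schedule(ScheduledExecutorService scheduler, long expireDurationMillis) {
        return scheduler.scheduleWithFixedDelay(
                () -> purgeOrphanedGroups(System.currentTimeMillis()),
                expireDurationMillis, expireDurationMillis, TimeUnit.MILLISECONDS);
    }
}
```

The timeout decides which groups count as old, while the duration only controls how often the check runs; the two settings are independent.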
| Modifier and Type | Class and Description |
|---|---|
| protected static class | AbstractCorrelatingMessageHandler.SequenceAwareMessageGroup |

Inherited nested classes: IntegrationManagement.ManagementOverrides
Inherited fields: messagingTemplate, EXPRESSION_PARSER, logger, HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE, METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME

| Constructor and Description |
|---|
| AbstractCorrelatingMessageHandler(MessageGroupProcessor processor) |
| AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store) |
| AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) |


| Modifier and Type | Method and Description |
|---|---|
| protected abstract void | afterRelease(MessageGroup group, java.util.Collection<org.springframework.messaging.Message<?>> completedMessages): Allows you to provide additional logic that needs to be performed after the MessageGroup was released. |
| protected void | afterRelease(MessageGroup group, java.util.Collection<org.springframework.messaging.Message<?>> completedMessages, boolean timeout): Subclasses may override if special action is needed because the group was released or discarded due to a timeout. |
| protected java.util.Collection<org.springframework.messaging.Message<?>> | completeGroup(org.springframework.messaging.Message<?> message, java.lang.Object correlationKey, MessageGroup group, java.util.concurrent.locks.Lock lock) |
| protected void | completeGroup(java.lang.Object correlationKey, MessageGroup group, java.util.concurrent.locks.Lock lock) |
| void | destroy() |
| protected void | expireGroup(java.lang.Object correlationKey, MessageGroup group, java.util.concurrent.locks.Lock lock) |
| protected int | findLastReleasedSequenceNumber(java.lang.Object groupId, java.util.Collection<org.springframework.messaging.Message<?>> partialSequence) |
| protected void | forceComplete(MessageGroup group) |
| java.lang.String | getComponentType(): Subclasses may implement this method to provide component type information. |
| protected CorrelationStrategy | getCorrelationStrategy() |
| org.springframework.messaging.MessageChannel | getDiscardChannel(): Return the discard channel. |
| protected java.lang.String | getDiscardChannelName() |
| protected org.springframework.expression.EvaluationContext | getEvaluationContext() |
| protected java.util.Map<java.util.UUID,java.util.concurrent.ScheduledFuture<?>> | getExpireGroupScheduledFutures() |
| protected org.springframework.expression.Expression | getGroupTimeoutExpression() |
| protected LockRegistry | getLockRegistry() |
| MessageGroupStore | getMessageStore() |
| protected long | getMinimumTimeoutForEmptyGroups() |
| MessageGroupProcessor | getOutputProcessor(): Return a configured MessageGroupProcessor. |
| protected ReleaseStrategy | getReleaseStrategy() |
| protected void | handleMessageInternal(org.springframework.messaging.Message<?> message) |
| protected boolean | isExpireGroupsUponCompletion() |
| protected boolean | isLockRegistrySet() |
| protected boolean | isReleaseLockBeforeSend() |
| protected boolean | isReleasePartialSequences() |
| boolean | isRunning() |
| protected boolean | isSendPartialResultOnExpiry() |
| protected boolean | isSequenceAware() |
| protected java.lang.Long | obtainGroupTimeout(MessageGroup group) |
| protected void | onInit(): Subclasses may implement this for initialization logic. |
| void | purgeOrphanedGroups(): Perform MessageGroupStore.expireMessageGroups(long) with the provided expireTimeout. |
| protected void | remove(MessageGroup group) |
| void | setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher) |
| void | setCorrelationStrategy(CorrelationStrategy correlationStrategy) |
| void | setDiscardChannel(org.springframework.messaging.MessageChannel discardChannel) |
| void | setDiscardChannelName(java.lang.String discardChannelName) |
| void | setExpireDuration(java.time.Duration expireDuration): Configure a Duration for how often to clean up old orphaned groups from the store. |
| void | setExpireDurationMillis(long expireDuration): Configure a Duration (in millis) for how often to clean up old orphaned groups from the store. |
| void | setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout): Expire (completely remove) a group if it is completed due to timeout. |
| void | setExpireTimeout(long expireTimeout): Configure a timeout in milliseconds for purging old orphaned groups from the store. |
| void | setForceReleaseAdviceChain(java.util.List<org.aopalliance.aop.Advice> forceReleaseAdviceChain) |
| void | setGroupTimeoutExpression(org.springframework.expression.Expression groupTimeoutExpression) |
| void | setLockRegistry(LockRegistry lockRegistry) |
| void | setMessageStore(MessageGroupStore store) |
| void | setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups): By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed. |
| void | setOutputProcessor(MessageGroupProcessor outputProcessor): Specify a MessageGroupProcessor for the output function. |
| void | setPopSequence(boolean popSequence): Whether to perform MessageBuilder.popSequenceDetails() on the output message. |
| void | setReleaseLockBeforeSend(boolean releaseLockBeforeSend): Set to true to release the message group lock before sending any output. |
| void | setReleasePartialSequences(boolean releasePartialSequences): Set releasePartialSequences on an underlying default SequenceSizeReleaseStrategy. |
| void | setReleaseStrategy(ReleaseStrategy releaseStrategy) |
| void | setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) |
| void | start() |
| void | stop() |
| protected MessageGroup | store(java.lang.Object correlationKey, org.springframework.messaging.Message<?> message) |
| protected void | verifyResultCollectionConsistsOfMessages(java.util.Collection<?> elements) |


Inherited methods, one group per declaring type:
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
handleMessage, onComplete, onError, onNext, onSubscribe
buildSendTimer, getIntegrationPatternType, getManagedName, getManagedType, getMetricsCaptor, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getThisAs
getBeanName, getComponentName

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy)
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store)
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor)
public void setLockRegistry(LockRegistry lockRegistry)
public final void setMessageStore(MessageGroupStore store)
public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
public void setReleaseStrategy(ReleaseStrategy releaseStrategy)
public void setGroupTimeoutExpression(org.springframework.expression.Expression groupTimeoutExpression)
public void setForceReleaseAdviceChain(java.util.List<org.aopalliance.aop.Advice> forceReleaseAdviceChain)
public void setOutputProcessor(MessageGroupProcessor outputProcessor)
Specify a MessageGroupProcessor for the output function.
Parameters: outputProcessor - the MessageGroupProcessor to use
public MessageGroupProcessor getOutputProcessor()
Return a configured MessageGroupProcessor.
Returns: the MessageGroupProcessor
public void setDiscardChannel(org.springframework.messaging.MessageChannel discardChannel)
public void setDiscardChannelName(java.lang.String discardChannelName)
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed.
Parameters: minimumTimeoutForEmptyGroups - The minimum timeout.
public void setReleasePartialSequences(boolean releasePartialSequences)
Set releasePartialSequences on an underlying default SequenceSizeReleaseStrategy. Ignored for other release strategies.
Parameters: releasePartialSequences - true to allow release.
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout)
Expire (completely remove) a group if it is completed due to timeout.
Parameters: expireGroupsUponTimeout - the expireGroupsUponTimeout to set
public void setPopSequence(boolean popSequence)
Whether to perform MessageBuilder.popSequenceDetails() on the output message. Defaults to true. This option removes the sequence
information added by the nearest upstream component with applySequence=true
(for example, a splitter).
Parameters: popSequence - the boolean flag to use.
protected boolean isReleaseLockBeforeSend()
public void setReleaseLockBeforeSend(boolean releaseLockBeforeSend)
Set to true to release the message group lock before sending any output.
Parameters: releaseLockBeforeSend - true to release the lock.
public void setExpireTimeout(long expireTimeout)
Configure a timeout in milliseconds for purging old orphaned groups from the store.
If expireDuration is provided, the task for running
purgeOrphanedGroups() is scheduled with that period.
The forceReleaseProcessor is used to process those expired groups according
to the "force complete" options. A group can be orphaned if a persistent message group
store is used and no new messages arrive for that group after a restart.
Parameters: expireTimeout - the number of milliseconds to determine old orphaned groups in the store to purge.
See Also: purgeOrphanedGroups()
public void setExpireDurationMillis(long expireDuration)
Configure a Duration (in millis) for how often to clean up old orphaned groups from the store.
Parameters: expireDuration - the delay between calls to purgeOrphanedGroups().
See Also: purgeOrphanedGroups(), setExpireDuration(Duration), setExpireTimeout(long)
public void setExpireDuration(@Nullable java.time.Duration expireDuration)
Configure a Duration for how often to clean up old orphaned groups from the store.
Parameters: expireDuration - the delay between calls to purgeOrphanedGroups().
See Also: purgeOrphanedGroups(), setExpireTimeout(long)
public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
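Specified by: setApplicationEventPublisher in interface org.springframework.context.ApplicationEventPublisherAware
protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.
Overrides: onInit in class AbstractMessageProducingHandler
public java.lang.String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
Specified by: getComponentType in interface NamedComponent
Overrides: getComponentType in class MessageHandlerSupport
public MessageGroupStore getMessageStore()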
protected java.util.Map<java.util.UUID,java.util.concurrent.ScheduledFuture<?>> getExpireGroupScheduledFutures()
protected CorrelationStrategy getCorrelationStrategy()
protected ReleaseStrategy getReleaseStrategy()
public org.springframework.messaging.MessageChannel getDiscardChannel()
Description copied from interface: DiscardingMessageHandler
Return the discard channel.
Specified by: getDiscardChannel in interface DiscardingMessageHandler
protected java.lang.String getDiscardChannelName()
protected boolean isSendPartialResultOnExpiry()
protected boolean isSequenceAware()
protected LockRegistry getLockRegistry()
protected boolean isLockRegistrySet()
protected long getMinimumTimeoutForEmptyGroups()
protected boolean isReleasePartialSequences()
protected org.springframework.expression.Expression getGroupTimeoutExpression()
protected org.springframework.expression.EvaluationContext getEvaluationContext()
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
Specified by: handleMessageInternal in class AbstractMessageHandler
protected boolean isExpireGroupsUponCompletion()
protected abstract void afterRelease(MessageGroup group, java.util.Collection<org.springframework.messaging.Message<?>> completedMessages)
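Allows you to provide additional logic that needs to be performed after the MessageGroup was released.
Parameters:
group - The group.
completedMessages - The completed messages.
protected void afterRelease(MessageGroup group, java.util.Collection<org.springframework.messaging.Message<?>> completedMessages, boolean timeout)
Subclasses may override if special action is needed because the group was released or discarded due to a timeout. By default, afterRelease(MessageGroup, Collection) is invoked.
Parameters:
group - The group.
completedMessages - The completed messages.
timeout - True if the release/discard was due to a timeout.
protected void forceComplete(MessageGroup group)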
protected void remove(MessageGroup group)
protected int findLastReleasedSequenceNumber(java.lang.Object groupId,
java.util.Collection<org.springframework.messaging.Message<?>> partialSequence)
protected MessageGroup store(java.lang.Object correlationKey, org.springframework.messaging.Message<?> message)
protected void expireGroup(java.lang.Object correlationKey,
MessageGroup group,
java.util.concurrent.locks.Lock lock)
protected void completeGroup(java.lang.Object correlationKey,
MessageGroup group,
java.util.concurrent.locks.Lock lock)
protected java.util.Collection<org.springframework.messaging.Message<?>> completeGroup(org.springframework.messaging.Message<?> message,
java.lang.Object correlationKey,
MessageGroup group,
java.util.concurrent.locks.Lock lock)
protected void verifyResultCollectionConsistsOfMessages(java.util.Collection<?> elements)
protected java.lang.Long obtainGroupTimeout(MessageGroup group)
public void destroy()
Specified by: destroy in interface org.springframework.beans.factory.DisposableBean
Specified by: destroy in interface IntegrationManagement
Overrides: destroy in class MessageHandlerSupport
public void start()
Specified by: start in interface org.springframework.context.Lifecycle
Specified by: start in interface ManageableLifecycle
public void stop()
Specified by: stop in interface org.springframework.context.Lifecycle
Specified by: stop in interface ManageableLifecycle
public boolean isRunning()
Specified by: isRunning in interface org.springframework.context.Lifecycle
Specified by: isRunning in interface ManageableLifecycle
public void purgeOrphanedGroups()
Perform MessageGroupStore.expireMessageGroups(long) with the provided expireTimeout.
Can be called externally at any time.
Internally it is called from the scheduled task with the configured expireDuration.