Class AbstractCorrelatingMessageHandler
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler
- All Implemented Interfaces:
org.reactivestreams.Subscriber<org.springframework.messaging.Message<?>>,org.springframework.beans.factory.Aware,org.springframework.beans.factory.BeanFactoryAware,org.springframework.beans.factory.BeanNameAware,org.springframework.beans.factory.DisposableBean,org.springframework.beans.factory.InitializingBean,org.springframework.context.ApplicationContextAware,org.springframework.context.ApplicationEventPublisherAware,org.springframework.context.Lifecycle,org.springframework.core.Ordered,ExpressionCapable,Orderable,MessageProducer,DiscardingMessageHandler,HeaderPropagationAware,IntegrationPattern,NamedComponent,IntegrationManagement,ManageableLifecycle,TrackableComponent,org.springframework.messaging.MessageHandler,reactor.core.CoreSubscriber<org.springframework.messaging.Message<?>>
- Direct Known Subclasses:
AggregatingMessageHandler,ResequencingMessageHandler
public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageProducingHandler implements DiscardingMessageHandler, org.springframework.context.ApplicationEventPublisherAware, ManageableLifecycle
Abstract Message handler that holds a buffer of correlated messages in a
MessageStore.
This class takes care of correlated groups of messages
that can be completed in batches. It is useful for custom implementation of
MessageHandlers that require correlation and is used as a base class for Aggregator -
AggregatingMessageHandler and Resequencer - ResequencingMessageHandler,
or custom implementations requiring correlation.
To customize this handler inject CorrelationStrategy,
ReleaseStrategy, and MessageGroupProcessor implementations as
you require.
By default the CorrelationStrategy will be a
HeaderAttributeCorrelationStrategy and the ReleaseStrategy will be a
SequenceSizeReleaseStrategy.
Use proper CorrelationStrategy for cases when same
MessageStore is used
for multiple handlers to ensure uniqueness of message groups across handlers.
When the expireTimeout is greater than 0, groups which are older than this timeout
are purged from the store on start up (or when purgeOrphanedGroups() is called).
If expireDuration is provided, the task is scheduled to perform
purgeOrphanedGroups() periodically.
- Since:
- 2.0
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected static classAbstractCorrelatingMessageHandler.SequenceAwareMessageGroupNested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides -
Field Summary
Fields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
messagingTemplateFields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME -
Constructor Summary
Constructors Constructor Description AbstractCorrelatingMessageHandler(MessageGroupProcessor processor)AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store)AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) -
Method Summary
Modifier and Type Method Description protected abstract voidafterRelease(MessageGroup group, java.util.Collection<org.springframework.messaging.Message<?>> completedMessages)Allows you to provide additional logic that needs to be performed after the MessageGroup was released.protected voidafterRelease(MessageGroup group, java.util.Collection<org.springframework.messaging.Message<?>> completedMessages, boolean timeout)Subclasses may override if special action is needed because the group was released or discarded due to a timeout.protected voidcompleteGroup(java.lang.Object correlationKey, MessageGroup group, java.util.concurrent.locks.Lock lock)protected java.util.Collection<org.springframework.messaging.Message<?>>completeGroup(org.springframework.messaging.Message<?> message, java.lang.Object correlationKey, MessageGroup group, java.util.concurrent.locks.Lock lock)voiddestroy()protected voidexpireGroup(java.lang.Object correlationKey, MessageGroup group, java.util.concurrent.locks.Lock lock)protected intfindLastReleasedSequenceNumber(java.lang.Object groupId, java.util.Collection<org.springframework.messaging.Message<?>> partialSequence)protected voidforceComplete(MessageGroup group)java.lang.StringgetComponentType()Subclasses may implement this method to provide component type information.protected CorrelationStrategygetCorrelationStrategy()org.springframework.messaging.MessageChannelgetDiscardChannel()Return the discard channel.protected java.lang.StringgetDiscardChannelName()protected org.springframework.expression.EvaluationContextgetEvaluationContext()protected java.util.Map<java.util.UUID,java.util.concurrent.ScheduledFuture<?>>getExpireGroupScheduledFutures()protected org.springframework.expression.ExpressiongetGroupTimeoutExpression()protected LockRegistrygetLockRegistry()MessageGroupStoregetMessageStore()protected longgetMinimumTimeoutForEmptyGroups()MessageGroupProcessorgetOutputProcessor()Return a configuredMessageGroupProcessor.protected ReleaseStrategygetReleaseStrategy()protected voidhandleMessageInternal(org.springframework.messaging.Message<?> message)protected booleanisExpireGroupsUponCompletion()protected booleanisLockRegistrySet()protected booleanisReleaseLockBeforeSend()protected booleanisReleasePartialSequences()booleanisRunning()protected booleanisSendPartialResultOnExpiry()protected booleanisSequenceAware()protected java.lang.LongobtainGroupTimeout(MessageGroup group)protected voidonInit()Subclasses may implement this for initialization logic.voidpurgeOrphanedGroups()Perform aMessageGroupStore.expireMessageGroups(long)with the providedexpireTimeout.protected voidremove(MessageGroup group)voidsetApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)voidsetCorrelationStrategy(CorrelationStrategy correlationStrategy)voidsetDiscardChannel(org.springframework.messaging.MessageChannel discardChannel)voidsetDiscardChannelName(java.lang.String discardChannelName)voidsetExpireDuration(java.time.Duration expireDuration)Configure aDurationhow often to clean up old orphaned groups from the store.voidsetExpireDurationMillis(long expireDuration)Configure aDuration(in millis) how often to clean up old orphaned groups from the store.voidsetExpireGroupsUponTimeout(boolean expireGroupsUponTimeout)Expire (completely remove) a group if it is completed due to timeout.voidsetExpireTimeout(long expireTimeout)Configure a timeout in milliseconds for purging old orphaned groups from the store.voidsetForceReleaseAdviceChain(java.util.List<org.aopalliance.aop.Advice> forceReleaseAdviceChain)voidsetGroupTimeoutExpression(org.springframework.expression.Expression groupTimeoutExpression)voidsetLockRegistry(LockRegistry lockRegistry)voidsetMessageStore(MessageGroupStore store)voidsetMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed.voidsetOutputProcessor(MessageGroupProcessor outputProcessor)Specify aMessageGroupProcessorfor the output function.voidsetPopSequence(boolean popSequence)Perform aMessageBuilder.popSequenceDetails()for output message or not.voidsetReleaseLockBeforeSend(boolean releaseLockBeforeSend)Set to true to release the message group lock before sending any output.voidsetReleasePartialSequences(boolean releasePartialSequences)SetreleasePartialSequenceson an underlying defaultSequenceSizeReleaseStrategy.voidsetReleaseStrategy(ReleaseStrategy releaseStrategy)voidsetSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)voidstart()voidstop()protected MessageGroupstore(java.lang.Object correlationKey, org.springframework.messaging.Message<?> message)protected voidverifyResultCollectionConsistsOfMessages(java.util.Collection<?> elements)Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeadersMethods inherited from class org.springframework.integration.handler.AbstractMessageHandler
handleMessage, onComplete, onError, onNext, onSubscribeMethods inherited from class org.springframework.integration.handler.MessageHandlerSupport
buildSendTimer, getIntegrationPatternType, getManagedName, getManagedType, getMetricsCaptor, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrackMethods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAsMethods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
AbstractCorrelatingMessageHandler
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) -
AbstractCorrelatingMessageHandler
-
AbstractCorrelatingMessageHandler
-
-
Method Details
-
setLockRegistry
-
setMessageStore
-
setCorrelationStrategy
-
setReleaseStrategy
-
setGroupTimeoutExpression
public void setGroupTimeoutExpression(org.springframework.expression.Expression groupTimeoutExpression) -
setForceReleaseAdviceChain
public void setForceReleaseAdviceChain(java.util.List<org.aopalliance.aop.Advice> forceReleaseAdviceChain) -
setOutputProcessor
Specify aMessageGroupProcessorfor the output function.- Parameters:
outputProcessor- theMessageGroupProcessorto use- Since:
- 5.0
-
getOutputProcessor
Return a configuredMessageGroupProcessor.- Returns:
- the configured
MessageGroupProcessor - Since:
- 5.2
-
setDiscardChannel
public void setDiscardChannel(org.springframework.messaging.MessageChannel discardChannel) -
setDiscardChannelName
public void setDiscardChannelName(java.lang.String discardChannelName) -
setSendPartialResultOnExpiry
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) -
setMinimumTimeoutForEmptyGroups
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed. Empty groups exist after a group is released normally. This is to enable the detection and discarding of late-arriving messages. If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property. Empty groups will then not be removed from the MessageStore until they have not been modified for at least this number of milliseconds.- Parameters:
minimumTimeoutForEmptyGroups- The minimum timeout.
-
setReleasePartialSequences
public void setReleasePartialSequences(boolean releasePartialSequences)SetreleasePartialSequenceson an underlying defaultSequenceSizeReleaseStrategy. Ignored for other release strategies.- Parameters:
releasePartialSequences- true to allow release.
-
setExpireGroupsUponTimeout
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout)Expire (completely remove) a group if it is completed due to timeout. Default true- Parameters:
expireGroupsUponTimeout- the expireGroupsUponTimeout to set- Since:
- 4.1
-
setPopSequence
public void setPopSequence(boolean popSequence)Perform aMessageBuilder.popSequenceDetails()for output message or not. Default to true. This option removes the sequence information added by the nearest upstream component withapplySequence=true(for example splitter).- Parameters:
popSequence- the boolean flag to use.- Since:
- 5.1
-
isReleaseLockBeforeSend
protected boolean isReleaseLockBeforeSend() -
setReleaseLockBeforeSend
public void setReleaseLockBeforeSend(boolean releaseLockBeforeSend)Set to true to release the message group lock before sending any output. See "Avoiding Deadlocks" in the Aggregator section of the reference manual for more information as to why this might be needed.- Parameters:
releaseLockBeforeSend- true to release the lock.- Since:
- 5.1.1
-
setExpireTimeout
public void setExpireTimeout(long expireTimeout)Configure a timeout in milliseconds for purging old orphaned groups from the store. Used on startup and when anexpireDurationis provided, the task for runningpurgeOrphanedGroups()is scheduled with that period. TheforceReleaseProcessoris used to process those expired groups according the "force complete" options. A group can be orphaned if a persistent message group store is used and no new messages arrive for that group after a restart.- Parameters:
expireTimeout- the number of milliseconds to determine old orphaned groups in the store to purge.- Since:
- 5.4
- See Also:
purgeOrphanedGroups()
-
setExpireDurationMillis
public void setExpireDurationMillis(long expireDuration)Configure aDuration(in millis) how often to clean up old orphaned groups from the store.- Parameters:
expireDuration- the delay how often to callpurgeOrphanedGroups().- Since:
- 5.4
- See Also:
purgeOrphanedGroups(),setExpireDuration(Duration),setExpireTimeout(long)
-
setExpireDuration
public void setExpireDuration(@Nullable java.time.Duration expireDuration)Configure aDurationhow often to clean up old orphaned groups from the store.- Parameters:
expireDuration- the delay how often to callpurgeOrphanedGroups().- Since:
- 5.4
- See Also:
purgeOrphanedGroups(),setExpireTimeout(long)
-
setApplicationEventPublisher
public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)- Specified by:
setApplicationEventPublisherin interfaceorg.springframework.context.ApplicationEventPublisherAware
-
onInit
protected void onInit()Description copied from class:IntegrationObjectSupportSubclasses may implement this for initialization logic.- Overrides:
onInitin classAbstractMessageProducingHandler
-
getComponentType
public java.lang.String getComponentType()Description copied from class:IntegrationObjectSupportSubclasses may implement this method to provide component type information.- Specified by:
getComponentTypein interfaceNamedComponent- Overrides:
getComponentTypein classMessageHandlerSupport
-
getMessageStore
-
getExpireGroupScheduledFutures
protected java.util.Map<java.util.UUID,java.util.concurrent.ScheduledFuture<?>> getExpireGroupScheduledFutures() -
getCorrelationStrategy
-
getReleaseStrategy
-
getDiscardChannel
public org.springframework.messaging.MessageChannel getDiscardChannel()Description copied from interface:DiscardingMessageHandlerReturn the discard channel.- Specified by:
getDiscardChannelin interfaceDiscardingMessageHandler- Returns:
- the channel.
-
getDiscardChannelName
protected java.lang.String getDiscardChannelName() -
isSendPartialResultOnExpiry
protected boolean isSendPartialResultOnExpiry() -
isSequenceAware
protected boolean isSequenceAware() -
getLockRegistry
-
isLockRegistrySet
protected boolean isLockRegistrySet() -
getMinimumTimeoutForEmptyGroups
protected long getMinimumTimeoutForEmptyGroups() -
isReleasePartialSequences
protected boolean isReleasePartialSequences() -
getGroupTimeoutExpression
protected org.springframework.expression.Expression getGroupTimeoutExpression() -
getEvaluationContext
protected org.springframework.expression.EvaluationContext getEvaluationContext() -
handleMessageInternal
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)- Specified by:
handleMessageInternalin classAbstractMessageHandler
-
isExpireGroupsUponCompletion
protected boolean isExpireGroupsUponCompletion() -
afterRelease
protected abstract void afterRelease(MessageGroup group, java.util.Collection<org.springframework.messaging.Message<?>> completedMessages)Allows you to provide additional logic that needs to be performed after the MessageGroup was released.- Parameters:
group- The group.completedMessages- The completed messages.
-
afterRelease
protected void afterRelease(MessageGroup group, java.util.Collection<org.springframework.messaging.Message<?>> completedMessages, boolean timeout)Subclasses may override if special action is needed because the group was released or discarded due to a timeout. By default,afterRelease(MessageGroup, Collection)is invoked.- Parameters:
group- The group.completedMessages- The completed messages.timeout- True if the release/discard was due to a timeout.
-
forceComplete
-
remove
-
findLastReleasedSequenceNumber
protected int findLastReleasedSequenceNumber(java.lang.Object groupId, java.util.Collection<org.springframework.messaging.Message<?>> partialSequence) -
store
protected MessageGroup store(java.lang.Object correlationKey, org.springframework.messaging.Message<?> message) -
expireGroup
protected void expireGroup(java.lang.Object correlationKey, MessageGroup group, java.util.concurrent.locks.Lock lock) -
completeGroup
protected void completeGroup(java.lang.Object correlationKey, MessageGroup group, java.util.concurrent.locks.Lock lock) -
completeGroup
protected java.util.Collection<org.springframework.messaging.Message<?>> completeGroup(org.springframework.messaging.Message<?> message, java.lang.Object correlationKey, MessageGroup group, java.util.concurrent.locks.Lock lock) -
verifyResultCollectionConsistsOfMessages
protected void verifyResultCollectionConsistsOfMessages(java.util.Collection<?> elements) -
obtainGroupTimeout
-
destroy
public void destroy()- Specified by:
destroyin interfaceorg.springframework.beans.factory.DisposableBean- Specified by:
destroyin interfaceIntegrationManagement- Overrides:
destroyin classMessageHandlerSupport
-
start
public void start()- Specified by:
startin interfaceorg.springframework.context.Lifecycle- Specified by:
startin interfaceManageableLifecycle
-
stop
public void stop()- Specified by:
stopin interfaceorg.springframework.context.Lifecycle- Specified by:
stopin interfaceManageableLifecycle
-
isRunning
public boolean isRunning()- Specified by:
isRunningin interfaceorg.springframework.context.Lifecycle- Specified by:
isRunningin interfaceManageableLifecycle
-
purgeOrphanedGroups
public void purgeOrphanedGroups()Perform aMessageGroupStore.expireMessageGroups(long)with the providedexpireTimeout. Can be called externally at any time. Internally it is called from the scheduled task with the configuredexpireDuration.- Since:
- 5.4
-