Class AbstractCorrelatingMessageHandler
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler
- All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>, Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Ordered, ExpressionCapable, Orderable, MessageProducer, DiscardingMessageHandler, HeaderPropagationAware, IntegrationPattern, NamedComponent, IntegrationManagement, ManageableLifecycle, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>
- Direct Known Subclasses:
AggregatingMessageHandler, ResequencingMessageHandler
public abstract class AbstractCorrelatingMessageHandler
extends AbstractMessageProducingHandler
implements DiscardingMessageHandler, ApplicationEventPublisherAware, ManageableLifecycle
Abstract Message handler that holds a buffer of correlated messages in a
MessageStore.
This class manages correlated groups of messages
that can be completed in batches. It is the base class for the Aggregator
(AggregatingMessageHandler) and the Resequencer (ResequencingMessageHandler),
and is also useful for custom MessageHandler implementations that require
correlation.
To customize this handler, inject CorrelationStrategy,
ReleaseStrategy, and MessageGroupProcessor implementations as
required.
By default the CorrelationStrategy will be a
HeaderAttributeCorrelationStrategy and the ReleaseStrategy will be a
SequenceSizeReleaseStrategy.
When the same MessageStore is shared by
multiple handlers, use a CorrelationStrategy that
keeps message groups unique across those handlers.
When the expireTimeout is greater than 0, groups which are older than this timeout
are purged from the store on start up (or when purgeOrphanedGroups() is called).
If expireDuration is provided, the task is scheduled to perform
purgeOrphanedGroups() periodically.
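The correlate-then-release flow described above can be modeled in plain Java. This is only a sketch and not the Spring Integration API: SketchMessage and CorrelationSketch are hypothetical names, a HashMap stands in for the MessageStore, and the release rule is a simplified sequence-size check assumed for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a message carrying correlation and sequence-size headers.
record SketchMessage(String correlationId, int sequenceSize, String payload) { }

// Plain-Java model of "buffer by correlation key, release when the group is complete".
final class CorrelationSketch {

    // Stand-in for the message store: correlation key -> buffered messages.
    private final Map<String, List<SketchMessage>> groups = new HashMap<>();

    // Adds the message to its group and returns the whole group once it is complete.
    Optional<List<SketchMessage>> handle(SketchMessage message) {
        List<SketchMessage> group =
                groups.computeIfAbsent(message.correlationId(), key -> new ArrayList<>());
        group.add(message);
        // Simplified release rule: the group is complete when it holds sequenceSize messages.
        if (group.size() >= message.sequenceSize()) {
            groups.remove(message.correlationId());
            return Optional.of(List.copyOf(group));
        }
        return Optional.empty();
    }

    // Number of groups still waiting for more messages.
    int bufferedGroupCount() {
        return groups.size();
    }
}
```

In this model the first message of a two-message sequence is buffered and the second one releases the pair. The real handler delegates the same two decisions to the configured CorrelationStrategy and ReleaseStrategy.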
- Since:
- 2.0
- Author:
- Iwein Fuld, Dave Syer, Oleg Zhurakousky, Gary Russell, Artem Bilan, David Liu, Enrique Rodriguez, Meherzad Lahewala, Jayadev Sirimamilla
-
Nested Class Summary
Nested Classes
Modifier and Type, Class, Description
protected static class
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Fields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
messagingTemplate
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
-
Constructor Summary
Constructors
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store)
AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy)
-
Method Summary
Modifier and Type, Method, Description
protected abstract void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages)
    Allows you to provide additional logic that needs to be performed after the MessageGroup was released.
protected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout)
    Subclasses may override if special action is needed because the group was released or discarded due to a timeout.
protected void completeGroup(Object correlationKey, MessageGroup group, Lock lock)
protected Collection<Message<?>> completeGroup(Message<?> message, Object correlationKey, MessageGroup group, Lock lock)
void destroy()
protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock)
protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence)
protected void forceComplete(MessageGroup group)
String getComponentType()
    Subclasses may implement this method to provide component type information.
protected CorrelationStrategy getCorrelationStrategy()
MessageChannel getDiscardChannel()
    Return the discard channel.
protected String getDiscardChannelName()
protected EvaluationContext getEvaluationContext()
protected Map<UUID,ScheduledFuture<?>> getExpireGroupScheduledFutures()
protected BiFunction<Message<?>,String,String> getGroupConditionSupplier()
protected Expression getGroupTimeoutExpression()
protected LockRegistry getLockRegistry()
MessageGroupStore getMessageStore()
protected long getMinimumTimeoutForEmptyGroups()
MessageGroupProcessor getOutputProcessor()
    Return a configured MessageGroupProcessor.
protected ReleaseStrategy getReleaseStrategy()
protected void handleMessageInternal(Message<?> message)
protected boolean isExpireGroupsUponCompletion()
protected boolean isLockRegistrySet()
protected boolean isReleaseLockBeforeSend()
protected boolean isReleasePartialSequences()
boolean isRunning()
protected boolean isSendPartialResultOnExpiry()
protected boolean isSequenceAware()
protected Object obtainGroupTimeout(MessageGroup group)
protected void onInit()
    Subclasses may implement this for initialization logic.
void purgeOrphanedGroups()
    Perform a MessageGroupStore.expireMessageGroups(long) with the provided expireTimeout.
protected void remove(MessageGroup group)
void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
void setDiscardChannel(MessageChannel discardChannel)
void setDiscardChannelName(String discardChannelName)
void setExpireDuration(Duration expireDuration)
    Configure a Duration how often to clean up old orphaned groups from the store.
void setExpireDurationMillis(long expireDuration)
    Configure a Duration (in millis) how often to clean up old orphaned groups from the store.
void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout)
    Expire (completely remove) a group if it is completed due to timeout.
void setExpireTimeout(long expireTimeout)
    Configure a timeout in milliseconds for purging old orphaned groups from the store.
void setForceReleaseAdviceChain(List<Advice> forceReleaseAdviceChain)
void setGroupConditionSupplier(BiFunction<Message<?>, String, String> conditionSupplier)
    Configure a BiFunction to supply a group condition from a message to be added to the group.
void setGroupTimeoutExpression(Expression groupTimeoutExpression)
void setLockRegistry(LockRegistry lockRegistry)
final void setMessageStore(MessageGroupStore store)
void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
    By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed.
void setOutputProcessor(MessageGroupProcessor outputProcessor)
    Specify a MessageGroupProcessor for the output function.
void setPopSequence(boolean popSequence)
    Perform a MessageBuilder.popSequenceDetails() for output message or not.
void setReleaseLockBeforeSend(boolean releaseLockBeforeSend)
    Set to true to release the message group lock before sending any output.
void setReleasePartialSequences(boolean releasePartialSequences)
    Set releasePartialSequences on an underlying default SequenceSizeReleaseStrategy.
void setReleaseStrategy(ReleaseStrategy releaseStrategy)
void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)
void start()
void stop()
protected MessageGroup store
protected void verifyResultCollectionConsistsOfMessages(Collection<?> elements)
Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, setupMessageProcessor, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
handleMessage, onComplete, onError, onNext, onSubscribe, setObservationConvention
Methods inherited from class org.springframework.integration.handler.MessageHandlerSupport
buildSendTimer, getIntegrationPatternType, getManagedName, getManagedType, getMetricsCaptor, getObservationRegistry, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, registerObservationRegistry, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface reactor.core.CoreSubscriber
currentContext
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs
Methods inherited from interface org.springframework.messaging.MessageHandler
handleMessage
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
AbstractCorrelatingMessageHandler
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy)
-
AbstractCorrelatingMessageHandler
-
AbstractCorrelatingMessageHandler
-
-
Method Details
-
setLockRegistry
-
setMessageStore
-
setCorrelationStrategy
-
setReleaseStrategy
-
setGroupTimeoutExpression
-
setForceReleaseAdviceChain
-
setOutputProcessor
Specify a MessageGroupProcessor for the output function.
- Parameters:
outputProcessor - the MessageGroupProcessor to use
- Since:
- 5.0
-
getOutputProcessor
Return a configured MessageGroupProcessor.
- Returns:
- the configured MessageGroupProcessor
- Since:
- 5.2
-
setDiscardChannel
-
setDiscardChannelName
-
setSendPartialResultOnExpiry
public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)
-
setMinimumTimeoutForEmptyGroups
public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed. Empty groups exist after a group is released normally. This is to enable the detection and discarding of late-arriving messages. If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property. Empty groups will then not be removed from the MessageStore until they have not been modified for at least this number of milliseconds.
- Parameters:
minimumTimeoutForEmptyGroups - The minimum timeout.
-
setReleasePartialSequences
public void setReleasePartialSequences(boolean releasePartialSequences)
Set releasePartialSequences on an underlying default SequenceSizeReleaseStrategy. Ignored for other release strategies.
- Parameters:
releasePartialSequences - true to allow release.
-
setExpireGroupsUponTimeout
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout)
Expire (completely remove) a group if it is completed due to timeout. Default true.
- Parameters:
expireGroupsUponTimeout - the expireGroupsUponTimeout to set
- Since:
- 4.1
-
setPopSequence
public void setPopSequence(boolean popSequence)
Whether to perform a MessageBuilder.popSequenceDetails() for the output message. Defaults to true. This option removes the sequence information added by the nearest upstream component with applySequence=true (for example, a splitter).
- Parameters:
popSequence - the boolean flag to use.
- Since:
- 5.1
-
isReleaseLockBeforeSend
protected boolean isReleaseLockBeforeSend()
-
setReleaseLockBeforeSend
public void setReleaseLockBeforeSend(boolean releaseLockBeforeSend)
Set to true to release the message group lock before sending any output. See "Avoiding Deadlocks" in the Aggregator section of the reference manual for more information as to why this might be needed.
- Parameters:
releaseLockBeforeSend - true to release the lock.
- Since:
- 5.1.1
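The effect of this flag can be illustrated with a plain-Java sketch. It is only a model under stated assumptions, not the handler's actual implementation: ReleaseLockBeforeSendSketch is a hypothetical class, a ReentrantLock stands in for the group lock, and a Consumer stands in for the output channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical model of a group that either keeps or releases its lock while sending output.
final class ReleaseLockBeforeSendSketch {

    private final ReentrantLock groupLock = new ReentrantLock();
    private final List<String> buffer = new ArrayList<>();
    private final boolean releaseLockBeforeSend;

    ReleaseLockBeforeSendSketch(boolean releaseLockBeforeSend) {
        this.releaseLockBeforeSend = releaseLockBeforeSend;
    }

    // Buffers a payload while holding the group lock.
    void add(String payload) {
        groupLock.lock();
        try {
            buffer.add(payload);
        } finally {
            groupLock.unlock();
        }
    }

    // Drains the buffer under the lock, then sends with or without the lock still held.
    void complete(Consumer<List<String>> downstream) {
        groupLock.lock();
        boolean locked = true;
        try {
            List<String> output = List.copyOf(buffer);
            buffer.clear();
            if (releaseLockBeforeSend) {
                // Downstream code can no longer block other threads waiting for this group.
                groupLock.unlock();
                locked = false;
            }
            downstream.accept(output);
        } finally {
            if (locked) {
                groupLock.unlock();
            }
        }
    }

    boolean isLockHeldByCurrentThread() {
        return groupLock.isHeldByCurrentThread();
    }
}
```

With the flag set, the downstream consumer runs without the group lock, so slow or re-entrant downstream processing cannot hold up other threads that need the same group.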
-
setExpireTimeout
public void setExpireTimeout(long expireTimeout)
Configure a timeout in milliseconds for purging old orphaned groups from the store. The timeout is applied on startup and, when an expireDuration is provided, by the task that is scheduled to run purgeOrphanedGroups() with that period. The forceReleaseProcessor is used to process those expired groups according to the "force complete" options. A group can be orphaned if a persistent message group store is used and no new messages arrive for that group after a restart.
- Parameters:
expireTimeout - the number of milliseconds to determine old orphaned groups in the store to purge.
- Since:
- 5.4
- See Also:
-
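The purge rule can be modeled with a small plain-Java sketch. OrphanPurgeSketch is a hypothetical class, not part of Spring Integration, and measuring group age from a last-modified timestamp is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of purging groups that have not changed within expireTimeout.
final class OrphanPurgeSketch {

    // Group id -> last-modified timestamp in milliseconds (assumed age measure).
    private final Map<String, Long> lastModified = new LinkedHashMap<>();
    private final long expireTimeout;

    OrphanPurgeSketch(long expireTimeout) {
        this.expireTimeout = expireTimeout;
    }

    // Records activity on a group at the given time.
    void touch(String groupId, long nowMillis) {
        lastModified.put(groupId, nowMillis);
    }

    // Removes and returns the ids of groups older than expireTimeout; no-op when timeout <= 0.
    List<String> purgeOrphanedGroups(long nowMillis) {
        List<String> purged = new ArrayList<>();
        if (expireTimeout <= 0) {
            return purged;
        }
        Iterator<Map.Entry<String, Long>> entries = lastModified.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<String, Long> entry = entries.next();
            if (nowMillis - entry.getValue() > expireTimeout) {
                purged.add(entry.getKey());
                entries.remove();
            }
        }
        return purged;
    }

    int remainingGroups() {
        return lastModified.size();
    }
}
```

In the real handler the purged groups are additionally passed through the force-release processing described above. The sketch only shows the age check.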
setExpireDurationMillis
public void setExpireDurationMillis(long expireDuration)
Configure a Duration (in millis) for how often to clean up old orphaned groups from the store.
- Parameters:
expireDuration - the delay between calls to purgeOrphanedGroups().
- Since:
- 5.4
- See Also:
-
setExpireDuration
Configure a Duration for how often to clean up old orphaned groups from the store.
- Parameters:
expireDuration - the delay between calls to purgeOrphanedGroups().
- Since:
- 5.4
- See Also:
-
setGroupConditionSupplier
Configure a BiFunction to supply a group condition from a message to be added to the group. A null result from the function resets any previously set condition.
- Parameters:
conditionSupplier - the function to supply a group condition from a message to be added to the group.
- Since:
- 5.5
- See Also:
-
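A condition supplier of this shape can be sketched in plain Java. ConditionMessage and GroupConditionSketch are hypothetical stand-ins for Message<?> and the message group, and treating the second function argument as the group's existing condition is an assumption made for illustration.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for Message<?>.
record ConditionMessage(String payload) { }

// Hypothetical model of a group whose condition is recomputed for every added message.
final class GroupConditionSketch {

    private final BiFunction<ConditionMessage, String, String> conditionSupplier;
    private String condition;

    GroupConditionSketch(BiFunction<ConditionMessage, String, String> conditionSupplier) {
        this.conditionSupplier = conditionSupplier;
    }

    // Example supplier: "size=N" sets the condition, "reset" clears it, anything else keeps it.
    static BiFunction<ConditionMessage, String, String> exampleSupplier() {
        return (message, existingCondition) -> {
            if (message.payload().startsWith("size=")) {
                return message.payload();
            }
            if (message.payload().equals("reset")) {
                return null; // a null result resets the condition
            }
            return existingCondition;
        };
    }

    // Applies the supplier with the incoming message and the current condition.
    void add(ConditionMessage message) {
        condition = conditionSupplier.apply(message, condition);
    }

    String condition() {
        return condition;
    }
}
```

A release strategy could then consult the stored condition instead of re-reading every buffered message, which is the typical motivation for keeping a condition on the group.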
setApplicationEventPublisher
- Specified by:
setApplicationEventPublisher in interface ApplicationEventPublisherAware
-
onInit
protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.
- Overrides:
onInit in class AbstractMessageProducingHandler
-
getComponentType
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
getComponentType in interface NamedComponent
- Overrides:
getComponentType in class MessageHandlerSupport
-
getMessageStore
-
getExpireGroupScheduledFutures
-
getCorrelationStrategy
-
getReleaseStrategy
-
getGroupConditionSupplier
-
getDiscardChannel
Description copied from interface: DiscardingMessageHandler
Return the discard channel.
- Specified by:
getDiscardChannel in interface DiscardingMessageHandler
- Returns:
- the channel.
-
getDiscardChannelName
-
isSendPartialResultOnExpiry
protected boolean isSendPartialResultOnExpiry()
-
isSequenceAware
protected boolean isSequenceAware()
-
getLockRegistry
-
isLockRegistrySet
protected boolean isLockRegistrySet()
-
getMinimumTimeoutForEmptyGroups
protected long getMinimumTimeoutForEmptyGroups()
-
isReleasePartialSequences
protected boolean isReleasePartialSequences()
-
getGroupTimeoutExpression
-
getEvaluationContext
-
handleMessageInternal
- Specified by:
handleMessageInternal in class AbstractMessageHandler
-
isExpireGroupsUponCompletion
protected boolean isExpireGroupsUponCompletion()
-
afterRelease
Allows you to provide additional logic that needs to be performed after the MessageGroup was released.
- Parameters:
group - The group.
completedMessages - The completed messages.
-
afterRelease
protected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout)
Subclasses may override if special action is needed because the group was released or discarded due to a timeout. By default, afterRelease(MessageGroup, Collection) is invoked.
- Parameters:
group - The group.
completedMessages - The completed messages.
timeout - True if the release/discard was due to a timeout.
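The relationship between the two afterRelease overloads can be sketched in plain Java. The classes below are hypothetical and use String in place of MessageGroup and Message<?>. They only illustrate the delegation described above, where the three-argument variant falls back to the two-argument one by default.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical model of the two hooks: an abstract one and a timeout-aware one that delegates.
abstract class AfterReleaseSketch {

    // Mandatory hook, called after a group was released.
    protected abstract void afterRelease(String group, Collection<String> completedMessages);

    // Optional hook; by default it ignores the timeout flag and delegates.
    protected void afterRelease(String group, Collection<String> completedMessages, boolean timeout) {
        afterRelease(group, completedMessages);
    }
}

// Example subclass that records a timeout event and then runs the default handling.
final class RecordingAfterRelease extends AfterReleaseSketch {

    final List<String> events = new ArrayList<>();

    @Override
    protected void afterRelease(String group, Collection<String> completedMessages) {
        events.add("released:" + group);
    }

    @Override
    protected void afterRelease(String group, Collection<String> completedMessages, boolean timeout) {
        if (timeout) {
            events.add("timeout:" + group);
        }
        super.afterRelease(group, completedMessages, timeout);
    }
}
```

A subclass that does not care about timeouts only implements the two-argument method. Overriding the three-argument method is needed only when timeout releases require different cleanup.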
-
forceComplete
-
remove
-
findLastReleasedSequenceNumber
protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence)
-
store
-
expireGroup
-
completeGroup
-
completeGroup
protected Collection<Message<?>> completeGroup(Message<?> message, Object correlationKey, MessageGroup group, Lock lock)
-
verifyResultCollectionConsistsOfMessages
-
obtainGroupTimeout
-
destroy
public void destroy()
- Specified by:
destroy in interface DisposableBean
- Specified by:
destroy in interface IntegrationManagement
- Overrides:
destroy in class MessageHandlerSupport
-
start
public void start()
- Specified by:
start in interface Lifecycle
- Specified by:
start in interface ManageableLifecycle
-
stop
public void stop()
- Specified by:
stop in interface Lifecycle
- Specified by:
stop in interface ManageableLifecycle
-
isRunning
public boolean isRunning()
- Specified by:
isRunning in interface Lifecycle
- Specified by:
isRunning in interface ManageableLifecycle
-
purgeOrphanedGroups
public void purgeOrphanedGroups()
Perform a MessageGroupStore.expireMessageGroups(long) with the provided expireTimeout. Can be called externally at any time. Internally it is called from the scheduled task with the configured expireDuration.
- Since:
- 5.4
-