public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandler implements Lifecycle
AbstractMessageProducingHandler implementation for aggregation logic based
on Reactor's Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int) operators.
The incoming messages are emitted into a FluxSink provided by the
Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) initialized in the constructor.
The resulting windows for groups are wrapped into Messages for downstream
consumption.
If the AbstractMessageProducingHandler.getOutputChannel() is not a ReactiveStreamsSubscribableChannel
instance, a subscription for the whole aggregating Flux is performed in the
start() method.
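As a rough illustration of how these pieces might fit together, the following configuration sketch registers the handler as a Spring bean. The channel names (ordersChannel, batchesChannel), the orderId header, and the size and timespan values are hypothetical; the setters are the ones listed in this class's method summary. It assumes spring-integration-core and reactor-core are on the classpath and that both channels are defined elsewhere in the application.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.FluxAggregatorMessageHandler;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

@Configuration
@EnableIntegration
class FluxAggregatorConfig {

    // Hypothetical wiring: messages arrive on "ordersChannel", batches leave on "batchesChannel".
    @Bean
    @ServiceActivator(inputChannel = "ordersChannel")
    FluxAggregatorMessageHandler orderAggregator() {
        FluxAggregatorMessageHandler handler = new FluxAggregatorMessageHandler();
        handler.setOutputChannelName("batchesChannel");

        // Group by a hypothetical "orderId" header instead of the default correlation id header.
        handler.setCorrelationStrategy(message -> message.getHeaders().get("orderId"));

        // Close a window after 10 messages or after 5 seconds (illustrative values).
        handler.setWindowSize(10);
        handler.setWindowTimespan(Duration.ofSeconds(5));

        // Collapse each window into a single message whose payload is a List of payloads.
        handler.setCombineFunction(window ->
                window.map(Message::getPayload)
                        .collectList()
                        .map(GenericMessage::new));
        return handler;
    }
}
```

With a combine function like this one, each emitted message carries a List of payloads. Without it, the default behavior applies and the payload is a Flux that downstream code has to subscribe to.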
Inherited nested class: IntegrationManagement.ManagementOverrides
Inherited fields (groups separated by semicolons): messagingTemplate; EXPRESSION_PARSER, logger; METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME; HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE

| Constructor | Description |
|---|---|
| FluxAggregatorMessageHandler() | Create an instance with a Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) and apply Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int) transformation into it. |
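To make the group-then-window idea more concrete, here is a minimal plain-JDK sketch of the observable behavior: items are grouped by a key, and each group emits a window once it reaches a fixed size. It is not the Reactor implementation and does not use Flux at all; the class name, the method name, and the first-character key are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-JDK imitation of "group by key, then close a window every N items".
// It only mirrors the grouping logic; there is no reactive stream involved.
class GroupWindowSketch {

    // Returns the completed windows in the order in which they closed.
    static <T, K> List<List<T>> aggregate(List<T> items,
                                          Function<? super T, ? extends K> correlationKey,
                                          int windowSize) {
        Map<K, List<T>> open = new LinkedHashMap<>();   // one open window per group key
        List<List<T>> closed = new ArrayList<>();
        for (T item : items) {
            K key = correlationKey.apply(item);
            List<T> window = open.computeIfAbsent(key, k -> new ArrayList<>());
            window.add(item);
            if (window.size() == windowSize) {          // size reached: close and emit the window
                closed.add(window);
                open.remove(key);                       // the next item for this key opens a new window
            }
        }
        return closed;                                  // windows still open are not emitted in this sketch
    }

    public static void main(String[] args) {
        List<String> input = List.of("a1", "b1", "a2", "b2", "a3");
        // Hypothetical correlation key: the first character of each item.
        System.out.println(aggregate(input, s -> s.charAt(0), 2)); // prints [[a1, a2], [b1, b2]]
    }
}
```

In the sketch, "a3" opens a second window for group "a" that never fills, so it is not part of the result.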

| Modifier and Type | Method | Description |
|---|---|---|
| String | getComponentType() | Subclasses may implement this method to provide component type information. |
| IntegrationPatternType | getIntegrationPatternType() | Return a pattern type this component implements. |
| protected void | handleMessageInternal(Message<?> message) | |
| boolean | isRunning() | |
| void | setBoundaryTrigger(java.util.function.Predicate<Message<?>> boundaryTrigger) | Configure a Predicate for messages to determine a window boundary in the Flux.windowUntil(java.util.function.Predicate<T>) operator. |
| void | setCombineFunction(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction) | Configure a transformation Function to apply for a Flux window to emit. |
| void | setCorrelationStrategy(CorrelationStrategy correlationStrategy) | Configure a CorrelationStrategy to determine a group key from the incoming messages. |
| void | setWindowConfigurer(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer) | Configure a Function to apply a transformation into the grouping Flux for any arbitrary Flux.window(int) options not covered by the simple options. |
| void | setWindowSize(int windowSize) | Specify a size for windows to close. |
| void | setWindowSizeFunction(java.util.function.Function<Message<?>,Integer> windowSizeFunction) | Specify a Function to determine a size for windows to close against the first message in group. |
| void | setWindowTimespan(java.time.Duration windowTimespan) | Configure a Duration for closing windows periodically. |
| protected boolean | shouldCopyRequestHeaders() | Subclasses may override this. |
| void | start() | |
| void | stop() | |

Inherited methods (groups separated by semicolons): addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, onInit, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldSplitOutput, updateNotPropagatedHeaders; handleMessage, onComplete, onError, onNext, onSubscribe; buildSendTimer, configureMetrics, destroy, getActiveCount, getActiveCountLong, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getHandlerMetrics, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMetricsCaptor, getMinDuration, getOrder, getOverrides, getStandardDeviationDuration, isCountsEnabled, isLoggingEnabled, isStatsEnabled, registerMetricsCaptor, reset, sendTimer, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled, shouldTrack; afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString; clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait; errorCount, handleCount; getBeanName, getComponentName

public FluxAggregatorMessageHandler()
Create an instance with a Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) and apply Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int) transformation into it.

public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
Configure a CorrelationStrategy to determine a group key from the incoming messages.
By default, a HeaderAttributeCorrelationStrategy is used against the IntegrationMessageHeaderAccessor.CORRELATION_ID header value.
Parameters: correlationStrategy - the CorrelationStrategy to use.

public void setCombineFunction(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction)
Configure a transformation Function to apply for a Flux window to emit.
The function must return a Mono whose value is a Message combining the contents of the window's Flux.
By default, the window's Flux itself is wrapped as the payload of a message whose headers are copied from the first message in the window. Such a Flux in the payload has to be subscribed to and consumed downstream.
Parameters: combineFunction - the Function to use for result windows transformation.

public void setBoundaryTrigger(java.util.function.Predicate<Message<?>> boundaryTrigger)
Configure a Predicate for messages to determine a window boundary in the Flux.windowUntil(java.util.function.Predicate<T>) operator.
Takes precedence over any other window configuration options.
Parameters: boundaryTrigger - the Predicate to use for window boundary.
See Also: Flux.windowUntil(Predicate)

public void setWindowSize(int windowSize)
Specify a size for windows to close.
Can be combined with setWindowTimespan(Duration).
Parameters: windowSize - the size for window to use.
See Also: Flux.window(int), Flux.windowTimeout(int, Duration)

public void setWindowSizeFunction(java.util.function.Function<Message<?>,Integer> windowSizeFunction)
Specify a Function to determine a size for windows to close against the first message in group.
The result of the function can be combined with setWindowTimespan(Duration).
By default, an IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header is consulted.
Parameters: windowSizeFunction - the Function to use to determine a window size against the first message in the group.
See Also: Flux.window(int), Flux.windowTimeout(int, Duration)

public void setWindowTimespan(java.time.Duration windowTimespan)
Configure a Duration for closing windows periodically.
Can be combined with setWindowSize(int) or setWindowSizeFunction(Function).
Parameters: windowTimespan - the Duration to use for windows to close periodically.
See Also: Flux.window(Duration), Flux.windowTimeout(int, Duration)

public void setWindowConfigurer(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer)
Configure a Function to apply a transformation into the grouping Flux for any arbitrary Flux.window(int) options not covered by the simple options.
Takes precedence over any other window configuration options.
Parameters: windowConfigurer - the Function to apply any custom window transformation.

public String getComponentType()
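Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
Specified by: getComponentType in interface NamedComponent
Overrides: getComponentType in class MessageHandlerSupport

public IntegrationPatternType getIntegrationPatternType()
Description copied from interface: IntegrationPattern
Return a pattern type this component implements.
Specified by: getIntegrationPatternType in interface IntegrationPattern
Overrides: getIntegrationPatternType in class MessageHandlerSupport
Returns: the IntegrationPatternType this component implements.

protected void handleMessageInternal(Message<?> message)
Specified by: handleMessageInternal in class AbstractMessageHandler

protected boolean shouldCopyRequestHeaders()
Description copied from class: AbstractMessageProducingHandler
Subclasses may override this.
Overrides: shouldCopyRequestHeaders in class AbstractMessageProducingHandler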
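
The simple window options documented for the setters (boundary trigger, window size, window timespan) can be read as a small decision table. The sketch below is one possible reading, inferred only from the precedence notes and the See Also entries in this page; it is not the handler's source code, it leaves setWindowConfigurer aside, and the class, enum, and method names are hypothetical.

```java
import java.time.Duration;

// Illustrative only: maps the documented option combinations to the Reactor operator
// named in the corresponding "See Also" entries. Not taken from the handler's source.
class WindowOptionSketch {

    enum Operator { WINDOW_UNTIL, WINDOW_TIMEOUT, WINDOW_BY_SIZE, WINDOW_BY_TIMESPAN, UNCONFIGURED }

    // size: the window size resolved for the first message of a group, or null if none is available.
    static Operator select(boolean hasBoundaryTrigger, Integer size, Duration timespan) {
        if (hasBoundaryTrigger) {
            return Operator.WINDOW_UNTIL;        // Flux.windowUntil(Predicate): documented to take precedence
        }
        if (size != null && timespan != null) {
            return Operator.WINDOW_TIMEOUT;      // Flux.windowTimeout(int, Duration): size combined with timespan
        }
        if (size != null) {
            return Operator.WINDOW_BY_SIZE;      // Flux.window(int)
        }
        if (timespan != null) {
            return Operator.WINDOW_BY_TIMESPAN;  // Flux.window(Duration)
        }
        return Operator.UNCONFIGURED;            // the handler's behavior in this case is outside this sketch
    }

    public static void main(String[] args) {
        System.out.println(select(true, 5, Duration.ofSeconds(1)));     // prints WINDOW_UNTIL
        System.out.println(select(false, 5, Duration.ofSeconds(1)));    // prints WINDOW_TIMEOUT
        System.out.println(select(false, 5, null));                     // prints WINDOW_BY_SIZE
        System.out.println(select(false, null, Duration.ofSeconds(1))); // prints WINDOW_BY_TIMESPAN
    }
}
```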