Class IntegrationReactiveUtils
java.lang.Object
org.springframework.integration.util.IntegrationReactiveUtils
public final class IntegrationReactiveUtils
extends java.lang.Object
Utilities for adapting integration components to/from reactive types.
- Since:
- 5.3
-
Field Summary
Fields Modifier and Type Field Description static java.time.DurationDEFAULT_DELAY_WHEN_EMPTYA default delay before repeating an empty sourceMonoas 1 secondDuration.static java.lang.StringDELAY_WHEN_EMPTY_KEYThe subscriber context entry forFlux.delayElements(java.time.Duration)from theMono.repeatWhenEmpty(java.util.function.Function). -
Method Summary
Modifier and Type Method Description static <T> reactor.core.publisher.Flux<org.springframework.messaging.Message<T>>messageChannelToFlux(org.springframework.messaging.MessageChannel messageChannel)Adapt a providedMessageChannelinto aFluxsource: - aFluxMessageChannelis returned as is because it is already aPublisher; - aSubscribableChannelis subscribed with aMessageHandlerfor theSinks.Many.tryEmitNext(Object)which is returned from this method; - aPollableChannelis wrapped into aMessageSourcelambda and reusesmessageSourceToFlux(MessageSource).static <T> reactor.core.publisher.Flux<org.springframework.messaging.Message<T>>messageSourceToFlux(MessageSource<T> messageSource)Wrap a providedMessageSourceinto aFluxfor pulling the on demand.
-
Field Details
-
DELAY_WHEN_EMPTY_KEY
public static final java.lang.String DELAY_WHEN_EMPTY_KEYThe subscriber context entry forFlux.delayElements(java.time.Duration)from theMono.repeatWhenEmpty(java.util.function.Function).- See Also:
- Constant Field Values
-
DEFAULT_DELAY_WHEN_EMPTY
public static final java.time.Duration DEFAULT_DELAY_WHEN_EMPTYA default delay before repeating an empty sourceMonoas 1 secondDuration.
-
-
Method Details
-
messageSourceToFlux
public static <T> reactor.core.publisher.Flux<org.springframework.messaging.Message<T>> messageSourceToFlux(MessageSource<T> messageSource)Wrap a providedMessageSourceinto aFluxfor pulling the on demand. WhenMessageSource.receive()returnsnull, the sourceMonogoes to theMono.repeatWhenEmpty(java.util.function.Function<reactor.core.publisher.Flux<java.lang.Long>, ? extends org.reactivestreams.Publisher<?>>)state and performs adelaybased on theDELAY_WHEN_EMPTY_KEYDurationentry in the subscriber context or falls back to 1 second duration. If a produced message has anIntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKheader it is ack'ed in theMono.doOnSuccess(java.util.function.Consumer<? super T>)and nack'ed in theMono.doOnError(java.util.function.Consumer<? super java.lang.Throwable>).- Type Parameters:
T- the expected payload type.- Parameters:
messageSource- theMessageSourceto adapt.- Returns:
- a
Fluxwhich pulls messages from theMessageSourceon demand.
-
messageChannelToFlux
public static <T> reactor.core.publisher.Flux<org.springframework.messaging.Message<T>> messageChannelToFlux(org.springframework.messaging.MessageChannel messageChannel)Adapt a providedMessageChannelinto aFluxsource: - aFluxMessageChannelis returned as is because it is already aPublisher; - aSubscribableChannelis subscribed with aMessageHandlerfor theSinks.Many.tryEmitNext(Object)which is returned from this method; - aPollableChannelis wrapped into aMessageSourcelambda and reusesmessageSourceToFlux(MessageSource).- Type Parameters:
T- the expected payload type.- Parameters:
messageChannel- theMessageChannelto adapt.- Returns:
- a
Fluxwhich uses a providedMessageChannelas a source for events to publish.
-