public final class IntegrationReactiveUtils extends Object
| Modifier and Type | Field and Description |
|---|---|
static java.time.Duration |
DEFAULT_DELAY_WHEN_EMPTY
A default delay before repeating an empty source
Mono as 1 second Duration. |
static String |
DELAY_WHEN_EMPTY_KEY
The subscriber context entry for
Flux.delayElements(java.time.Duration)
from the Mono.repeatWhenEmpty(java.util.function.Function). |
| Modifier and Type | Method and Description |
|---|---|
static <T> reactor.core.publisher.Flux<Message<T>> |
messageChannelToFlux(MessageChannel messageChannel)
Adapt a provided
MessageChannel into a Flux source:
- a FluxMessageChannel
is returned as is because it is already a Publisher;
- a SubscribableChannel is subscribed with a MessageHandler
for the Sinks.Many#emitNext(Object) which is returned from this method;
- a PollableChannel is wrapped into a MessageSource lambda and reuses
messageSourceToFlux(MessageSource). |
static <T> reactor.core.publisher.Flux<Message<T>> |
messageSourceToFlux(MessageSource<T> messageSource)
Wrap a provided
MessageSource into a Flux for pulling the on demand. |
public static final String DELAY_WHEN_EMPTY_KEY
Flux.delayElements(java.time.Duration)
from the Mono.repeatWhenEmpty(java.util.function.Function).public static final java.time.Duration DEFAULT_DELAY_WHEN_EMPTY
Mono as 1 second Duration.public static <T> reactor.core.publisher.Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource)
MessageSource into a Flux for pulling the on demand.
When MessageSource.receive() returns null, the source Mono
goes to the Mono.repeatWhenEmpty(java.util.function.Function<reactor.core.publisher.Flux<java.lang.Long>, ? extends org.reactivestreams.Publisher<?>>) state and performs a delay
based on the DELAY_WHEN_EMPTY_KEY Duration entry in the subscriber context
or falls back to 1 second duration.
If a produced message has an
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header
it is ack'ed in the Mono.doOnSuccess(java.util.function.Consumer<? super T>) and nack'ed in the Mono.doOnError(java.util.function.Consumer<? super java.lang.Throwable>).T - the expected payload type.messageSource - the MessageSource to adapt.Flux which pulls messages from the MessageSource on demand.public static <T> reactor.core.publisher.Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel)
MessageChannel into a Flux source:
- a FluxMessageChannel
is returned as is because it is already a Publisher;
- a SubscribableChannel is subscribed with a MessageHandler
for the Sinks.Many#emitNext(Object) which is returned from this method;
- a PollableChannel is wrapped into a MessageSource lambda and reuses
messageSourceToFlux(MessageSource).T - the expected payload type.messageChannel - the MessageChannel to adapt.Flux which uses a provided MessageChannel as a source for events to publish.