public class RSocketOutboundGateway extends AbstractReplyProducingMessageHandler
An outbound messaging gateway for RSocket requests.
The request logic is based on the RSocketRequester, which can be obtained from the
ClientRSocketConnector on the client side or from the
RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER request message header
on the server side.
The RSocket operation is determined by the configured RSocketOutboundGateway.Command or by a SpEL
expression evaluated at runtime against the request message.
By default, the RSocketOutboundGateway.Command.requestResponse operation is used.
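The resolution rule above (a fixed command, or an expression evaluated per request, with requestResponse as the default) can be sketched in plain Java. This is a simplified stand-in, not Spring Integration code: the class CommandResolutionSketch, the use of a Function over message headers in place of a SpEL expression, the "rsocket_command" header name, and the fireAndForget constant are all assumptions made for illustration. Only requestResponse is named on this page.

```java
import java.util.Map;
import java.util.function.Function;

/** Simplified stand-in for the gateway's command resolution; not Spring Integration code. */
final class CommandResolutionSketch {

    /** Hypothetical mirror of RSocketOutboundGateway.Command (fireAndForget is assumed). */
    enum Command { fireAndForget, requestResponse }

    /** Stand-in for a SpEL expression: a function evaluated against the request headers. */
    private Function<Map<String, Object>, Command> commandExpression =
            headers -> Command.requestResponse; // default operation

    /** A fixed command is modelled as an expression that ignores the request. */
    void setCommand(Command command) {
        this.commandExpression = headers -> command;
    }

    void setCommandExpression(Function<Map<String, Object>, Command> commandExpression) {
        this.commandExpression = commandExpression;
    }

    /** Evaluate the configured expression against one request message. */
    Command resolve(Map<String, Object> requestHeaders) {
        Command command = this.commandExpression.apply(requestHeaders);
        if (command == null) {
            throw new IllegalStateException("The command expression returned null");
        }
        return command;
    }

    public static void main(String[] args) {
        CommandResolutionSketch gateway = new CommandResolutionSketch();
        System.out.println(gateway.resolve(Map.of()));
        // Hypothetical header carrying the command name for this request.
        gateway.setCommandExpression(
                headers -> Command.valueOf((String) headers.get("rsocket_command")));
        System.out.println(gateway.resolve(Map.of("rsocket_command", "fireAndForget")));
    }
}
```

Treating the fixed command as a constant expression keeps a single evaluation path at request time, which is one plausible way to support both setters.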
For Publisher-based requests, the Publisher must be present in the request message payload.
Flattening via an upstream FluxMessageChannel also works,
but it loses the scope of the particular request: every Publisher event
is then sent as its own plain request.
If the reply is a Flux, it is wrapped in a Mono to retain the request scope.
The downstream flow is responsible for obtaining this Flux from the message payload
and subscribing to it. The Mono reply from this component is subscribed to by a downstream
FluxMessageChannel; otherwise it is adapted to a
ListenableFuture.
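The reply shape described above, a single-value container whose payload is the whole stream, can be illustrated with JDK types only. This is an analogy, not Reactor or Spring code: CompletableFuture stands in for Mono, java.util.concurrent.Flow.Publisher stands in for Flux, and the names StreamReplySketch, publisherOf, streamReply and consume are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

/** Analogy only: CompletableFuture plays the role of Mono, Flow.Publisher the role of Flux. */
final class StreamReplySketch {

    /** Minimal synchronous publisher that emits the given items as demand arrives. */
    static <T> Flow.Publisher<T> publisherOf(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next;
            private boolean finished;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && next < items.size() && !finished; i++) {
                    subscriber.onNext(items.get(next++));
                }
                if (next == items.size() && !finished) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    /** The "gateway" reply: one value, and that value is the stream itself. */
    static CompletableFuture<Flow.Publisher<String>> streamReply(List<String> items) {
        return CompletableFuture.completedFuture(publisherOf(items));
    }

    /** The "downstream flow": takes the stream out of the reply and subscribes to it. */
    static List<String> consume(CompletableFuture<Flow.Publisher<String>> reply) {
        List<String> received = new ArrayList<>();
        reply.join().subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consume(streamReply(List.of("a", "b", "c"))));
    }
}
```

Because the stream travels as one payload, all of its elements stay tied to the request that produced them, which is the scope the description refers to.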
See Also:
RSocketOutboundGateway.Command, RSocketRequester

| Modifier and Type | Class and Description |
|---|---|
| static class | RSocketOutboundGateway.Command - Enumeration of commands supported by the gateways. |
Inherited nested classes:
AbstractReplyProducingMessageHandler.RequestHandler
IntegrationManagement.ManagementOverrides

Inherited fields:
messagingTemplate
EXPRESSION_PARSER, logger
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE

| Constructor and Description |
|---|
| RSocketOutboundGateway(Expression routeExpression) - Instantiate based on the provided SpEL expression to evaluate an RSocket endpoint route at runtime against a request message. |
| RSocketOutboundGateway(String route) - Instantiate based on the provided RSocket endpoint route. |

| Modifier and Type | Method and Description |
|---|---|
| protected void | doInit() |
| protected Object | handleRequestMessage(Message<?> requestMessage) - Subclasses must implement this method to handle the request Message. |
| void | setClientRSocketConnector(ClientRSocketConnector clientRSocketConnector) - Configure a ClientRSocketConnector for client side requests based on the connection provided by the ClientRSocketConnector.getRSocketRequester(). |
| void | setCommand(RSocketOutboundGateway.Command command) - Configure a RSocketOutboundGateway.Command for RSocket request type. |
| void | setCommandExpression(Expression commandExpression) - Configure a SpEL expression to evaluate a RSocketOutboundGateway.Command for RSocket request type at runtime against a request message. |
| void | setExpectedResponseType(Class<?> expectedResponseType) - Specify the expected response type for the RSocket response. |
| void | setExpectedResponseTypeExpression(Expression expectedResponseTypeExpression) - Specify the Expression to determine the type for the RSocket response. |
| void | setPublisherElementType(Class<?> publisherElementType) - Configure a type for a request Publisher elements. |
| void | setPublisherElementTypeExpression(Expression publisherElementTypeExpression) - Configure a SpEL expression to evaluate a request Publisher elements type at runtime against a request message. |

Inherited methods:
doInvokeAdvisedRequestHandler, getBeanClassLoader, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
configureMetrics, destroy, getActiveCount, getActiveCountLong, getComponentType, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMinDuration, getOrder, getOverrides, getStandardDeviationDuration, handleMessage, isCountsEnabled, isLoggingEnabled, isStatsEnabled, onComplete, onError, onNext, onSubscribe, registerMetricsCaptor, reset, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
errorCount, handleCount
getBeanName, getComponentName

public RSocketOutboundGateway(String route)
Instantiate based on the provided RSocket endpoint route.
Parameters:
route - the RSocket endpoint route to use.

public RSocketOutboundGateway(Expression routeExpression)
Instantiate based on the provided SpEL expression to evaluate an RSocket endpoint route
at runtime against a request message.
Parameters:
routeExpression - the SpEL expression to use.

public void setClientRSocketConnector(ClientRSocketConnector clientRSocketConnector)
Configure a ClientRSocketConnector for client side requests based on the connection
provided by the ClientRSocketConnector.getRSocketRequester().
On the server side, an RSocketRequester must be provided in the
RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER header of the request message.
Parameters:
clientRSocketConnector - the ClientRSocketConnector to use.

public void setCommand(RSocketOutboundGateway.Command command)
Configure a RSocketOutboundGateway.Command for RSocket request type.
Parameters:
command - the RSocketOutboundGateway.Command to use.

public void setCommandExpression(Expression commandExpression)
Configure a SpEL expression to evaluate a RSocketOutboundGateway.Command for RSocket request type at runtime
against a request message.
Parameters:
commandExpression - the SpEL expression to use.

public void setPublisherElementType(Class<?> publisherElementType)
Configure a type for a request Publisher elements.
Parameters:
publisherElementType - the type of the request Publisher elements.
See Also:
RSocketRequester.RequestSpec#data(Publisher, Class)

public void setPublisherElementTypeExpression(Expression publisherElementTypeExpression)
Configure a SpEL expression to evaluate a request Publisher elements type at runtime against
a request message.
Parameters:
publisherElementTypeExpression - the expression to evaluate a type for the request Publisher elements.
See Also:
RSocketRequester.RequestSpec#data

public void setExpectedResponseType(Class<?> expectedResponseType)
Specify the expected response type for the RSocket response.
Parameters:
expectedResponseType - The expected type.
See Also:
setExpectedResponseTypeExpression(Expression),
RSocketRequester.ResponseSpec#retrieveMono,
RSocketRequester.ResponseSpec#retrieveFlux

public void setExpectedResponseTypeExpression(Expression expectedResponseTypeExpression)
Specify the Expression to determine the type for the RSocket response.
Parameters:
expectedResponseTypeExpression - The expected response type expression.
See Also:
RSocketRequester.ResponseSpec#retrieveMono,
RSocketRequester.ResponseSpec#retrieveFlux

protected void doInit()
Overrides:
doInit in class AbstractReplyProducingMessageHandler

protected Object handleRequestMessage(Message<?> requestMessage)
Description copied from class: AbstractReplyProducingMessageHandler
Subclasses must implement this method to handle the request Message.
Specified by:
handleRequestMessage in class AbstractReplyProducingMessageHandler
Parameters:
requestMessage - The request message.
Returns:
The result of handling the message, or null.
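
The two sources of an RSocketRequester described for setClientRSocketConnector (a configured client connector, or a request message header on the server side) suggest a lookup like the one below. This is a plain-Java sketch with hypothetical names: RequesterResolutionSketch, its Requester placeholder type, and the "rsocketRequester" header key are illustrative only. Checking the header before the client requester is an assumption of this sketch, since this page does not state which source takes precedence.

```java
import java.util.Map;

/** Simplified stand-in for locating a requester; not Spring Integration code. */
final class RequesterResolutionSketch {

    /** Hypothetical key standing in for RSOCKET_REQUESTER_HEADER. */
    static final String REQUESTER_HEADER = "rsocketRequester";

    /** Placeholder for RSocketRequester. */
    static final class Requester {
        final String name;

        Requester(String name) {
            this.name = name;
        }
    }

    /** Set when a client side connector has been configured; null on a pure server side. */
    private Requester clientRequester;

    void setClientRequester(Requester clientRequester) {
        this.clientRequester = clientRequester;
    }

    /** Assumed order: request header first, then the configured client requester. */
    Requester resolve(Map<String, Object> requestHeaders) {
        Object fromHeader = requestHeaders.get(REQUESTER_HEADER);
        if (fromHeader instanceof Requester) {
            return (Requester) fromHeader;
        }
        if (this.clientRequester != null) {
            return this.clientRequester;
        }
        throw new IllegalStateException(
                "No requester: configure a client connector or provide the '"
                        + REQUESTER_HEADER + "' header");
    }

    public static void main(String[] args) {
        RequesterResolutionSketch gateway = new RequesterResolutionSketch();
        gateway.setClientRequester(new Requester("client"));
        System.out.println(gateway.resolve(Map.of()).name);
        System.out.println(
                gateway.resolve(Map.of(REQUESTER_HEADER, new Requester("server"))).name);
    }
}
```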