package org.springframework.integration.rsocket.outbound;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import org.reactivestreams.Publisher;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.class */
public class RSocketOutboundGateway extends AbstractReplyProducingMessageHandler {
    private final Expression routeExpression;
    private Object[] routeVars;

    @Nullable
    private ClientRSocketConnector clientRSocketConnector;
    private Expression interactionModelExpression;
    private Expression publisherElementTypeExpression;
    private Expression expectedResponseTypeExpression;
    private Expression metadataExpression;
    private EvaluationContext evaluationContext;

    @Nullable
    private RSocketRequester rsocketRequester;

    public RSocketOutboundGateway(String str, @Nullable Object... objArr) {
        this(new ValueExpression(str));
        if (objArr != null) {
            this.routeVars = Arrays.copyOf(objArr, objArr.length);
        }
    }

    public RSocketOutboundGateway(Expression expression) {
        this.interactionModelExpression = new ValueExpression(RSocketInteractionModel.requestResponse);
        this.expectedResponseTypeExpression = new ValueExpression(String.class);
        Assert.notNull(expression, "'routeExpression' must not be null");
        this.routeExpression = expression;
        setAsync(true);
        setPrimaryExpression(this.routeExpression);
    }

    public void setClientRSocketConnector(ClientRSocketConnector clientRSocketConnector) {
        Assert.notNull(clientRSocketConnector, "'clientRSocketConnector' must not be null");
        this.clientRSocketConnector = clientRSocketConnector;
    }

    public void setInteractionModel(RSocketInteractionModel rSocketInteractionModel) {
        setInteractionModelExpression(new ValueExpression(rSocketInteractionModel));
    }

    public void setInteractionModelExpression(Expression expression) {
        Assert.notNull(expression, "'interactionModelExpression' must not be null");
        this.interactionModelExpression = expression;
    }

    public void setPublisherElementType(Class<?> cls) {
        setPublisherElementTypeExpression(new ValueExpression(cls));
    }

    public void setPublisherElementTypeExpression(Expression expression) {
        this.publisherElementTypeExpression = expression;
    }

    public void setExpectedResponseType(Class<?> cls) {
        setExpectedResponseTypeExpression(new ValueExpression(cls));
    }

    public void setExpectedResponseTypeExpression(Expression expression) {
        this.expectedResponseTypeExpression = expression;
    }

    public void setMetadataExpression(Expression expression) {
        this.metadataExpression = expression;
    }

    protected void doInit() {
        super.doInit();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
        if (this.clientRSocketConnector != null) {
            this.rsocketRequester = this.clientRSocketConnector.getRequester();
        }
    }

    protected Object handleRequestMessage(Message<?> message) {
        RSocketRequester rSocketRequester = (RSocketRequester) message.getHeaders().get("rsocketRequester", RSocketRequester.class);
        if (rSocketRequester == null) {
            rSocketRequester = this.rsocketRequester;
        }
        Assert.notNull(rSocketRequester, () -> {
            return "The 'RSocketRequester' must be configured via 'ClientRSocketConnector' or provided in the 'rsocketRequester' request message headers.";
        });
        return Mono.just(rSocketRequester).map(rSocketRequester2 -> {
            return createRequestSpec(rSocketRequester2, message);
        }).map(requestSpec -> {
            return prepareRetrieveSpec(requestSpec, message);
        }).flatMap(retrieveSpec -> {
            return performRetrieve(retrieveSpec, message);
        });
    }

    private RSocketRequester.RequestSpec createRequestSpec(RSocketRequester rSocketRequester, Message<?> message) {
        String str = (String) this.routeExpression.getValue(this.evaluationContext, message, String.class);
        Assert.notNull(str, () -> {
            return "The 'routeExpression' [" + this.routeExpression + "] must not evaluate to null";
        });
        RSocketRequester.RequestSpec route = rSocketRequester.route(str, this.routeVars);
        if (this.metadataExpression != null) {
            Map map = (Map) this.metadataExpression.getValue(this.evaluationContext, message, Map.class);
            if (!CollectionUtils.isEmpty(map)) {
                route.metadata(metadataSpec -> {
                    Objects.requireNonNull(metadataSpec);
                    map.forEach(metadataSpec::metadata);
                });
            }
        }
        return route;
    }

    private RSocketRequester.RetrieveSpec prepareRetrieveSpec(RSocketRequester.RequestSpec requestSpec, Message<?> message) {
        Object payload = message.getPayload();
        if (!(payload instanceof Publisher) || this.publisherElementTypeExpression == null) {
            return requestSpec.data(payload);
        }
        return prepareRequestSpecForPublisher(requestSpec, (Publisher) payload, evaluateExpressionForType(message, this.publisherElementTypeExpression, "publisherElementType"));
    }

    private RSocketRequester.RetrieveSpec prepareRequestSpecForPublisher(RSocketRequester.RequestSpec requestSpec, Publisher<?> publisher, Object obj) {
        return obj instanceof Class ? requestSpec.data(publisher, (Class) obj) : requestSpec.data(publisher, (ParameterizedTypeReference) obj);
    }

    private Mono<?> performRetrieve(RSocketRequester.RetrieveSpec retrieveSpec, Message<?> message) {
        ResolvableType forType;
        Flux retrieveFlux;
        RSocketInteractionModel evaluateInteractionModel = evaluateInteractionModel(message);
        Assert.notNull(evaluateInteractionModel, () -> {
            return "The 'interactionModelExpression' [" + this.interactionModelExpression + "] must not evaluate to null";
        });
        Object obj = null;
        if (!RSocketInteractionModel.fireAndForget.equals(evaluateInteractionModel)) {
            obj = evaluateExpressionForType(message, this.expectedResponseTypeExpression, "expectedResponseType");
        }
        switch (evaluateInteractionModel) {
            case fireAndForget:
                return retrieveSpec.send();
            case requestResponse:
                return obj instanceof Class ? retrieveSpec.retrieveMono((Class) obj) : retrieveSpec.retrieveMono((ParameterizedTypeReference) obj);
            case requestStream:
            case requestChannel:
                if (obj instanceof Class) {
                    forType = ResolvableType.forClass((Class) obj);
                    retrieveFlux = retrieveSpec.retrieveFlux((Class) obj);
                } else {
                    forType = ResolvableType.forType((ParameterizedTypeReference) obj);
                    retrieveFlux = retrieveSpec.retrieveFlux((ParameterizedTypeReference) obj);
                }
                return isVoid(forType) ? retrieveFlux.then() : Mono.just(retrieveFlux);
            default:
                throw new UnsupportedOperationException("Unsupported interaction model: " + evaluateInteractionModel);
        }
    }

    private RSocketInteractionModel evaluateInteractionModel(Message<?> message) {
        Object value = this.interactionModelExpression.getValue(this.evaluationContext, message);
        if (value instanceof RSocketInteractionModel) {
            return (RSocketInteractionModel) value;
        }
        if (value instanceof String) {
            return RSocketInteractionModel.valueOf((String) value);
        }
        throw new IllegalStateException("The 'interactionModelExpression' [" + this.interactionModelExpression + "] must evaluate to 'RSocketInteractionModel' or 'String' type, but not into: '" + value + "'");
    }

    private Object evaluateExpressionForType(Message<?> message, Expression expression, String str) {
        Object value = expression.getValue(this.evaluationContext, message);
        Assert.state((value instanceof Class) || (value instanceof String) || (value instanceof ParameterizedTypeReference), () -> {
            return "The '" + str + "' [" + expression + "] must evaluate to 'String' (class FQN), 'Class<?>' or 'ParameterizedTypeReference<?>', not to: " + value;
        });
        if (!(value instanceof String)) {
            return value;
        }
        try {
            return ClassUtils.forName((String) value, getBeanClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    private static boolean isVoid(ResolvableType resolvableType) {
        return Void.class.equals(resolvableType.resolve()) || Void.TYPE.equals(resolvableType.resolve());
    }
}
