package org.springframework.cloud.function.context.config;

import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

/* loaded from: input_file:org/springframework/cloud/function/context/config/RoutingFunction.class */
/**
 * Reactive function that forwards a stream of messages to another function
 * looked up in the {@link FunctionCatalog}.
 *
 * <p>The target function is selected once per stream, from the
 * {@code function.name} header of the first message. Every message of the
 * stream is then converted to the target function's input type (as reported by
 * the {@link FunctionInspector}) and the resulting stream is handed to the
 * target function.
 */
public class RoutingFunction implements Function<Publisher<Message<?>>, Publisher<?>> {
    /**
     * The name {@code "router"}.
     * NOTE(review): presumably the name this function is registered under in
     * the catalog - confirm against the registering configuration.
     */
    public static final String FUNCTION_NAME = "router";
    private final FunctionCatalog functionCatalog;
    private final FunctionInspector functionInspector;
    private final MessageConverter messageConverter;

    /**
     * Creates a routing function.
     *
     * <p>Decompiler note: the decompiler reported that this constructor's access
     * modifier was changed from package-private; it is kept {@code public} here
     * so existing callers continue to compile.
     *
     * @param functionCatalog catalog used to look up the target function by name
     * @param functionInspector used to query the target function's input type and
     *        whether it accepts a {@link Message}
     * @param messageConverter used to convert message payloads to the target
     *        function's input type
     */
    public RoutingFunction(FunctionCatalog functionCatalog, FunctionInspector functionInspector, MessageConverter messageConverter) {
        this.functionCatalog = functionCatalog;
        this.functionInspector = functionInspector;
        this.messageConverter = messageConverter;
    }

    /**
     * Routes the given stream of messages to the function named by the
     * {@code function.name} header of the first message.
     *
     * @param publisher the incoming messages
     * @return the publisher produced by applying the target function to the
     *         converted input stream
     * @throws IllegalArgumentException (signalled through the returned publisher
     *         pipeline) if the first signal is not an {@code onNext} carrying a
     *         value, if the header is missing, or if no function is found
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Publisher<?> apply(Publisher<Message<?>> publisher) {
        return Flux.from(publisher).switchOnFirst((signal, flux) -> {
            Assert.isTrue(signal.hasValue() && signal.getType() == SignalType.ON_NEXT, "Signal has no value or wrong type " + signal);
            // The first message alone decides the route for the whole stream.
            Message<?> firstMessage = signal.get();
            Function routeToFunction = getRouteToFunction(firstMessage);
            return flux.map(message -> convertInput(message, routeToFunction))
                    .log()
                    // Translate the failure with the dedicated operator instead of
                    // throwing from a side-effect callback; the original error is
                    // preserved as the cause.
                    .onErrorMap(th -> new IllegalStateException("Failed to convert Message. Possible reason; No suitable converter was found for payload with 'contentType' " + firstMessage.getHeaders().get("contentType"), th))
                    .transform(routeToFunction);
        });
    }

    /**
     * Looks up the target function named by the {@code function.name} header.
     *
     * @param message the message whose headers carry the function name
     * @return the function registered in the catalog under that name
     * @throws IllegalArgumentException if the header is absent/blank or the
     *         catalog has no function with that name
     */
    @SuppressWarnings("rawtypes")
    private Function getRouteToFunction(Message<?> message) {
        String functionName = (String) message.getHeaders().get("function.name");
        Assert.hasText(functionName, "A 'function.name' was not provided as message header.");
        Function function = (Function) this.functionCatalog.lookup(functionName);
        Assert.notNull(function, "Failed to locate function specified with 'function.name':" + functionName);
        return function;
    }

    /**
     * Produces the value to pass to the target function for one message.
     *
     * <p>The payload is used as-is when it already is an instance of the
     * function's input type; otherwise it is converted with the
     * {@link MessageConverter}. If the function accepts a {@link Message}, the
     * resulting payload is wrapped in a new message carrying the original
     * headers.
     *
     * @param message the incoming message
     * @param obj the target function, as understood by the {@link FunctionInspector}
     * @return the payload, or a message wrapping it, ready for the target function
     * @throws IllegalArgumentException if no input value could be determined
     */
    private Object convertInput(Message<?> message, Object obj) {
        Class<?> inputType = this.functionInspector.getInputType(obj);
        Object payload = message.getPayload();
        // Convert only when the payload is NOT already usable as the input type.
        // (The check must go from input type to payload, not the other way round.)
        if (!inputType.isInstance(payload)) {
            payload = this.messageConverter.fromMessage(message, inputType);
        }
        // Validate before wrapping, so a failed conversion is reported with this
        // diagnostic rather than surfacing from the message builder.
        Assert.notNull(payload, "Failed to determine input value of type " + inputType + " from Message '" + message + "'. No suitable Message Converter found.");
        if (this.functionInspector.isMessage(obj)) {
            payload = MessageBuilder.createMessage(payload, message.getHeaders());
        }
        return payload;
    }
}
