package org.springframework.cloud.function.stream;

import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.core.FunctionCatalog;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.class */
public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
    private final FunctionCatalog functionCatalog;

    public SupplierInvokingMessageProducer(FunctionCatalog functionCatalog) {
        this.functionCatalog = functionCatalog;
        setOutputChannelName("output");
    }

    protected void doStart() {
        supplier().subscribe(message -> {
            sendMessage(message);
        });
    }

    private Flux<Message<?>> supplier() {
        Flux empty = Flux.empty();
        for (String str : this.functionCatalog.getSupplierNames()) {
            Supplier lookupSupplier = this.functionCatalog.lookupSupplier(str);
            Assert.notNull(lookupSupplier, "Supplier must not be null");
            empty = Flux.merge(new Publisher[]{empty, ((Flux) lookupSupplier.get()).map(obj -> {
                return MessageBuilder.withPayload(obj).setHeader(StreamConfigurationProperties.ROUTE_KEY, str).build();
            })});
        }
        return empty;
    }
}
