package org.springframework.cloud.function.web.source;

import java.net.ConnectException;
import java.net.URI;
import java.util.Collections;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.context.SmartLifecycle;
import org.springframework.http.HttpHeaders;
import org.springframework.messaging.Message;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/cloud/function/web/source/SupplierExporter.class */
public class SupplierExporter implements SmartLifecycle {
    private static Log logger = LogFactory.getLog(SupplierExporter.class);
    private final FunctionCatalog catalog;
    private final WebClient client;
    private final DestinationResolver destinationResolver;
    private final RequestBuilder requestBuilder;
    private final String supplier;
    private volatile boolean running;
    private volatile boolean ok = true;
    private boolean autoStartup;
    private boolean debug;
    private volatile Disposable subscription;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SupplierExporter(RequestBuilder requestBuilder, DestinationResolver destinationResolver, FunctionCatalog functionCatalog, WebClient webClient, ExporterProperties exporterProperties) {
        this.autoStartup = true;
        this.debug = true;
        this.requestBuilder = requestBuilder;
        this.destinationResolver = destinationResolver;
        this.catalog = functionCatalog;
        this.client = webClient;
        this.debug = exporterProperties.isDebug();
        this.autoStartup = exporterProperties.isAutoStartup();
        this.supplier = exporterProperties.getSink().getName();
    }

    public void start() {
        if (this.running) {
            return;
        }
        logger.info("Starting");
        Flux empty = Flux.empty();
        for (String str : this.supplier == null ? this.catalog.getNames(Supplier.class) : Collections.singleton(this.supplier)) {
            Supplier<Publisher<Object>> supplier = (Supplier) this.catalog.lookup(Supplier.class, str);
            if (supplier == null) {
                logger.warn("No such Supplier: " + str);
            } else {
                empty = empty.mergeWith(forward(supplier, str));
            }
        }
        this.subscription = empty.retry(th -> {
            boolean z = (th instanceof ConnectException) || ((th instanceof ClassCastException) && this.running);
            if (!z) {
                this.ok = false;
                if (!this.debug) {
                    logger.info(th);
                }
                stop();
            }
            return z;
        }).doOnComplete(() -> {
            stop();
        }).subscribe();
        this.ok = true;
        this.running = true;
    }

    public boolean isOk() {
        return this.ok;
    }

    public void stop() {
        logger.info("Stopping");
        this.running = false;
        this.subscription.dispose();
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    private Flux<ClientResponse> forward(Supplier<Publisher<Object>> supplier, String str) {
        return Flux.from(supplier.get()).flatMap(obj -> {
            String destination = this.destinationResolver.destination(supplier, str, obj);
            if (this.debug) {
                logger.info("Posting to: " + destination);
            }
            return post(uri(destination), destination, obj);
        });
    }

    private Mono<ClientResponse> post(URI uri, String str, Object obj) {
        Object obj2 = obj;
        if (obj instanceof Message) {
            obj2 = ((Message) obj).getPayload();
        }
        Mono<ClientResponse> exchange = this.client.post().uri(uri).headers(httpHeaders -> {
            headers(httpHeaders, str, obj);
        }).syncBody(obj2).exchange();
        if (this.debug) {
            exchange = exchange.log();
        }
        return exchange;
    }

    private void headers(HttpHeaders httpHeaders, String str, Object obj) {
        httpHeaders.putAll(this.requestBuilder.headers(str, obj));
    }

    private URI uri(String str) {
        return this.requestBuilder.uri(str);
    }
}
