package org.springframework.cloud.gateway.rsocket.core;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.ResponderRSocket;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.rsocket.common.metadata.TagsMetadata;
import org.springframework.cloud.gateway.rsocket.core.GatewayExchange;
import org.springframework.cloud.gateway.rsocket.filter.RSocketFilter;
import org.springframework.cloud.gateway.rsocket.route.Route;
import org.springframework.cloud.gateway.rsocket.routing.RoutingTable;
import org.springframework.messaging.rsocket.MetadataExtractor;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.util.function.Tuple2;

/* loaded from: input_file:org/springframework/cloud/gateway/rsocket/core/PendingRequestRSocket.class */
/**
 * An {@link RSocket} that holds requests until a suitable target RSocket registers.
 *
 * <p>Instances are {@link Consumer}s of {@link RoutingTable.RegisteredEvent}. Each accepted
 * event is handed to the configured route finder; once that finder emits a {@link Route},
 * the event's RSocket is published through an internal {@link MonoProcessor}. Every
 * interaction method ({@code fireAndForget}, {@code requestResponse}, {@code requestStream},
 * {@code requestChannel}) subscribes to that processor, runs the route's filter chain and
 * then delegates the call to the published RSocket.
 */
public class PendingRequestRSocket extends AbstractRSocket implements ResponderRSocket, Consumer<RoutingTable.RegisteredEvent> {
    private static final Log log = LogFactory.getLog(PendingRequestRSocket.class);

    /** Resolves a route for a registration event; an empty result leaves requests pending. */
    private final Function<RoutingTable.RegisteredEvent, Mono<Route>> routeFinder;

    /** Used to build a {@link GatewayExchange} from the pending payload. */
    private final MetadataExtractor metadataExtractor;

    /** Receives the routing metadata of the registration that resolved the route. */
    private final Consumer<TagsMetadata> metadataCallback;

    /** Emits the target RSocket once a route has been found. */
    private final MonoProcessor<RSocket> rSocketProcessor;

    /** Optional handle disposed after a route has been found; may be {@code null}. */
    private Disposable subscriptionDisposable;

    /** Route resolved by {@link #accept}; assigned before the processor is signalled. */
    private Route route;

    /**
     * Creates a pending RSocket backed by a fresh {@link MonoProcessor}.
     *
     * @param metadataExtractor extractor used when building the exchange from a payload
     * @param routeFinder function that resolves a route for a registration event
     * @param metadataCallback callback receiving the routing metadata of the matching registration
     */
    public PendingRequestRSocket(MetadataExtractor metadataExtractor, Function<RoutingTable.RegisteredEvent, Mono<Route>> routeFinder, Consumer<TagsMetadata> metadataCallback) {
        this(metadataExtractor, routeFinder, metadataCallback, MonoProcessor.create());
    }

    /**
     * Package-private constructor that lets the caller supply the processor.
     *
     * @param metadataExtractor extractor used when building the exchange from a payload
     * @param routeFinder function that resolves a route for a registration event
     * @param metadataCallback callback receiving the routing metadata of the matching registration
     * @param rSocketProcessor processor through which the target RSocket is published
     */
    PendingRequestRSocket(MetadataExtractor metadataExtractor, Function<RoutingTable.RegisteredEvent, Mono<Route>> routeFinder, Consumer<TagsMetadata> metadataCallback, MonoProcessor<RSocket> rSocketProcessor) {
        this.routeFinder = routeFinder;
        this.metadataExtractor = metadataExtractor;
        this.metadataCallback = metadataCallback;
        this.rSocketProcessor = rSocketProcessor;
    }

    /**
     * Handles a registration event. If the route finder emits a route for it, the route is
     * stored, the metadata callback is invoked with the event's routing metadata, the event's
     * RSocket is published to the processor (which is then completed) and the subscription
     * handle, if one was set, is disposed.
     *
     * <p>Errors raised by the route finder or by the callbacks are logged; the processor is
     * not signalled in that case.
     *
     * @param registeredEvent the registration event to evaluate
     */
    @Override // java.util.function.Consumer
    public void accept(RoutingTable.RegisteredEvent registeredEvent) {
        this.routeFinder.apply(registeredEvent).subscribe(foundRoute -> {
            // The route must be in place before the processor is signalled, because
            // processor(...) reads it as soon as the RSocket is emitted.
            this.route = foundRoute;
            this.metadataCallback.accept(registeredEvent.getRoutingMetadata());
            this.rSocketProcessor.onNext(registeredEvent.getRSocket());
            this.rSocketProcessor.onComplete();
            if (this.subscriptionDisposable != null) {
                this.subscriptionDisposable.dispose();
            }
        }, error -> log.error("Unable to resolve pending request for registered RSocket", error));
    }

    /**
     * Waits for the target RSocket, runs the filter chain and forwards a fire-and-forget call.
     *
     * @param payload the request payload
     * @return the result of the target's {@code fireAndForget}
     */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        return processor("pending-request-faf", payload)
                .flatMap(tuple -> tuple.getT1().fireAndForget(payload));
    }

    /**
     * Waits for the target RSocket, runs the filter chain and forwards a request-response call.
     *
     * @param payload the request payload
     * @return the result of the target's {@code requestResponse}
     */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return processor("pending-request-rr", payload)
                .flatMap(tuple -> tuple.getT1().requestResponse(payload));
    }

    /**
     * Waits for the target RSocket, runs the filter chain and forwards a request-stream call.
     *
     * @param payload the request payload
     * @return the result of the target's {@code requestStream}
     */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return processor("pending-request-rs", payload)
                .flatMapMany(tuple -> tuple.getT1().requestStream(payload));
    }

    /**
     * Waits for the target RSocket, runs the filter chain and forwards a request-channel call.
     * When the target is a {@link ResponderRSocket} the two-argument variant is used so the
     * first payload is passed along explicitly; otherwise only the publisher is forwarded.
     *
     * @param payload the first payload of the channel
     * @param publisher the stream of channel payloads
     * @return the result of the target's {@code requestChannel}
     */
    @Override
    public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> publisher) {
        return processor("pending-request-rc", payload).flatMapMany(tuple -> {
            RSocket target = tuple.getT1();
            if (target instanceof ResponderRSocket) {
                return ((ResponderRSocket) target).requestChannel(payload, publisher);
            }
            return target.requestChannel(publisher);
        });
    }

    /**
     * Builds the shared pipeline used by every interaction method: once the processor emits
     * the target RSocket, an exchange is created from the payload, the resolved route is stored
     * in its attributes and the route's filters are executed. The target RSocket is zipped with
     * the filter-chain result, so nothing is emitted if the filter chain completes empty.
     *
     * @param str suffix appended to this class name to form the Reactor log category
     * @param payload the pending request payload
     * @return the target RSocket paired with the filter-chain result
     */
    protected Mono<Tuple2<RSocket, RSocketFilter.Success>> processor(String str, Payload payload) {
        String logCategory = PendingRequestRSocket.class.getName() + "." + str;
        return this.rSocketProcessor.log(logCategory, Level.FINEST).flatMap(rSocket -> {
            // NOTE(review): the exchange type is REQUEST_STREAM for every interaction model,
            // not only requestStream - confirm whether that is intentional.
            GatewayExchange exchange = GatewayExchange.fromPayload(GatewayExchange.Type.REQUEST_STREAM, payload, this.metadataExtractor);
            exchange.getAttributes().put(GatewayExchange.ROUTE_ATTR, this.route);
            return Mono.just(rSocket).zipWith(GatewayFilterChain.executeFilterChain(this.route.getFilters(), exchange));
        });
    }

    /**
     * Stores the handle that {@link #accept} disposes once a route has been found.
     *
     * @param disposable the handle to dispose; may be {@code null}
     */
    public void setSubscriptionDisposable(Disposable disposable) {
        this.subscriptionDisposable = disposable;
    }
}
