/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.gateway.rsocket.core;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.ResponderRSocket;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.rsocket.core.GatewayExchange;
import org.springframework.cloud.gateway.rsocket.core.GatewayFilterChain;
import org.springframework.cloud.gateway.rsocket.filter.RSocketFilter;
import org.springframework.cloud.gateway.rsocket.registry.Registry;
import org.springframework.cloud.gateway.rsocket.route.Route;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.util.function.Tuple2;

public class PendingRequestRSocket
extends AbstractRSocket
implements ResponderRSocket,
Consumer<Registry.RegisteredEvent> {
    private static final Log log = LogFactory.getLog(PendingRequestRSocket.class);
    private final Function<Registry.RegisteredEvent, Mono<Route>> routeFinder;
    private final Consumer<Metadata> metadataCallback;
    private final MonoProcessor<RSocket> rSocketProcessor;
    private Disposable subscriptionDisposable;
    private Route route;

    public PendingRequestRSocket(Function<Registry.RegisteredEvent, Mono<Route>> routeFinder, Consumer<Metadata> metadataCallback) {
        this(routeFinder, metadataCallback, (MonoProcessor<RSocket>)MonoProcessor.create());
    }

    PendingRequestRSocket(Function<Registry.RegisteredEvent, Mono<Route>> routeFinder, Consumer<Metadata> metadataCallback, MonoProcessor<RSocket> rSocketProcessor) {
        this.routeFinder = routeFinder;
        this.metadataCallback = metadataCallback;
        this.rSocketProcessor = rSocketProcessor;
    }

    @Override
    public void accept(Registry.RegisteredEvent registeredEvent) {
        this.routeFinder.apply(registeredEvent).subscribe(route -> {
            this.route = route;
            this.metadataCallback.accept(registeredEvent.getRoutingMetadata());
            this.rSocketProcessor.onNext((Object)registeredEvent.getRSocket());
            this.rSocketProcessor.onComplete();
            if (this.subscriptionDisposable != null) {
                this.subscriptionDisposable.dispose();
            }
        });
    }

    public Mono<Void> fireAndForget(Payload payload) {
        return this.processor("pending-request-faf", payload).flatMap(tuple -> ((RSocket)tuple.getT1()).fireAndForget(payload));
    }

    public Mono<Payload> requestResponse(Payload payload) {
        return this.processor("pending-request-rr", payload).flatMap(tuple -> ((RSocket)tuple.getT1()).requestResponse(payload));
    }

    public Flux<Payload> requestStream(Payload payload) {
        return this.processor("pending-request-rs", payload).flatMapMany(tuple -> ((RSocket)tuple.getT1()).requestStream(payload));
    }

    public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
        return this.processor("pending-request-rc", payload).flatMapMany(tuple -> {
            RSocket rSocket = (RSocket)tuple.getT1();
            if (rSocket instanceof ResponderRSocket) {
                ResponderRSocket socket = (ResponderRSocket)rSocket;
                return socket.requestChannel(payload, payloads);
            }
            return rSocket.requestChannel(payloads);
        });
    }

    protected Mono<Tuple2<RSocket, RSocketFilter.Success>> processor(String logCategory, Payload payload) {
        return this.rSocketProcessor.log(PendingRequestRSocket.class.getName() + "." + logCategory, Level.FINEST, new SignalType[0]).flatMap(rSocket -> {
            GatewayExchange exchange = GatewayExchange.fromPayload(GatewayExchange.Type.REQUEST_STREAM, payload);
            exchange.getAttributes().put("__route_attr_", this.route);
            return Mono.just((Object)rSocket).zipWith(GatewayFilterChain.executeFilterChain(this.route.getFilters(), exchange));
        });
    }

    public void setSubscriptionDisposable(Disposable subscriptionDisposable) {
        this.subscriptionDisposable = subscriptionDisposable;
    }
}

