package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.RequestNFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.plugins.RequestInterceptor;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.1.1.jar:io/rsocket/core/RSocketRequester.class */
/**
 * Requester side of an RSocket connection.
 *
 * <p>Creates the request publishers for every interaction model, dispatches frames received from
 * the {@link DuplexConnection} to the {@link FrameHandler} registered for their stream id, and
 * tears everything down exactly once when the connection closes, a stream-zero ERROR arrives, the
 * keep-alive callback fires, or {@link #dispose()} is called.
 *
 * <p>Termination is guarded by a compare-and-set on {@code terminationError}: whichever caller
 * installs the first non-null error runs {@link #terminate(Throwable)}; later attempts are no-ops.
 */
class RSocketRequester extends RequesterResponderSupport implements RSocket {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketRequester.class);

    /** Shared marker for a normal shutdown; its stack trace is cleared in the static initializer. */
    private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();

    private static final AtomicReferenceFieldUpdater<RSocketRequester, Throwable> TERMINATION_ERROR =
            AtomicReferenceFieldUpdater.newUpdater(RSocketRequester.class, Throwable.class, "terminationError");

    static {
        CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
    }

    /** {@code null} while the requester is alive; set once (via {@link #TERMINATION_ERROR}) on termination. */
    private volatile Throwable terminationError;

    /** Lease tracker, or {@code null} when this requester was created without one. */
    @Nullable
    private final RequesterLeaseTracker requesterLeaseTracker;

    /** Receiver for incoming KEEPALIVE frames, or {@code null} when keep-alive was not started. */
    private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;

    /** Completes empty on a normal shutdown and with an error on any other termination. */
    private final Sinks.Empty<Void> onClose;

    /**
     * Creates the requester and immediately subscribes to the connection's close signal and to its
     * incoming frames.
     *
     * @param duplexConnection connection used to send and receive frames
     * @param payloadDecoder forwarded to the superclass
     * @param streamIdSupplier forwarded to the superclass
     * @param i forwarded to the superclass as its 1st argument (NOTE(review): meaning not visible
     *     in this file)
     * @param i2 forwarded to the superclass as its 2nd argument (NOTE(review): meaning not visible
     *     in this file)
     * @param i3 forwarded to the superclass as its 3rd argument (NOTE(review): meaning not visible
     *     in this file)
     * @param i4 2nd argument of {@code ClientKeepAliveSupport}; {@code 0} disables keep-alive
     *     (presumably the keep-alive tick period - TODO confirm)
     * @param i5 3rd argument of {@code ClientKeepAliveSupport} (presumably the keep-alive ack
     *     timeout - TODO confirm)
     * @param keepAliveHandler starts keep-alive handling; {@code null} disables keep-alive
     * @param function forwarded to the superclass
     * @param requesterLeaseTracker lease tracker, or {@code null} when none is used
     */
    public RSocketRequester(DuplexConnection duplexConnection, PayloadDecoder payloadDecoder, StreamIdSupplier streamIdSupplier, int i, int i2, int i3, int i4, int i5, @Nullable KeepAliveHandler keepAliveHandler, Function<RSocket, RequestInterceptor> function, @Nullable RequesterLeaseTracker requesterLeaseTracker) {
        super(i, i2, i3, payloadDecoder, duplexConnection, streamIdSupplier, function);
        this.requesterLeaseTracker = requesterLeaseTracker;
        this.onClose = Sinks.empty();

        // A connection error terminates with that error; a clean close is treated as a shutdown.
        duplexConnection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryShutdown);
        // Errors on the receive stream are deliberately ignored by this subscriber.
        duplexConnection.receive().subscribe(this::handleIncomingFrames, th -> {
        });

        if (i4 == 0 || keepAliveHandler == null) {
            this.keepAliveFramesAcceptor = null;
        } else {
            this.keepAliveFramesAcceptor = keepAliveHandler.start(
                    new KeepAliveSupport.ClientKeepAliveSupport(getAllocator(), i4, i5),
                    byteBuf -> duplexConnection.sendFrame(0, byteBuf),
                    this::tryTerminateOnKeepAlive);
        }
    }

    /**
     * Creates a fire-and-forget request; a different publisher implementation is used when a lease
     * tracker is present.
     */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        if (this.requesterLeaseTracker == null) {
            return new FireAndForgetRequesterMono(payload, this);
        }
        return new SlowFireAndForgetRequesterMono(payload, this);
    }

    /** Creates a request-response request for the given payload. */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return new RequestResponseRequesterMono(payload, this);
    }

    /** Creates a request-stream request for the given payload. */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return new RequestStreamRequesterFlux(payload, this);
    }

    /** Creates a request-channel request fed by the given publisher. */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return new RequestChannelRequesterFlux(publisher, this);
    }

    /**
     * Creates a metadata-push request. If the requester has already terminated, the payload is
     * released and a {@code Mono} failing with the termination error is returned instead.
     */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        Throwable error = this.terminationError;
        if (error != null) {
            payload.release();
            return Mono.error(error);
        }
        return new MetadataPushRequesterMono(payload, this);
    }

    /** Returns the lease tracker, or {@code null} when none was supplied at construction. */
    @Override
    public RequesterLeaseTracker getRequesterLeaseTracker() {
        return this.requesterLeaseTracker;
    }

    /**
     * Obtains the next stream id from the superclass.
     *
     * @throws RuntimeException the termination error (passed through {@link Exceptions#propagate})
     *     if the requester has terminated
     */
    @Override
    public int getNextStreamId() {
        int nextStreamId = super.getNextStreamId();
        Throwable error = this.terminationError;
        if (error != null) {
            throw Exceptions.propagate(error);
        }
        return nextStreamId;
    }

    /**
     * Registers {@code frameHandler} through the superclass and returns its stream id. If the
     * requester has terminated, the registration is rolled back before the error is thrown.
     *
     * @throws RuntimeException the termination error (passed through {@link Exceptions#propagate})
     *     if the requester has terminated
     */
    @Override
    public int addAndGetNextStreamId(FrameHandler frameHandler) {
        int streamId = super.addAndGetNextStreamId(frameHandler);
        Throwable error = this.terminationError;
        if (error != null) {
            // The check happens after registration, so undo it to avoid leaving a stale handler.
            super.remove(streamId, frameHandler);
            throw Exceptions.propagate(error);
        }
        return streamId;
    }

    /**
     * Returns the connection's availability, capped by the lease tracker's availability when a
     * lease tracker is present.
     */
    @Override
    public double availability() {
        double connectionAvailability = getDuplexConnection().availability();
        RequesterLeaseTracker leaseTracker = this.requesterLeaseTracker;
        if (leaseTracker == null) {
            return connectionAvailability;
        }
        return Math.min(connectionAvailability, leaseTracker.availability());
    }

    /** Shuts the requester down as a normal close; does nothing if it has already terminated. */
    @Override
    public void dispose() {
        tryShutdown();
    }

    /** Returns {@code true} once a termination error (including the normal-close marker) is set. */
    @Override
    public boolean isDisposed() {
        return this.terminationError != null;
    }

    /** Returns a {@code Mono} that mirrors the {@code onClose} sink. */
    @Override
    public Mono<Void> onClose() {
        return this.onClose.asMono();
    }

    /**
     * Entry point for every received frame: routes stream 0 to {@link #handleStreamZero} and all
     * other streams to {@link #handleFrame}. Any failure is logged and closes the connection with
     * a {@link ConnectionErrorException}.
     */
    private void handleIncomingFrames(ByteBuf byteBuf) {
        try {
            int streamId = FrameHeaderCodec.streamId(byteBuf);
            FrameType frameType = FrameHeaderCodec.frameType(byteBuf);
            if (streamId == 0) {
                handleStreamZero(frameType, byteBuf);
            } else {
                handleFrame(streamId, frameType, byteBuf);
            }
        } catch (Throwable th) {
            LOGGER.error("Unexpected error during frame handling", th);
            getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Unexpected error during frame handling", th));
        }
    }

    /**
     * Handles connection-level (stream 0) frames: ERROR terminates the requester, LEASE goes to the
     * lease tracker, KEEPALIVE goes to the keep-alive acceptor when present; anything else is
     * logged at info level and ignored.
     *
     * @throws IllegalStateException if a LEASE frame arrives while no lease tracker is configured
     */
    private void handleStreamZero(FrameType frameType, ByteBuf byteBuf) {
        switch (frameType) {
            case ERROR:
                tryTerminateOnZeroError(byteBuf);
                return;
            case LEASE: {
                RequesterLeaseTracker leaseTracker = this.requesterLeaseTracker;
                if (leaseTracker == null) {
                    // Fail with a descriptive error instead of a bare NullPointerException; the
                    // caller catches it and closes the connection, as it did for the NPE.
                    throw new IllegalStateException("Requester received LEASE frame but no lease tracker is configured");
                }
                leaseTracker.handleLeaseFrame(byteBuf);
                return;
            }
            case KEEPALIVE:
                if (this.keepAliveFramesAcceptor != null) {
                    this.keepAliveFramesAcceptor.receive(byteBuf);
                }
                return;
            default:
                LOGGER.info("Requester received unsupported frame on stream 0: {}", byteBuf);
                return;
        }
    }

    /**
     * Dispatches a frame for a non-zero stream to its registered {@link FrameHandler}. Frames for
     * streams without a handler are delegated to {@link #handleMissingResponseProcessor}.
     *
     * @throws IllegalStateException if the frame type is not one a requester stream accepts
     */
    private void handleFrame(int i, FrameType frameType, ByteBuf byteBuf) {
        FrameHandler frameHandler = get(i);
        if (frameHandler == null) {
            handleMissingResponseProcessor(i, frameType, byteBuf);
            return;
        }
        switch (frameType) {
            case ERROR:
                frameHandler.handleError(io.rsocket.exceptions.Exceptions.from(i, byteBuf));
                return;
            case NEXT_COMPLETE:
                frameHandler.handleNext(byteBuf, false, true);
                return;
            case NEXT:
                frameHandler.handleNext(byteBuf, FrameHeaderCodec.hasFollows(byteBuf), false);
                return;
            case COMPLETE:
                frameHandler.handleComplete();
                return;
            case CANCEL:
                frameHandler.handleCancel();
                return;
            case REQUEST_N:
                frameHandler.handleRequestN(RequestNFrameCodec.requestN(byteBuf));
                return;
            default:
                // Includes LEASE and KEEPALIVE, which are only handled on stream 0.
                throw new IllegalStateException("Requester received unsupported frame on stream " + i + ": " + byteBuf.toString());
        }
    }

    /**
     * Handles a frame whose stream has no registered handler. Frames for stream ids the supplier
     * reports as "before or current" are silently dropped; any other id is a protocol violation.
     *
     * @throws IllegalStateException if the stream id is not before-or-current
     */
    private void handleMissingResponseProcessor(int i, FrameType frameType, ByteBuf byteBuf) {
        if (this.streamIdSupplier.isBeforeOrCurrent(i)) {
            return;
        }
        if (frameType == FrameType.ERROR) {
            throw new IllegalStateException("Client received error for non-existent stream: " + i + " Message: " + ErrorFrameCodec.dataUtf8(byteBuf));
        }
        throw new IllegalStateException("Client received message for non-existent stream: " + i + ", frame type: " + frameType);
    }

    /** Keep-alive callback: terminates with a {@link ConnectionErrorException} naming the timeout. */
    private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) {
        tryTerminate(() -> new ConnectionErrorException(
                String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis())));
    }

    /** Terminates with the error signalled by the connection's {@code onClose}. */
    private void tryTerminateOnConnectionError(Throwable th) {
        tryTerminate(() -> th);
    }

    /** Terminates with the exception decoded from a stream-zero ERROR frame. */
    private void tryTerminateOnZeroError(ByteBuf byteBuf) {
        tryTerminate(() -> io.rsocket.exceptions.Exceptions.from(0, byteBuf));
    }

    /**
     * Terminates with the supplied error unless the requester has already terminated. The supplier
     * is only invoked when no termination error is set yet, so the exception is not built when it
     * would be discarded anyway.
     */
    private void tryTerminate(Supplier<Throwable> supplier) {
        if (this.terminationError != null) {
            return;
        }
        Throwable error = supplier.get();
        if (TERMINATION_ERROR.compareAndSet(this, null, error)) {
            terminate(error);
        }
    }

    /** Terminates as a normal close (using {@link #CLOSED_CHANNEL_EXCEPTION}) unless already terminated. */
    private void tryShutdown() {
        if (this.terminationError == null && TERMINATION_ERROR.compareAndSet(this, null, CLOSED_CHANNEL_EXCEPTION)) {
            terminate(CLOSED_CHANNEL_EXCEPTION);
        }
    }

    /**
     * Releases all resources and fails every active stream with {@code th}. Callers reach this only
     * after winning the compare-and-set on {@code terminationError}.
     *
     * @param th termination cause; {@link #CLOSED_CHANNEL_EXCEPTION} means a normal close
     */
    private void terminate(Throwable th) {
        if (this.keepAliveFramesAcceptor != null) {
            this.keepAliveFramesAcceptor.dispose();
        }
        getDuplexConnection().dispose();

        RequestInterceptor requestInterceptor = getRequestInterceptor();
        if (requestInterceptor != null) {
            requestInterceptor.dispose();
        }

        RequesterLeaseTracker leaseTracker = this.requesterLeaseTracker;
        if (leaseTracker != null) {
            leaseTracker.dispose(th);
        }

        synchronized (this) {
            // Iterate over a copy of the active handlers rather than the live collection.
            for (FrameHandler frameHandler : new ArrayList<FrameHandler>(this.activeStreams.values())) {
                if (frameHandler == null) {
                    continue;
                }
                try {
                    frameHandler.handleError(th);
                } catch (Throwable handlerFailure) {
                    // Best effort: one failing handler must not stop the remaining streams from
                    // being notified.
                    LOGGER.debug("Stream handler failed while being terminated", handlerFailure);
                }
            }
        }

        if (th == CLOSED_CHANNEL_EXCEPTION) {
            this.onClose.tryEmitEmpty();
        } else {
            this.onClose.tryEmitError(th);
        }
    }
}
