package io.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.frame.CancelFrameFlyweight;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.PayloadFrameFlyweight;
import io.rsocket.frame.RequestChannelFrameFlyweight;
import io.rsocket.frame.RequestNFrameFlyweight;
import io.rsocket.frame.RequestStreamFrameFlyweight;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.RateLimitableRequestPublisher;
import io.rsocket.internal.SynchronizedIntObjectHashMap;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.lease.ResponderLeaseHandler;
import java.util.function.Consumer;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.concurrent.Queues;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.0.0-RC5.jar:io/rsocket/RSocketResponder.class */
public class RSocketResponder implements ResponderRSocket {
    private final DuplexConnection connection;
    private final RSocket requestHandler;
    private final ResponderRSocket responderRSocket;
    private final PayloadDecoder payloadDecoder;
    private final Consumer<Throwable> errorConsumer;
    private final ResponderLeaseHandler leaseHandler;
    private final IntObjectMap<RateLimitableRequestPublisher> sendingLimitableSubscriptions;
    private final IntObjectMap<Subscription> sendingSubscriptions;
    private final IntObjectMap<Processor<Payload, Payload>> channelProcessors;
    private final UnboundedProcessor<ByteBuf> sendProcessor;
    private final ByteBufAllocator allocator;

    /**
     * Creates a responder on top of {@code duplexConnection} and immediately wires it up.
     *
     * <p>Construction has side effects: it subscribes the outbound frame queue to
     * {@code duplexConnection.send(...)}, subscribes {@link #handleFrame} to
     * {@code duplexConnection.receive()}, hands the lease handler a sink for its frames, and
     * registers a hook on {@code duplexConnection.onClose()} that runs {@link #cleanup()} and
     * disposes the two subscriptions created here.
     *
     * @param byteBufAllocator allocator used when encoding outbound frames
     * @param duplexConnection connection frames are received from and sent to
     * @param rSocket handler that requests are delegated to; if it also implements
     *     {@link ResponderRSocket} it is kept in that form for the two-argument channel call
     * @param payloadDecoder turns inbound frames into {@link Payload}s
     * @param consumer receives errors that are not reported to the peer
     * @param responderLeaseHandler lease gate consulted before requests are delegated
     */
    public RSocketResponder(ByteBufAllocator byteBufAllocator, DuplexConnection duplexConnection, RSocket rSocket, PayloadDecoder payloadDecoder, Consumer<Throwable> consumer, ResponderLeaseHandler responderLeaseHandler) {
        this.allocator = byteBufAllocator;
        this.connection = duplexConnection;
        this.requestHandler = rSocket;
        this.responderRSocket = rSocket instanceof ResponderRSocket ? (ResponderRSocket) rSocket : null;
        this.payloadDecoder = payloadDecoder;
        this.errorConsumer = consumer;
        this.leaseHandler = responderLeaseHandler;
        this.sendingLimitableSubscriptions = new SynchronizedIntObjectHashMap<>();
        this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
        this.channelProcessors = new SynchronizedIntObjectHashMap<>();
        this.sendProcessor = new UnboundedProcessor<>();

        // Outbound path: everything pushed into sendProcessor is written to the connection.
        duplexConnection
            .send(this.sendProcessor)
            .doFinally(this::handleSendProcessorCancel)
            .subscribe(null, this::handleSendProcessorError);

        // Inbound path: every received frame is dispatched by handleFrame.
        Disposable receiveDisposable = duplexConnection.receive().subscribe(this::handleFrame, consumer);

        // Frames produced by the lease handler go out through the same outbound queue.
        Disposable sendLeaseDisposable = responderLeaseHandler.send(this.sendProcessor::onNext);

        duplexConnection
            .onClose()
            .doFinally(signalType -> {
                cleanup();
                receiveDisposable.dispose();
                sendLeaseDisposable.dispose();
            })
            .subscribe(null, consumer);
    }

    /**
     * Reacts to a failure of the outbound send pipeline.
     *
     * <p>Cancels every tracked sending subscription (plain and rate-limited) and signals
     * {@code th} to every channel processor. Each callback is isolated: anything it throws is
     * passed to {@code errorConsumer} so the remaining entries are still visited.
     *
     * @param th the error reported by the send pipeline
     */
    private void handleSendProcessorError(Throwable th) {
        // Plain response subscriptions.
        this.sendingSubscriptions.values().forEach(pending -> {
            try {
                pending.cancel();
            } catch (Throwable cancelFailure) {
                this.errorConsumer.accept(cancelFailure);
            }
        });

        // Rate-limited stream/channel publishers.
        this.sendingLimitableSubscriptions.values().forEach(limited -> {
            try {
                limited.cancel();
            } catch (Throwable cancelFailure) {
                this.errorConsumer.accept(cancelFailure);
            }
        });

        // Inbound channel processors learn about the original failure.
        this.channelProcessors.values().forEach(inbound -> {
            try {
                inbound.onError(th);
            } catch (Throwable signalFailure) {
                this.errorConsumer.accept(signalFailure);
            }
        });
    }

    /**
     * Reacts to the outbound send pipeline terminating with any signal other than an error.
     *
     * <p>{@link SignalType#ON_ERROR} is skipped here and nothing is done for it. For every other
     * signal, all tracked sending subscriptions are cancelled and all channel processors are
     * completed. Anything thrown by an individual callback is passed to {@code errorConsumer}.
     *
     * @param signalType the terminal signal observed on the send pipeline
     */
    private void handleSendProcessorCancel(SignalType signalType) {
        if (SignalType.ON_ERROR != signalType) {
            this.sendingSubscriptions.values().forEach(pending -> {
                try {
                    pending.cancel();
                } catch (Throwable cancelFailure) {
                    this.errorConsumer.accept(cancelFailure);
                }
            });

            this.sendingLimitableSubscriptions.values().forEach(limited -> {
                try {
                    limited.cancel();
                } catch (Throwable cancelFailure) {
                    this.errorConsumer.accept(cancelFailure);
                }
            });

            this.channelProcessors.values().forEach(inbound -> {
                try {
                    inbound.onComplete();
                } catch (Throwable signalFailure) {
                    this.errorConsumer.accept(signalFailure);
                }
            });
        }
    }

    /**
     * Delegates a fire-and-forget request to the request handler if the lease allows it.
     *
     * <p>When the lease check fails the payload is released and the returned {@link Mono} fails
     * with the lease handler's error. Any exception thrown synchronously is converted into a
     * failed {@link Mono} instead of propagating.
     *
     * @param payload the request payload
     * @return the handler's result, or a failed {@link Mono}
     */
    @Override // io.rsocket.RSocket
    public Mono<Void> fireAndForget(Payload payload) {
        try {
            if (!this.leaseHandler.useLease()) {
                payload.release();
                return Mono.error(this.leaseHandler.leaseError());
            }
            return this.requestHandler.fireAndForget(payload);
        } catch (Throwable failure) {
            return Mono.error(failure);
        }
    }

    /**
     * Delegates a request-response call to the request handler if the lease allows it.
     *
     * <p>When the lease check fails the payload is released and the returned {@link Mono} fails
     * with the lease handler's error. Any exception thrown synchronously is converted into a
     * failed {@link Mono} instead of propagating.
     *
     * @param payload the request payload
     * @return the handler's response, or a failed {@link Mono}
     */
    @Override // io.rsocket.RSocket
    public Mono<Payload> requestResponse(Payload payload) {
        try {
            if (!this.leaseHandler.useLease()) {
                payload.release();
                return Mono.error(this.leaseHandler.leaseError());
            }
            return this.requestHandler.requestResponse(payload);
        } catch (Throwable failure) {
            return Mono.error(failure);
        }
    }

    /**
     * Delegates a request-stream call to the request handler if the lease allows it.
     *
     * <p>When the lease check fails the payload is released and the returned {@link Flux} fails
     * with the lease handler's error. Any exception thrown synchronously is converted into a
     * failed {@link Flux} instead of propagating.
     *
     * @param payload the request payload
     * @return the handler's stream, or a failed {@link Flux}
     */
    @Override // io.rsocket.RSocket
    public Flux<Payload> requestStream(Payload payload) {
        try {
            if (!this.leaseHandler.useLease()) {
                payload.release();
                return Flux.error(this.leaseHandler.leaseError());
            }
            return this.requestHandler.requestStream(payload);
        } catch (Throwable failure) {
            return Flux.error(failure);
        }
    }

    /**
     * Delegates a request-channel call to the request handler if the lease allows it.
     *
     * <p>When the lease check fails the returned {@link Flux} fails with the lease handler's
     * error. Any exception thrown synchronously is converted into a failed {@link Flux}.
     *
     * @param publisher the inbound payloads of the channel
     * @return the handler's outbound stream, or a failed {@link Flux}
     */
    @Override // io.rsocket.RSocket
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        try {
            if (this.leaseHandler.useLease()) {
                return this.requestHandler.requestChannel(publisher);
            }
            return Flux.error(this.leaseHandler.leaseError());
        } catch (Throwable failure) {
            return Flux.error(failure);
        }
    }

    /**
     * Delegates a request-channel call, including its first payload, to the
     * {@link ResponderRSocket} form of the handler if the lease allows it.
     *
     * <p>When the lease check fails the payload is released and the returned {@link Flux} fails
     * with the lease handler's error. Any exception thrown synchronously is converted into a
     * failed {@link Flux}; that includes the {@link NullPointerException} raised when the
     * handler passed to the constructor did not implement {@link ResponderRSocket}.
     *
     * @param payload the first payload of the channel
     * @param publisher the inbound payloads of the channel
     * @return the handler's outbound stream, or a failed {@link Flux}
     */
    @Override // io.rsocket.ResponderRSocket
    public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> publisher) {
        try {
            if (!this.leaseHandler.useLease()) {
                payload.release();
                return Flux.error(this.leaseHandler.leaseError());
            }
            return this.responderRSocket.requestChannel(payload, publisher);
        } catch (Throwable failure) {
            return Flux.error(failure);
        }
    }

    /**
     * Delegates a metadata-push to the request handler. No lease check is performed here.
     *
     * <p>Any exception thrown synchronously by the handler is converted into a failed
     * {@link Mono}.
     *
     * @param payload the metadata payload
     * @return the handler's result, or a failed {@link Mono}
     */
    @Override // io.rsocket.RSocket
    public Mono<Void> metadataPush(Payload payload) {
        Mono<Void> outcome;
        try {
            outcome = this.requestHandler.metadataPush(payload);
        } catch (Throwable failure) {
            outcome = Mono.error(failure);
        }
        return outcome;
    }

    /** Disposes the underlying connection. */
    @Override // reactor.core.Disposable
    public void dispose() {
        final DuplexConnection underlying = this.connection;
        underlying.dispose();
    }

    /**
     * Reports the disposed state of the underlying connection.
     *
     * @return whatever {@code connection.isDisposed()} returns
     */
    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        final boolean connectionDisposed = this.connection.isDisposed();
        return connectionDisposed;
    }

    /**
     * Exposes the close signal of the underlying connection.
     *
     * @return whatever {@code connection.onClose()} returns
     */
    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        final Mono<Void> closeSignal = this.connection.onClose();
        return closeSignal;
    }

    /**
     * Tears down responder state: cancels and forgets sending subscriptions, completes and
     * forgets channel processors, then disposes the request handler and the outbound queue,
     * in that order.
     */
    private void cleanup() {
        // Stream bookkeeping first ...
        cleanUpSendingSubscriptions();
        cleanUpChannelProcessors();

        // ... then the handler and the outbound frame queue.
        requestHandler.dispose();
        sendProcessor.dispose();
    }

    /**
     * Cancels every tracked sending subscription, plain and rate-limited, and empties both
     * maps. Runs while holding this instance's monitor.
     */
    private synchronized void cleanUpSendingSubscriptions() {
        this.sendingSubscriptions.values().forEach(Subscription::cancel);
        this.sendingSubscriptions.clear();

        this.sendingLimitableSubscriptions.values().forEach(RateLimitableRequestPublisher::cancel);
        this.sendingLimitableSubscriptions.clear();
    }

    /**
     * Completes every tracked channel processor and empties the map. Runs while holding this
     * instance's monitor.
     */
    private synchronized void cleanUpChannelProcessors() {
        this.channelProcessors.values().forEach(Processor::onComplete);
        this.channelProcessors.clear();
    }

    /**
     * Dispatches one inbound frame according to its frame type.
     *
     * <p>Request frames are decoded and routed to the matching {@code handle*} method. NEXT,
     * COMPLETE, ERROR and NEXT_COMPLETE frames are forwarded to the channel processor
     * registered for the stream id, and ignored when none is registered. PAYLOAD frames are
     * ignored. SETUP, LEASE and any other frame type are answered through {@link #handleError}.
     *
     * <p>The frame buffer is released on both the normal and the exceptional path; on failure
     * the exception is rethrown via {@link Exceptions#propagate(Throwable)}.
     *
     * @param byteBuf the received frame
     */
    private void handleFrame(ByteBuf byteBuf) {
        try {
            final int streamId = FrameHeaderFlyweight.streamId(byteBuf);
            final FrameType frameType = FrameHeaderFlyweight.frameType(byteBuf);
            Processor<Payload, Payload> receiver;
            switch (frameType) {
                case REQUEST_FNF:
                    handleFireAndForget(streamId, fireAndForget(this.payloadDecoder.apply(byteBuf)));
                    break;
                case REQUEST_RESPONSE:
                    handleRequestResponse(streamId, requestResponse(this.payloadDecoder.apply(byteBuf)));
                    break;
                case CANCEL:
                    handleCancelFrame(streamId);
                    break;
                case REQUEST_N:
                    handleRequestN(streamId, byteBuf);
                    break;
                case REQUEST_STREAM: {
                    // The handler is invoked before the initial demand is read from the frame.
                    Flux<Payload> stream = requestStream(this.payloadDecoder.apply(byteBuf));
                    handleStream(streamId, stream, RequestStreamFrameFlyweight.initialRequestN(byteBuf));
                    break;
                }
                case REQUEST_CHANNEL: {
                    Payload firstPayload = this.payloadDecoder.apply(byteBuf);
                    handleChannel(streamId, firstPayload, RequestChannelFrameFlyweight.initialRequestN(byteBuf));
                    break;
                }
                case METADATA_PUSH:
                    handleMetadataPush(metadataPush(this.payloadDecoder.apply(byteBuf)));
                    break;
                case PAYLOAD:
                    // Nothing to do for this frame type.
                    break;
                case NEXT:
                    receiver = this.channelProcessors.get(streamId);
                    if (receiver != null) {
                        receiver.onNext(this.payloadDecoder.apply(byteBuf));
                    }
                    break;
                case COMPLETE:
                    receiver = this.channelProcessors.get(streamId);
                    if (receiver != null) {
                        receiver.onComplete();
                    }
                    break;
                case ERROR:
                    receiver = this.channelProcessors.get(streamId);
                    if (receiver != null) {
                        receiver.onError(new ApplicationErrorException(ErrorFrameFlyweight.dataUtf8(byteBuf)));
                    }
                    break;
                case NEXT_COMPLETE:
                    receiver = this.channelProcessors.get(streamId);
                    if (receiver != null) {
                        receiver.onNext(this.payloadDecoder.apply(byteBuf));
                        receiver.onComplete();
                    }
                    break;
                case SETUP:
                    handleError(streamId, new IllegalStateException("Setup frame received post setup."));
                    break;
                case LEASE:
                default:
                    handleError(streamId, new IllegalStateException("ServerRSocket: Unexpected frame type: " + frameType));
                    break;
            }
            ReferenceCountUtil.safeRelease(byteBuf);
        } catch (Throwable failure) {
            ReferenceCountUtil.safeRelease(byteBuf);
            throw Exceptions.propagate(failure);
        }
    }

    /**
     * Subscribes to the result of a fire-and-forget request.
     *
     * <p>The subscription is tracked in {@code sendingSubscriptions} under the stream id while
     * it is active and removed when the subscriber terminates. Errors go to
     * {@code errorConsumer}; no frame is sent back to the peer from here.
     *
     * @param streamId id of the stream the request arrived on
     * @param result the handler's result for the request
     */
    private void handleFireAndForget(final int streamId, Mono<Void> result) {
        result.subscribe(new BaseSubscriber<Void>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Register the subscription itself as the map value so it can be cancelled later.
                RSocketResponder.this.sendingSubscriptions.put(streamId, subscription);
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                RSocketResponder.this.errorConsumer.accept(throwable);
            }

            @Override
            protected void hookFinally(SignalType type) {
                RSocketResponder.this.sendingSubscriptions.remove(streamId);
            }
        });
    }

    /**
     * Subscribes to the result of a request-response call and sends the outcome to the peer.
     *
     * <p>A produced payload is encoded as a next-complete frame and queued for sending. If the
     * result completes without a payload, a complete frame is queued instead. Errors are routed
     * through {@link #handleError}. The subscription is tracked in {@code sendingSubscriptions}
     * under the stream id while active and removed when the subscriber terminates.
     *
     * @param streamId id of the stream the request arrived on
     * @param response the handler's response for the request
     */
    private void handleRequestResponse(final int streamId, Mono<Payload> response) {
        response.subscribe(new BaseSubscriber<Payload>() {
            private boolean isEmpty = true;

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Register the subscription itself as the map value so it can be cancelled later.
                RSocketResponder.this.sendingSubscriptions.put(streamId, subscription);
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            protected void hookOnNext(Payload payload) {
                this.isEmpty = false;

                // Only encoding is guarded: the payload must be released exactly once, so the
                // release on the failure path must not also cover the normal-path release below.
                ByteBuf frame;
                try {
                    frame = PayloadFrameFlyweight.encodeNextComplete(RSocketResponder.this.allocator, streamId, payload);
                } catch (Throwable encodeFailure) {
                    payload.release();
                    throw Exceptions.propagate(encodeFailure);
                }

                payload.release();
                RSocketResponder.this.sendProcessor.onNext(frame);
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                RSocketResponder.this.handleError(streamId, throwable);
            }

            @Override
            protected void hookOnComplete() {
                if (this.isEmpty) {
                    RSocketResponder.this.sendProcessor.onNext(PayloadFrameFlyweight.encodeComplete(RSocketResponder.this.allocator, streamId));
                }
            }

            @Override
            protected void hookFinally(SignalType type) {
                RSocketResponder.this.sendingSubscriptions.remove(streamId);
            }
        });
    }

    /**
     * Subscribes to an outbound payload stream and sends its signals to the peer.
     *
     * <p>The stream is wrapped in a {@link RateLimitableRequestPublisher}, which is tracked in
     * {@code sendingLimitableSubscriptions} under the stream id and given the initial demand
     * ({@code Integer.MAX_VALUE} is widened to {@code Long.MAX_VALUE}). Each payload is encoded
     * as a next frame, completion as a complete frame, and errors are routed through
     * {@link #handleError}. The tracking entry is removed when the subscriber terminates.
     *
     * @param streamId id of the stream the request arrived on
     * @param response the handler's outbound payloads
     * @param initialRequestN demand carried by the request frame
     */
    private void handleStream(final int streamId, Flux<Payload> response, int initialRequestN) {
        response.transform(source -> {
            RateLimitableRequestPublisher<Payload> limited = RateLimitableRequestPublisher.wrap(source, Queues.SMALL_BUFFER_SIZE);
            // Register the publisher itself as the map value so REQUEST_N and CANCEL can reach it.
            this.sendingLimitableSubscriptions.put(streamId, limited);
            limited.request(initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN);
            return limited;
        }).subscribe(new BaseSubscriber<Payload>() {
            @Override
            protected void hookOnNext(Payload payload) {
                // Only encoding is guarded: the payload must be released exactly once, so the
                // release on the failure path must not also cover the normal-path release below.
                ByteBuf frame;
                try {
                    frame = PayloadFrameFlyweight.encodeNext(RSocketResponder.this.allocator, streamId, payload);
                } catch (Throwable encodeFailure) {
                    payload.release();
                    throw Exceptions.propagate(encodeFailure);
                }

                payload.release();
                RSocketResponder.this.sendProcessor.onNext(frame);
            }

            @Override
            protected void hookOnComplete() {
                RSocketResponder.this.sendProcessor.onNext(PayloadFrameFlyweight.encodeComplete(RSocketResponder.this.allocator, streamId));
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                RSocketResponder.this.handleError(streamId, throwable);
            }

            @Override
            protected void hookFinally(SignalType type) {
                RSocketResponder.this.sendingLimitableSubscriptions.remove(streamId);
            }
        });
    }

    /**
     * Sets up a request-channel exchange for the given stream.
     *
     * <p>A {@link UnicastProcessor} is registered in {@code channelProcessors} under the stream
     * id to receive the peer's payloads. The flux handed to the request handler is decorated so
     * that cancellation queues a cancel frame, downstream demand queues a request-n frame,
     * errors go through {@link #handleError}, and termination removes the processor from the
     * map. The handler's outbound stream is then passed to {@link #handleStream}.
     *
     * @param streamId id of the stream the request arrived on
     * @param payload first payload of the channel, carried by the request frame
     * @param initialRequestN demand carried by the request frame
     */
    private void handleChannel(int streamId, Payload payload, int initialRequestN) {
        UnicastProcessor<Payload> frames = UnicastProcessor.create();
        // Register the processor itself as the map value so inbound frames can be routed to it.
        this.channelProcessors.put(streamId, frames);

        Flux<Payload> payloads = frames
            .doOnCancel(() -> this.sendProcessor.onNext(CancelFrameFlyweight.encode(this.allocator, streamId)))
            .doOnError(throwable -> handleError(streamId, throwable))
            .doOnRequest(demand -> this.sendProcessor.onNext(RequestNFrameFlyweight.encode(this.allocator, streamId, demand)))
            .doFinally(signalType -> this.channelProcessors.remove(streamId));

        // Pushed directly rather than chained: the first payload is in the processor before
        // the handler is invoked below.
        frames.onNext(payload);

        if (this.responderRSocket != null) {
            handleStream(streamId, requestChannel(payload, payloads), initialRequestN);
        } else {
            handleStream(streamId, requestChannel(payloads), initialRequestN);
        }
    }

    /**
     * Subscribes to the result of a metadata-push with unbounded demand. Errors go to
     * {@code errorConsumer}; the subscription is not tracked and no frame is sent back.
     *
     * @param mono the handler's result for the metadata-push
     */
    private void handleMetadataPush(Mono<Void> mono) {
        BaseSubscriber<Void> pushSubscriber = new BaseSubscriber<Void>() {
            @Override
            protected void hookOnSubscribe(Subscription upstream) {
                upstream.request(Long.MAX_VALUE);
            }

            @Override
            protected void hookOnError(Throwable failure) {
                RSocketResponder.this.errorConsumer.accept(failure);
            }
        };
        mono.subscribe((CoreSubscriber<? super Void>) pushSubscriber);
    }

    /**
     * Handles a CANCEL frame: removes the sending subscription registered for the stream id,
     * looking at plain subscriptions first and rate-limited ones second, and cancels it.
     * Does nothing when neither map has an entry.
     *
     * @param i id of the stream being cancelled
     */
    private void handleCancelFrame(int i) {
        Subscription cancelled = this.sendingSubscriptions.remove(i);
        if (cancelled == null) {
            cancelled = this.sendingLimitableSubscriptions.remove(i);
        }
        if (cancelled == null) {
            return;
        }
        cancelled.cancel();
    }

    /**
     * Reports an error locally and to the peer: the throwable is passed to
     * {@code errorConsumer} first, then an error frame for the stream is queued for sending.
     *
     * @param i id of the stream the error belongs to
     * @param th the error to report
     */
    public void handleError(int i, Throwable th) {
        this.errorConsumer.accept(th);
        ByteBuf errorFrame = ErrorFrameFlyweight.encode(this.allocator, i, th);
        this.sendProcessor.onNext(errorFrame);
    }

    /**
     * Handles a REQUEST_N frame: forwards the requested demand to the sending subscription
     * registered for the stream id, looking at plain subscriptions first and rate-limited ones
     * second. A frame value of {@code Integer.MAX_VALUE} is widened to {@code Long.MAX_VALUE}.
     * The frame is not read when neither map has an entry.
     *
     * @param i id of the stream the demand is for
     * @param byteBuf the REQUEST_N frame
     */
    private void handleRequestN(int i, ByteBuf byteBuf) {
        Subscription target = this.sendingSubscriptions.get(i);
        if (target == null) {
            target = this.sendingLimitableSubscriptions.get(i);
            if (target == null) {
                return;
            }
        }
        final int frameDemand = RequestNFrameFlyweight.requestN(byteBuf);
        final long demand = frameDemand >= Integer.MAX_VALUE ? Long.MAX_VALUE : frameDemand;
        target.request(demand);
    }
}
