package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.lease.LeaseStats;
import io.rsocket.lease.Leases;
import io.rsocket.lease.RequesterLeaseHandler;
import io.rsocket.lease.ResponderLeaseHandler;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import io.rsocket.plugins.InterceptorRegistry;
import io.rsocket.resume.ClientRSocketSession;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.EmptyPayload;
import java.time.Duration;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.0.0.jar:io/rsocket/core/RSocketConnector.class */
public class RSocketConnector {
    private static final String CLIENT_TAG = "client";
    private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION = (rSocket, invalidatable) -> {
        Mono<Void> onClose = rSocket.onClose();
        Consumer<? super Throwable> consumer = th -> {
            invalidatable.invalidate();
        };
        invalidatable.getClass();
        onClose.subscribe(null, consumer, invalidatable::invalidate);
    };

    @Nullable
    private SocketAcceptor acceptor;
    private Retry retrySpec;
    private Resume resume;
    private Supplier<Leases<?>> leasesSupplier;
    private Payload setupPayload = EmptyPayload.INSTANCE;
    private String metadataMimeType = "application/binary";
    private String dataMimeType = "application/binary";
    private Duration keepAliveInterval = Duration.ofSeconds(20);
    private Duration keepAliveMaxLifeTime = Duration.ofSeconds(90);
    private InitializingInterceptorRegistry interceptors = new InitializingInterceptorRegistry();
    private int mtu = 0;
    private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

    private RSocketConnector() {
    }

    public static RSocketConnector create() {
        return new RSocketConnector();
    }

    public static Mono<RSocket> connectWith(ClientTransport clientTransport) {
        return create().connect(() -> {
            return clientTransport;
        });
    }

    public RSocketConnector setupPayload(Payload payload) {
        this.setupPayload = (Payload) Objects.requireNonNull(payload);
        return this;
    }

    public RSocketConnector dataMimeType(String str) {
        this.dataMimeType = (String) Objects.requireNonNull(str);
        return this;
    }

    public RSocketConnector metadataMimeType(String str) {
        this.metadataMimeType = (String) Objects.requireNonNull(str);
        return this;
    }

    public RSocketConnector keepAlive(Duration duration, Duration duration2) {
        if (!duration.negated().isNegative()) {
            throw new IllegalArgumentException("`interval` for keepAlive must be > 0");
        }
        if (!duration2.negated().isNegative()) {
            throw new IllegalArgumentException("`maxLifeTime` for keepAlive must be > 0");
        }
        this.keepAliveInterval = duration;
        this.keepAliveMaxLifeTime = duration2;
        return this;
    }

    public RSocketConnector interceptors(Consumer<InterceptorRegistry> consumer) {
        consumer.accept(this.interceptors);
        return this;
    }

    public RSocketConnector acceptor(SocketAcceptor socketAcceptor) {
        this.acceptor = socketAcceptor;
        return this;
    }

    public RSocketConnector reconnect(Retry retry) {
        this.retrySpec = (Retry) Objects.requireNonNull(retry);
        return this;
    }

    public RSocketConnector resume(Resume resume) {
        this.resume = resume;
        return this;
    }

    public RSocketConnector lease(Supplier<Leases<? extends LeaseStats>> supplier) {
        this.leasesSupplier = supplier;
        return this;
    }

    public RSocketConnector fragment(int i) {
        if ((i > 0 && i < 64) || i < 0) {
            throw new IllegalArgumentException(String.format("The smallest allowed mtu size is %d bytes, provided: %d", 64, Integer.valueOf(i)));
        }
        this.mtu = i;
        return this;
    }

    public RSocketConnector payloadDecoder(PayloadDecoder payloadDecoder) {
        Objects.requireNonNull(payloadDecoder);
        this.payloadDecoder = payloadDecoder;
        return this;
    }

    public Mono<RSocket> connect(ClientTransport clientTransport) {
        return connect(() -> {
            return clientTransport;
        });
    }

    public Mono<RSocket> connect(Supplier<ClientTransport> supplier) {
        Mono flatMap = Mono.fromSupplier(supplier).flatMap(clientTransport -> {
            return clientTransport.connect(this.mtu);
        });
        return (Mono) flatMap.flatMap(duplexConnection -> {
            ByteBuf byteBuf;
            KeepAliveHandler defaultKeepAliveHandler;
            DuplexConnection duplexConnection;
            if (this.resume != null) {
                byteBuf = this.resume.getTokenSupplier().get();
                ClientRSocketSession resumeToken = new ClientRSocketSession(duplexConnection, this.resume.getSessionDuration(), this.resume.getRetry(), this.resume.getStoreFactory(CLIENT_TAG).apply(byteBuf), this.resume.getStreamTimeout(), this.resume.isCleanupStoreOnKeepAlive()).continueWith((Mono<DuplexConnection>) flatMap).resumeToken(byteBuf);
                defaultKeepAliveHandler = new KeepAliveHandler.ResumableKeepAliveHandler(resumeToken.resumableConnection());
                duplexConnection = resumeToken.resumableConnection();
            } else {
                byteBuf = Unpooled.EMPTY_BUFFER;
                defaultKeepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler(duplexConnection);
                duplexConnection = duplexConnection;
            }
            ClientServerInputMultiplexer clientServerInputMultiplexer = new ClientServerInputMultiplexer(duplexConnection, this.interceptors, true);
            boolean z = this.leasesSupplier != null;
            Leases<?> leases = z ? this.leasesSupplier.get() : null;
            RSocket initRequester = this.interceptors.initRequester(new RSocketRequester(clientServerInputMultiplexer.asClientConnection(), this.payloadDecoder, StreamIdSupplier.clientSupplier(), this.mtu, (int) this.keepAliveInterval.toMillis(), (int) this.keepAliveMaxLifeTime.toMillis(), defaultKeepAliveHandler, z ? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver()) : RequesterLeaseHandler.None, Schedulers.single(Schedulers.parallel())));
            ByteBuf encode = SetupFrameCodec.encode(duplexConnection.alloc(), z, (int) this.keepAliveInterval.toMillis(), (int) this.keepAliveMaxLifeTime.toMillis(), byteBuf, this.metadataMimeType, this.dataMimeType, this.setupPayload);
            DuplexConnection duplexConnection2 = duplexConnection;
            return this.interceptors.initSocketAcceptor(this.acceptor != null ? this.acceptor : SocketAcceptor.with(new RSocket() { // from class: io.rsocket.core.RSocketConnector.1
            })).accept(new DefaultConnectionSetupPayload(encode), initRequester).flatMap(rSocket -> {
                new RSocketResponder(clientServerInputMultiplexer.asServerConnection(), this.interceptors.initResponder(rSocket), this.payloadDecoder, z ? new ResponderLeaseHandler.Impl(CLIENT_TAG, duplexConnection2.alloc(), leases.sender(), leases.stats()) : ResponderLeaseHandler.None, this.mtu);
                return duplexConnection2.sendOne(encode).thenReturn(initRequester);
            });
        }).as(mono -> {
            return this.retrySpec != null ? new ReconnectMono(mono.retryWhen(this.retrySpec), (v0) -> {
                v0.dispose();
            }, INVALIDATE_FUNCTION) : mono;
        });
    }
}
