package reactor.net.netty.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.composable.spec.Streams;
import reactor.function.Consumer;
import reactor.function.Supplier;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.NetChannel;
import reactor.net.Reconnect;
import reactor.net.config.ClientSocketOptions;
import reactor.net.config.SslOptions;
import reactor.net.netty.NettyClientSocketOptions;
import reactor.net.netty.NettyEventLoopDispatcher;
import reactor.net.netty.NettyNetChannel;
import reactor.net.netty.NettyNetChannelInboundHandler;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.ssl.SSLEngineSupplier;
import reactor.support.NamedDaemonThreadFactory;
import reactor.tuple.Tuple2;

/* loaded from: input_file:reactor/net/netty/tcp/NettyTcpClient.class */
public class NettyTcpClient<IN, OUT> extends TcpClient<IN, OUT> {
    private final Logger log;
    private final Bootstrap bootstrap;
    private final EventLoopGroup ioGroup;
    private final Supplier<ChannelFuture> connectionSupplier;
    private volatile InetSocketAddress connectAddress;
    private volatile boolean closing;
    private volatile SocketChannel channel;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: reactor.net.netty.tcp.NettyTcpClient$4, reason: invalid class name */
    /* loaded from: input_file:reactor/net/netty/tcp/NettyTcpClient$4.class */
    public class AnonymousClass4 implements ChannelFutureListener {
        final AtomicInteger attempts = new AtomicInteger(0);
        final ChannelFutureListener self = this;
        final /* synthetic */ Reconnect val$reconnect;
        final /* synthetic */ Deferred val$connections;

        AnonymousClass4(Reconnect reconnect, Deferred deferred) {
            this.val$reconnect = reconnect;
            this.val$connections = deferred;
        }

        public void operationComplete(final ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                if (NettyTcpClient.this.log.isInfoEnabled()) {
                    NettyTcpClient.this.log.info("CONNECT: " + channelFuture.channel());
                }
                final NettyNetChannel nettyNetChannel = (NettyNetChannel) NettyTcpClient.this.select(channelFuture.channel());
                channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { // from class: reactor.net.netty.tcp.NettyTcpClient.4.3
                    public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                        if (NettyTcpClient.this.log.isInfoEnabled()) {
                            NettyTcpClient.this.log.info("CLOSED: " + channelFuture2.channel());
                        }
                        NettyTcpClient.this.notifyClose(nettyNetChannel);
                        if (!nettyNetChannel.isClosing()) {
                            if (null != AnonymousClass4.this.val$reconnect.reconnect(NettyTcpClient.this.connectAddress, AnonymousClass4.this.attempts.incrementAndGet())) {
                                NettyTcpClient.this.createConnection(AnonymousClass4.this.self);
                            }
                        }
                        NettyTcpClient.this.getChannels().unregister(channelFuture2.channel());
                    }
                });
                channelFuture.channel().eventLoop().execute(new Runnable() { // from class: reactor.net.netty.tcp.NettyTcpClient.4.4
                    @Override // java.lang.Runnable
                    public void run() {
                        AnonymousClass4.this.val$connections.accept(nettyNetChannel);
                    }
                });
                return;
            }
            int incrementAndGet = this.attempts.incrementAndGet();
            Tuple2<InetSocketAddress, Long> reconnect = this.val$reconnect.reconnect(NettyTcpClient.this.connectAddress, incrementAndGet);
            if (null == reconnect) {
                if (NettyTcpClient.this.log.isErrorEnabled()) {
                    NettyTcpClient.this.log.error("Reconnection to {} failed after {} attempts.", NettyTcpClient.this.connectAddress, Integer.valueOf(incrementAndGet - 1));
                }
                channelFuture.channel().eventLoop().execute(new Runnable() { // from class: reactor.net.netty.tcp.NettyTcpClient.4.1
                    @Override // java.lang.Runnable
                    public void run() {
                        AnonymousClass4.this.val$connections.accept(channelFuture.cause());
                    }
                });
                return;
            }
            NettyTcpClient.this.connectAddress = (InetSocketAddress) reconnect.getT1();
            NettyTcpClient.this.bootstrap.remoteAddress(NettyTcpClient.this.connectAddress);
            long longValue = ((Long) reconnect.getT2()).longValue();
            if (NettyTcpClient.this.log.isInfoEnabled()) {
                NettyTcpClient.this.log.info("Attempting to reconnect to {} after {}ms", NettyTcpClient.this.connectAddress, Long.valueOf(longValue));
            }
            NettyTcpClient.this.getEnvironment().getRootTimer().submit(new Consumer<Long>() { // from class: reactor.net.netty.tcp.NettyTcpClient.4.2
                public void accept(Long l) {
                    NettyTcpClient.this.createConnection(AnonymousClass4.this.self);
                }
            }, longValue, TimeUnit.MILLISECONDS);
        }
    }

    public NettyTcpClient(@Nonnull Environment environment, @Nonnull Reactor reactor2, @Nonnull InetSocketAddress inetSocketAddress, @Nonnull final ClientSocketOptions clientSocketOptions, @Nullable final SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec, @Nonnull Collection<Consumer<NetChannel<IN, OUT>>> collection) {
        super(environment, reactor2, inetSocketAddress, clientSocketOptions, sslOptions, codec, collection);
        this.log = LoggerFactory.getLogger(NettyTcpClient.class);
        this.ioGroup = new NioEventLoopGroup(((Integer) environment.getProperty("reactor.tcp.ioThreadCount", Integer.class, Integer.valueOf(Environment.PROCESSORS))).intValue(), new NamedDaemonThreadFactory("reactor-tcp-io"));
        this.bootstrap = new Bootstrap().group(this.ioGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_RCVBUF, Integer.valueOf(clientSocketOptions.rcvbuf())).option(ChannelOption.SO_SNDBUF, Integer.valueOf(clientSocketOptions.sndbuf())).option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(clientSocketOptions.keepAlive())).option(ChannelOption.SO_LINGER, Integer.valueOf(clientSocketOptions.linger())).option(ChannelOption.TCP_NODELAY, Boolean.valueOf(clientSocketOptions.tcpNoDelay())).remoteAddress(this.connectAddress).handler(new ChannelInitializer<SocketChannel>() { // from class: reactor.net.netty.tcp.NettyTcpClient.1
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.config().setConnectTimeoutMillis(clientSocketOptions.timeout());
                if (null != sslOptions) {
                    SSLEngine m5get = new SSLEngineSupplier(sslOptions, true).m5get();
                    NettyTcpClient.this.log.debug("SSL enabled using keystore {}", null != sslOptions.keystoreFile() ? sslOptions.keystoreFile() : "<DEFAULT>");
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new SslHandler(m5get)});
                }
                if ((clientSocketOptions instanceof NettyClientSocketOptions) && null != ((NettyClientSocketOptions) clientSocketOptions).pipelineConfigurer()) {
                    ((NettyClientSocketOptions) clientSocketOptions).pipelineConfigurer().accept(socketChannel.pipeline());
                }
                socketChannel.pipeline().addLast(NettyTcpClient.this.createChannelHandlers(socketChannel));
                NettyTcpClient.this.channel = socketChannel;
            }
        });
        this.connectionSupplier = new Supplier<ChannelFuture>() { // from class: reactor.net.netty.tcp.NettyTcpClient.2
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public ChannelFuture m1get() {
                if (NettyTcpClient.this.closing) {
                    return null;
                }
                return NettyTcpClient.this.bootstrap.connect(NettyTcpClient.this.getConnectAddress());
            }
        };
    }

    @Override // reactor.net.tcp.TcpClient, reactor.net.NetClient
    public Promise<NetChannel<IN, OUT>> open() {
        Deferred<NetChannel<IN, OUT>, Promise<NetChannel<IN, OUT>>> defer = Promises.defer(getEnvironment(), getReactor().getDispatcher());
        createConnection(createConnectListener(defer));
        return defer.compose();
    }

    @Override // reactor.net.tcp.TcpClient, reactor.net.NetClient
    public Stream<NetChannel<IN, OUT>> open(Reconnect reconnect) {
        Deferred<NetChannel<IN, OUT>, Stream<NetChannel<IN, OUT>>> defer = Streams.defer(getEnvironment(), getReactor().getDispatcher());
        createConnection(createReconnectListener(defer, reconnect));
        return defer.compose();
    }

    @Override // reactor.net.AbstractNetPeer
    protected <C> NetChannel<IN, OUT> createChannel(C c) {
        SocketChannel socketChannel = (SocketChannel) c;
        return new NettyNetChannel(getEnvironment(), getCodec(), new NettyEventLoopDispatcher(socketChannel.eventLoop(), ((Integer) getEnvironment().getProperty("reactor.tcp.connectionReactorBacklog", Integer.class, 128)).intValue()), getReactor(), socketChannel);
    }

    protected ChannelHandler[] createChannelHandlers(SocketChannel socketChannel) {
        return new ChannelHandler[]{new NettyNetChannelInboundHandler().setNetChannel((NettyNetChannel) select(socketChannel))};
    }

    private ChannelFutureListener createConnectListener(final Deferred<NetChannel<IN, OUT>, Promise<NetChannel<IN, OUT>>> deferred) {
        return new ChannelFutureListener() { // from class: reactor.net.netty.tcp.NettyTcpClient.3
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    if (NettyTcpClient.this.log.isErrorEnabled()) {
                        NettyTcpClient.this.log.error(channelFuture.cause().getMessage(), channelFuture.cause());
                    }
                    deferred.accept(channelFuture.cause());
                } else {
                    if (NettyTcpClient.this.log.isInfoEnabled()) {
                        NettyTcpClient.this.log.info("CONNECT: " + channelFuture.channel());
                    }
                    final NettyNetChannel nettyNetChannel = (NettyNetChannel) NettyTcpClient.this.select(channelFuture.channel());
                    channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { // from class: reactor.net.netty.tcp.NettyTcpClient.3.1
                        public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                            if (NettyTcpClient.this.log.isInfoEnabled()) {
                                NettyTcpClient.this.log.info("CLOSED: " + channelFuture2.channel());
                            }
                            NettyTcpClient.this.notifyClose(nettyNetChannel);
                            NettyTcpClient.this.getChannels().unregister(channelFuture2.channel());
                        }
                    });
                    deferred.accept(nettyNetChannel);
                }
            }
        };
    }

    private ChannelFutureListener createReconnectListener(Deferred<NetChannel<IN, OUT>, Stream<NetChannel<IN, OUT>>> deferred, Reconnect reconnect) {
        return new AnonymousClass4(reconnect, deferred);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void createConnection(ChannelFutureListener channelFutureListener) {
        ChannelFuture channelFuture = (ChannelFuture) this.connectionSupplier.get();
        if (channelFuture != null) {
            channelFuture.addListener(channelFutureListener);
        }
    }
}
