package org.springframework.messaging.tcp.reactor;

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.reactivestreams.Publisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.Environment;
import reactor.core.config.ConfigurationReader;
import reactor.core.config.DispatcherConfiguration;
import reactor.core.config.ReactorConfiguration;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.NetStreams;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.Reconnect;
import reactor.io.net.Spec;
import reactor.io.net.impl.netty.NettyClientSocketOptions;
import reactor.io.net.impl.netty.tcp.NettyTcpClient;
import reactor.io.net.tcp.TcpClient;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Streams;
import reactor.rx.action.Signal;

/* loaded from: input_file:lib/spring-messaging-4.2.1.RELEASE.jar:org/springframework/messaging/tcp/reactor/Reactor2TcpClient.class */
public class Reactor2TcpClient<P> implements TcpOperations<P> {
    public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
    private final NioEventLoopGroup eventLoopGroup;
    private final NetStreams.TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory;
    private final List<TcpClient<Message<P>, Message<P>>> tcpClients;
    private boolean stopping;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/spring-messaging-4.2.1.RELEASE.jar:org/springframework/messaging/tcp/reactor/Reactor2TcpClient$MessageChannelStreamHandler.class */
    public static class MessageChannelStreamHandler<P> implements ReactorChannelHandler<Message<P>, Message<P>, ChannelStream<Message<P>, Message<P>>> {
        private final TcpConnectionHandler<P> connectionHandler;

        public MessageChannelStreamHandler(TcpConnectionHandler<P> tcpConnectionHandler) {
            this.connectionHandler = tcpConnectionHandler;
        }

        public Publisher<Void> apply(ChannelStream<Message<P>, Message<P>> channelStream) {
            Promise prepare = Promises.prepare();
            this.connectionHandler.afterConnected(new Reactor2TcpConnection(channelStream, prepare));
            channelStream.finallyDo(new Consumer<Signal<Message<P>>>() { // from class: org.springframework.messaging.tcp.reactor.Reactor2TcpClient.MessageChannelStreamHandler.2
                public void accept(Signal<Message<P>> signal) {
                    if (signal.isOnError()) {
                        MessageChannelStreamHandler.this.connectionHandler.handleFailure(signal.getThrowable());
                    } else if (signal.isOnComplete()) {
                        MessageChannelStreamHandler.this.connectionHandler.afterConnectionClosed();
                    }
                }
            }).consume(new Consumer<Message<P>>() { // from class: org.springframework.messaging.tcp.reactor.Reactor2TcpClient.MessageChannelStreamHandler.1
                public void accept(Message<P> message) {
                    MessageChannelStreamHandler.this.connectionHandler.handleMessage(message);
                }
            });
            return prepare;
        }
    }

    /* loaded from: input_file:lib/spring-messaging-4.2.1.RELEASE.jar:org/springframework/messaging/tcp/reactor/Reactor2TcpClient$ReactorReconnectAdapter.class */
    private static class ReactorReconnectAdapter implements Reconnect {
        private final ReconnectStrategy strategy;

        public ReactorReconnectAdapter(ReconnectStrategy reconnectStrategy) {
            this.strategy = reconnectStrategy;
        }

        public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress inetSocketAddress, int i) {
            return Tuple.of(inetSocketAddress, this.strategy.getTimeToNextAttempt(i));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/spring-messaging-4.2.1.RELEASE.jar:org/springframework/messaging/tcp/reactor/Reactor2TcpClient$SynchronousDispatcherConfigReader.class */
    public static class SynchronousDispatcherConfigReader implements ConfigurationReader {
        private SynchronousDispatcherConfigReader() {
        }

        public ReactorConfiguration read() {
            return new ReactorConfiguration(Arrays.asList(new DispatcherConfiguration[0]), "sync", new Properties());
        }
    }

    public Reactor2TcpClient(final String str, final int i, final Codec<Buffer, Message<P>, Message<P>> codec) {
        this.tcpClients = new ArrayList();
        this.eventLoopGroup = initEventLoopGroup();
        this.tcpClientSpecFactory = new NetStreams.TcpClientFactory<Message<P>, Message<P>>() { // from class: org.springframework.messaging.tcp.reactor.Reactor2TcpClient.1
            public Spec.TcpClientSpec<Message<P>, Message<P>> apply(Spec.TcpClientSpec<Message<P>, Message<P>> tcpClientSpec) {
                return tcpClientSpec.env(new Environment(new SynchronousDispatcherConfigReader())).codec(codec).connect(str, i).options(new NettyClientSocketOptions().eventLoopGroup(Reactor2TcpClient.this.eventLoopGroup));
            }
        };
    }

    public Reactor2TcpClient(NetStreams.TcpClientFactory<Message<P>, Message<P>> tcpClientFactory) {
        this.tcpClients = new ArrayList();
        Assert.notNull(tcpClientFactory, "'tcpClientClientFactory' must not be null");
        this.tcpClientSpecFactory = tcpClientFactory;
        this.eventLoopGroup = null;
    }

    private static NioEventLoopGroup initEventLoopGroup() {
        int i;
        try {
            i = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
        } catch (Exception e) {
            i = -1;
        }
        if (i <= 0) {
            i = Runtime.getRuntime().availableProcessors();
        }
        return new NioEventLoopGroup(i, new NamedDaemonThreadFactory("reactor-tcp-io"));
    }

    @Override // org.springframework.messaging.tcp.TcpOperations
    public ListenableFuture<Void> connect(final TcpConnectionHandler<P> tcpConnectionHandler) {
        Assert.notNull(tcpConnectionHandler, "TcpConnectionHandler must not be null");
        synchronized (this.tcpClients) {
            if (!this.stopping) {
                TcpClient<Message<P>, Message<P>> tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
                this.tcpClients.add(tcpClient);
                return new PassThroughPromiseToListenableFutureAdapter(tcpClient.start(new MessageChannelStreamHandler(tcpConnectionHandler)).onError(new Consumer<Throwable>() { // from class: org.springframework.messaging.tcp.reactor.Reactor2TcpClient.2
                    public void accept(Throwable th) {
                        tcpConnectionHandler.afterConnectFailure(th);
                    }
                }));
            }
            IllegalStateException illegalStateException = new IllegalStateException("Shutting down.");
            tcpConnectionHandler.afterConnectFailure(illegalStateException);
            return new PassThroughPromiseToListenableFutureAdapter(Promises.error(illegalStateException));
        }
    }

    @Override // org.springframework.messaging.tcp.TcpOperations
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> tcpConnectionHandler, ReconnectStrategy reconnectStrategy) {
        Assert.notNull(tcpConnectionHandler, "TcpConnectionHandler must not be null");
        Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null");
        synchronized (this.tcpClients) {
            if (!this.stopping) {
                TcpClient<Message<P>, Message<P>> tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
                this.tcpClients.add(tcpClient);
                return new PassThroughPromiseToListenableFutureAdapter(tcpClient.start(new MessageChannelStreamHandler(tcpConnectionHandler), new ReactorReconnectAdapter(reconnectStrategy)).next().after());
            }
            IllegalStateException illegalStateException = new IllegalStateException("Shutting down.");
            tcpConnectionHandler.afterConnectFailure(illegalStateException);
            return new PassThroughPromiseToListenableFutureAdapter(Promises.error(illegalStateException));
        }
    }

    @Override // org.springframework.messaging.tcp.TcpOperations
    public ListenableFuture<Void> shutdown() {
        synchronized (this.tcpClients) {
            this.stopping = true;
        }
        Promise next = Streams.from(this.tcpClients).flatMap(new Function<TcpClient<Message<P>, Message<P>>, Promise<Void>>() { // from class: org.springframework.messaging.tcp.reactor.Reactor2TcpClient.3
            public Promise<Void> apply(final TcpClient<Message<P>, Message<P>> tcpClient) {
                return tcpClient.shutdown().onComplete(new Consumer<Promise<Void>>() { // from class: org.springframework.messaging.tcp.reactor.Reactor2TcpClient.3.1
                    public void accept(Promise<Void> promise) {
                        Reactor2TcpClient.this.tcpClients.remove(tcpClient);
                    }
                });
            }
        }).next();
        if (this.eventLoopGroup != null) {
            final Promise prepare = Promises.prepare();
            next.onComplete(new Consumer<Promise<Void>>() { // from class: org.springframework.messaging.tcp.reactor.Reactor2TcpClient.4
                public void accept(Promise<Void> promise) {
                    Reactor2TcpClient.this.eventLoopGroup.shutdownGracefully().addListener(new FutureListener<Object>() { // from class: org.springframework.messaging.tcp.reactor.Reactor2TcpClient.4.1
                        public void operationComplete(Future<Object> future) throws Exception {
                            if (future.isSuccess()) {
                                prepare.onComplete();
                            } else {
                                prepare.onError(future.cause());
                            }
                        }
                    });
                }
            });
            next = prepare;
        }
        return new PassThroughPromiseToListenableFutureAdapter(next);
    }
}
