package reactor.net.zmq.tcp;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.NetChannel;
import reactor.net.config.ServerSocketOptions;
import reactor.net.config.SslOptions;
import reactor.net.tcp.TcpServer;
import reactor.net.zmq.ZeroMQNetChannel;
import reactor.net.zmq.ZeroMQServerSocketOptions;
import reactor.net.zmq.ZeroMQWorker;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.support.NamedDaemonThreadFactory;
import reactor.util.Assert;
import reactor.util.UUIDUtils;

/* loaded from: input_file:reactor/net/zmq/tcp/ZeroMQTcpServer.class */
public class ZeroMQTcpServer<IN, OUT> extends TcpServer<IN, OUT> {
    private final Logger log;
    private final int ioThreadCount;
    private final ZeroMQServerSocketOptions zmqOpts;
    private final ExecutorService threadPool;
    private volatile ZeroMQWorker<IN, OUT> worker;
    private volatile Future<?> workerFuture;

    public ZeroMQTcpServer(@Nonnull Environment environment, @Nonnull Reactor reactor2, @Nullable InetSocketAddress inetSocketAddress, ServerSocketOptions serverSocketOptions, SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec, @Nonnull Collection<Consumer<NetChannel<IN, OUT>>> collection) {
        super(environment, reactor2, inetSocketAddress, serverSocketOptions, sslOptions, codec, collection);
        this.log = LoggerFactory.getLogger(getClass());
        this.ioThreadCount = ((Integer) environment.getProperty("reactor.zmq.ioThreadCount", Integer.class, 1)).intValue();
        if (serverSocketOptions instanceof ZeroMQServerSocketOptions) {
            this.zmqOpts = (ZeroMQServerSocketOptions) serverSocketOptions;
        } else {
            this.zmqOpts = null;
        }
        this.threadPool = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("zmq-server"));
    }

    @Override // reactor.net.tcp.TcpServer, reactor.net.NetServer
    public TcpServer<IN, OUT> start(@Nullable final Runnable runnable) {
        Assert.isNull(this.worker, "This ZeroMQ server has already been started");
        this.worker = new ZeroMQWorker<IN, OUT>(UUIDUtils.random(), null != this.zmqOpts ? this.zmqOpts.socketType() : 6, this.ioThreadCount, null != this.zmqOpts ? this.zmqOpts.context() : null) { // from class: reactor.net.zmq.tcp.ZeroMQTcpServer.1
            @Override // reactor.net.zmq.ZeroMQWorker
            protected void configure(ZMQ.Socket socket) {
                socket.setReceiveBufferSize(ZeroMQTcpServer.this.getOptions().rcvbuf());
                socket.setSendBufferSize(ZeroMQTcpServer.this.getOptions().sndbuf());
                socket.setBacklog(ZeroMQTcpServer.this.getOptions().backlog());
                if (ZeroMQTcpServer.this.getOptions().keepAlive()) {
                    socket.setTCPKeepAlive(1);
                }
                if (null == ZeroMQTcpServer.this.zmqOpts || null == ZeroMQTcpServer.this.zmqOpts.socketConfigurer()) {
                    return;
                }
                ZeroMQTcpServer.this.zmqOpts.socketConfigurer().accept(socket);
            }

            @Override // reactor.net.zmq.ZeroMQWorker
            protected void start(ZMQ.Socket socket) {
                String listenAddresses = (null == ZeroMQTcpServer.this.zmqOpts || null == ZeroMQTcpServer.this.zmqOpts.listenAddresses()) ? "tcp://" + ZeroMQTcpServer.this.getListenAddress().getHostString() + ":" + ZeroMQTcpServer.this.getListenAddress().getPort() : ZeroMQTcpServer.this.zmqOpts.listenAddresses();
                if (ZeroMQTcpServer.this.log.isInfoEnabled()) {
                    ZeroMQTcpServer.this.log.info("BIND: starting ZeroMQ {} socket on {}", ZeroMQ.findSocketTypeName(socket.getType()), listenAddresses);
                }
                socket.bind(listenAddresses);
                ZeroMQTcpServer.this.notifyStart(runnable);
            }

            @Override // reactor.net.zmq.ZeroMQWorker
            protected ZeroMQNetChannel<IN, OUT> select(Object obj) {
                return (ZeroMQNetChannel) ZeroMQTcpServer.this.select(obj);
            }
        };
        this.workerFuture = this.threadPool.submit(this.worker);
        return this;
    }

    @Override // reactor.net.AbstractNetPeer
    protected <C> NetChannel<IN, OUT> createChannel(C c) {
        return new ZeroMQNetChannel(getEnvironment(), getReactor(), getReactor().getDispatcher(), getCodec());
    }

    @Override // reactor.net.NetServer
    public Promise<Boolean> shutdown() {
        if (null == this.worker) {
            return Promises.error(new IllegalStateException("This ZeroMQ server has not been started"));
        }
        Promise<Boolean> defer = Promises.defer(getEnvironment(), getReactor().getDispatcher());
        super.close((Consumer<Boolean>) null);
        this.worker.shutdown();
        if (!this.workerFuture.isDone()) {
            this.workerFuture.cancel(true);
        }
        this.threadPool.shutdownNow();
        notifyShutdown();
        defer.accept(true);
        return defer;
    }
}
