package reactor.net.netty.udp;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.util.Collection;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.spec.Reactors;
import reactor.event.dispatch.SynchronousDispatcher;
import reactor.function.Consumer;
import reactor.function.batch.BatchConsumer;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.NetChannel;
import reactor.net.config.ServerSocketOptions;
import reactor.net.netty.NettyNetChannel;
import reactor.net.netty.NettyNetChannelInboundHandler;
import reactor.net.netty.NettyServerSocketOptions;
import reactor.net.udp.DatagramServer;
import reactor.support.NamedDaemonThreadFactory;

/* loaded from: input_file:reactor/net/netty/udp/NettyDatagramServer.class */
public class NettyDatagramServer<IN, OUT> extends DatagramServer<IN, OUT> {
    private final Logger log;
    private final Bootstrap bootstrap;
    private final EventLoopGroup ioGroup;
    private volatile NioDatagramChannel channel;
    private volatile NettyNetChannel<IN, OUT> netChannel;

    /* loaded from: input_file:reactor/net/netty/udp/NettyDatagramServer$PromiseCompletingListener.class */
    private static class PromiseCompletingListener implements ChannelFutureListener {
        private final Deferred<Void, Promise<Void>> d;

        private PromiseCompletingListener(Deferred<Void, Promise<Void>> deferred) {
            this.d = deferred;
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                this.d.accept((Void) null);
            } else {
                this.d.accept(channelFuture.cause());
            }
        }
    }

    /**
     * Builds a UDP server on top of a Netty {@link NioDatagramChannel}. Nothing is
     * bound here; the constructor only prepares the I/O event loop group and the
     * {@link Bootstrap} used later to bind.
     *
     * @param environment         passed to the superclass; also read for the
     *                            {@code reactor.udp.ioThreadCount} property, which sizes the
     *                            I/O event loop group (default {@code Environment.PROCESSORS})
     * @param reactor2            passed to the superclass
     * @param inetSocketAddress   local address to bind; when {@code null} the bootstrap is
     *                            configured with {@code NetUtil.LOCALHOST} and port 3000
     * @param networkInterface    optional interface applied to the channel configuration and
     *                            as the {@code IP_MULTICAST_IF} bootstrap option
     * @param serverSocketOptions source of the receive/send buffer sizes and the reuse-address
     *                            flag; if it is a {@link NettyServerSocketOptions} with a
     *                            pipeline configurer, that configurer is applied to each new
     *                            channel's pipeline
     * @param codec               passed to the superclass
     * @param collection          passed to the superclass
     */
    public NettyDatagramServer(@Nonnull Environment environment, @Nonnull Reactor reactor2, @Nullable InetSocketAddress inetSocketAddress, @Nullable final NetworkInterface networkInterface, @Nonnull final ServerSocketOptions serverSocketOptions, @Nullable Codec<Buffer, IN, OUT> codec, Collection<Consumer<NetChannel<IN, OUT>>> collection) {
        super(environment, reactor2, inetSocketAddress, networkInterface, serverSocketOptions, codec, collection);
        this.log = LoggerFactory.getLogger(getClass());
        this.ioGroup = new NioEventLoopGroup(((Integer) environment.getProperty("reactor.udp.ioThreadCount", Integer.class, Integer.valueOf(Environment.PROCESSORS))).intValue(), new NamedDaemonThreadFactory("reactor-udp-io"));
        final NettyNetChannelInboundHandler nettyNetChannelInboundHandler = new NettyNetChannelInboundHandler() {
            @Override // reactor.net.netty.NettyNetChannelInboundHandler
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                // Unwrap the datagram so the parent handler only receives the packet content.
                super.channelRead(channelHandlerContext, ((DatagramPacket) obj).content());
            }
        };
        this.bootstrap = new Bootstrap().group(this.ioGroup).option(ChannelOption.SO_RCVBUF, Integer.valueOf(serverSocketOptions.rcvbuf())).option(ChannelOption.SO_SNDBUF, Integer.valueOf(serverSocketOptions.sndbuf())).option(ChannelOption.SO_REUSEADDR, Boolean.valueOf(serverSocketOptions.reuseAddr())).channelFactory(new ChannelFactory<Channel>() {
            @Override
            public Channel newChannel() {
                final NioDatagramChannel nioDatagramChannel = new NioDatagramChannel();
                DatagramChannelConfig config = nioDatagramChannel.config();
                config.setReceiveBufferSize(serverSocketOptions.rcvbuf());
                config.setSendBufferSize(serverSocketOptions.sndbuf());
                config.setReuseAddress(serverSocketOptions.reuseAddr());
                if (null != networkInterface) {
                    config.setNetworkInterface(networkInterface);
                }
                // Let Netty-specific options customise the pipeline before our own handlers are added.
                if ((serverSocketOptions instanceof NettyServerSocketOptions) && null != ((NettyServerSocketOptions) serverSocketOptions).pipelineConfigurer()) {
                    ((NettyServerSocketOptions) serverSocketOptions).pipelineConfigurer().accept(nioDatagramChannel.pipeline());
                }
                nioDatagramChannel.closeFuture().addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (NettyDatagramServer.this.log.isInfoEnabled()) {
                            NettyDatagramServer.this.log.info("CLOSE {}", nioDatagramChannel);
                        }
                        // Pass the Netty channel itself; it is not a NettyDatagramServer and must not be cast to one.
                        NettyDatagramServer.this.close(nioDatagramChannel);
                    }
                });
                // NOTE(review): select(...) presumably resolves the NetChannel wrapper for this Netty channel - confirm in the superclass.
                NettyDatagramServer.this.netChannel = (NettyNetChannel<IN, OUT>) NettyDatagramServer.this.select(nioDatagramChannel);
                nettyNetChannelInboundHandler.setNetChannel(NettyDatagramServer.this.netChannel);
                // Outbound adapter that simply delegates writes to the default implementation.
                nioDatagramChannel.pipeline().addLast(new ChannelHandler[]{new ChannelOutboundHandlerAdapter() {
                    @Override
                    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
                        super.write(channelHandlerContext, obj, channelPromise);
                    }
                }});
                return nioDatagramChannel;
            }
        }).handler(nettyNetChannelInboundHandler);
        if (null != inetSocketAddress) {
            this.bootstrap.localAddress(inetSocketAddress);
        } else {
            // No explicit listen address: fall back to localhost on port 3000.
            this.bootstrap.localAddress(NetUtil.LOCALHOST, 3000);
        }
        if (null != networkInterface) {
            this.bootstrap.option(ChannelOption.IP_MULTICAST_IF, networkInterface);
        }
    }

    /**
     * Binds the datagram channel using the prepared bootstrap. The bind completes
     * on a listener, so this method may return before the server is usable.
     *
     * <p>The bound channel is recorded whether or not a callback was supplied, and
     * it is recorded before the callback is notified so that the callback can
     * already use the server. A failed bind is logged instead of being ignored.
     *
     * @param runnable optional callback handed to {@code notifyStart} once the bind
     *                 succeeded; when {@code null} no start notification is issued
     * @return this server
     */
    @Override // reactor.net.udp.DatagramServer, reactor.net.NetServer
    public DatagramServer<IN, OUT> start(@Nullable final Runnable runnable) {
        this.bootstrap.bind().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    NettyDatagramServer.this.log.error("Failed to bind datagram server", channelFuture.cause());
                    return;
                }
                // Publish the channel first: send/join/leave treat a null channel as "not running".
                NettyDatagramServer.this.channel = (NioDatagramChannel) channelFuture.channel();
                NettyDatagramServer.this.log.info("BIND {}", channelFuture.channel().localAddress());
                if (null != runnable) {
                    NettyDatagramServer.this.notifyStart(runnable);
                }
            }
        });
        return this;
    }

    /**
     * Shuts down the I/O event loop group and reports the outcome through a promise.
     *
     * <p>The graceful shutdown is scheduled on this server's reactor. Once the
     * event loop group's shutdown future completes, the promise is fulfilled with
     * {@code null} on success or with the failure cause otherwise.
     * {@code notifyShutdown()} is invoked right after scheduling, without waiting
     * for the event loop group to terminate.
     *
     * @return a promise that completes when the event loop group has shut down
     */
    @Override // reactor.net.NetServer
    public Promise<Void> shutdown() {
        final Deferred<Void, Promise<Void>> done = Promises.defer(getEnvironment(), getReactor().getDispatcher());
        Reactors.schedule(new Consumer<Void>() {
            @Override
            public void accept(Void ignored) {
                NettyDatagramServer.this.ioGroup.shutdownGracefully().addListener(new GenericFutureListener<Future<Object>>() {
                    @Override
                    public void operationComplete(Future<Object> future) throws Exception {
                        if (future.isSuccess()) {
                            done.accept((Void) null);
                        } else {
                            done.accept(future.cause());
                        }
                    }
                });
            }
        }, (Void) null, getReactor());
        notifyShutdown();
        return done.compose();
    }

    /**
     * Sends a message through the server's net channel.
     *
     * @param out the message to send; passed to the net channel unchanged
     * @return this server
     * @throws IllegalStateException if no channel has been bound yet
     */
    @Override // reactor.net.udp.DatagramServer
    public DatagramServer<IN, OUT> send(OUT out) {
        if (null == this.channel) {
            throw new IllegalStateException("DatagramServer not running.");
        }
        // The payload is an OUT value, not a channel: it must not be cast to NettyNetChannel.
        this.netChannel.send(out);
        return this;
    }

    /**
     * Returns the inbound stream of the server's net channel.
     *
     * @return the net channel's {@code in()} stream
     * @throws IllegalStateException if the net channel has not been created yet
     *                               (it is assigned when the Netty channel is created)
     */
    @Override // reactor.net.udp.DatagramServer
    public Stream<IN> in() {
        // Read the volatile field once so the null check and the call see the same value.
        final NettyNetChannel<IN, OUT> current = this.netChannel;
        if (null == current) {
            throw new IllegalStateException("DatagramServer not running.");
        }
        return current.in();
    }

    /**
     * Returns the outbound batch consumer of the server's net channel.
     *
     * @return the net channel's {@code out()} consumer
     * @throws IllegalStateException if the net channel has not been created yet
     *                               (it is assigned when the Netty channel is created)
     */
    @Override // reactor.net.udp.DatagramServer
    public BatchConsumer<OUT> out() {
        // Read the volatile field once so the null check and the call see the same value.
        final NettyNetChannel<IN, OUT> current = this.netChannel;
        if (null == current) {
            throw new IllegalStateException("DatagramServer not running.");
        }
        return current.out();
    }

    /**
     * Joins a multicast group on the bound channel.
     *
     * <p>If no interface is given, the server's configured multicast interface is
     * used when present. With an interface the group is joined through a socket
     * address built from the group address and the listen port; without one the
     * plain address variant of {@code joinGroup} is used.
     *
     * @param inetAddress      the multicast group address
     * @param networkInterface the interface to join on, or {@code null} to use the
     *                         server's multicast interface (if any)
     * @return a promise completed with {@code null} on success or with the failure cause
     * @throws IllegalStateException if no channel has been bound yet
     */
    @Override // reactor.net.udp.DatagramServer
    public Promise<Void> join(InetAddress inetAddress, NetworkInterface networkInterface) {
        // Snapshot the volatile field so the null check and the join use the same channel.
        final NioDatagramChannel ch = this.channel;
        if (null == ch) {
            throw new IllegalStateException("DatagramServer not running.");
        }
        Deferred<Void, Promise<Void>> result = Promises.defer(getEnvironment(), getReactor().getDispatcher());
        NetworkInterface iface = (null != networkInterface) ? networkInterface : getMulticastInterface();
        ChannelFuture joined;
        if (null != iface) {
            // The constructor accepts a null listen address; fall back to the port the channel is bound to.
            InetSocketAddress listen = getListenAddress();
            int port = (null != listen) ? listen.getPort() : ch.localAddress().getPort();
            joined = ch.joinGroup(new InetSocketAddress(inetAddress, port), iface);
        } else {
            joined = ch.joinGroup(inetAddress);
        }
        joined.addListener(new PromiseCompletingListener(result));
        return result.compose();
    }

    /**
     * Leaves a multicast group on the bound channel.
     *
     * <p>If no interface is given, the server's configured multicast interface is
     * used when present. With an interface the group is left through a socket
     * address built from the group address and the listen port; without one the
     * plain address variant of {@code leaveGroup} is used.
     *
     * @param inetAddress      the multicast group address
     * @param networkInterface the interface to leave on, or {@code null} to use the
     *                         server's multicast interface (if any)
     * @return a promise completed with {@code null} on success or with the failure cause
     * @throws IllegalStateException if no channel has been bound yet
     */
    @Override // reactor.net.udp.DatagramServer
    public Promise<Void> leave(InetAddress inetAddress, NetworkInterface networkInterface) {
        // Snapshot the volatile field so the null check and the leave use the same channel.
        final NioDatagramChannel ch = this.channel;
        if (null == ch) {
            throw new IllegalStateException("DatagramServer not running.");
        }
        NetworkInterface iface = (null != networkInterface) ? networkInterface : getMulticastInterface();
        Deferred<Void, Promise<Void>> result = Promises.defer(getEnvironment(), getReactor().getDispatcher());
        ChannelFuture left;
        if (null != iface) {
            // The constructor accepts a null listen address; fall back to the port the channel is bound to.
            InetSocketAddress listen = getListenAddress();
            int port = (null != listen) ? listen.getPort() : ch.localAddress().getPort();
            left = ch.leaveGroup(new InetSocketAddress(inetAddress, port), iface);
        } else {
            left = ch.leaveGroup(inetAddress);
        }
        left.addListener(new PromiseCompletingListener(result));
        return result.compose();
    }

    /**
     * Wraps the given I/O channel in a {@link NettyNetChannel} that uses this
     * server's environment, codec and reactor together with a new
     * {@link SynchronousDispatcher}.
     *
     * @param c the underlying channel; cast to {@link NioDatagramChannel}
     * @return the new net channel wrapper
     */
    @Override // reactor.net.AbstractNetPeer
    protected <C> NetChannel<IN, OUT> createChannel(C c) {
        NioDatagramChannel datagramChannel = (NioDatagramChannel) c;
        return new NettyNetChannel<IN, OUT>(
                getEnvironment(),
                getCodec(),
                new SynchronousDispatcher(),
                getReactor(),
                datagramChannel);
    }

    /**
     * Closes the bound datagram channel, if there is one.
     *
     * <p>When no channel was ever recorded there is nothing to close; the callback,
     * if any, is scheduled straight away instead of failing with a
     * {@link NullPointerException}. Otherwise the callback is scheduled on this
     * server's reactor once the close succeeded; a failed close is logged and the
     * callback is not invoked.
     *
     * @param consumer optional callback to schedule after the channel has closed
     */
    @Override // reactor.net.AbstractNetPeer
    protected void doClose(@Nullable final Consumer<Void> consumer) {
        // Snapshot the volatile field so the null check and the close use the same channel.
        final NioDatagramChannel ch = this.channel;
        if (null == ch) {
            // NOTE(review): a bind that is still pending is not cancelled here - confirm this is acceptable.
            if (null != consumer) {
                Reactors.schedule(consumer, (Void) null, getReactor());
            }
            return;
        }
        ch.close().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    NettyDatagramServer.this.log.warn("Failed to close datagram channel", channelFuture.cause());
                    return;
                }
                if (null != consumer) {
                    Reactors.schedule(consumer, (Void) null, NettyDatagramServer.this.getReactor());
                }
            }
        });
    }
}
