package io.confluent.mqtt.network.netty;

import io.confluent.mqtt.MqttConfig;
import io.confluent.mqtt.PipelineFactory;
import io.confluent.mqtt.ProxyFactory;
import io.confluent.mqtt.ProxyServer;
import io.confluent.mqtt.network.NetworkConfig;
import io.confluent.mqtt.stream.StreamConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;

/* loaded from: input_file:io/confluent/mqtt/network/netty/NettyProxyFactory.class */
/**
 * {@link ProxyFactory} implementation that assembles a Netty {@link ServerBootstrap}
 * from an {@link MqttConfig} and wraps it in a {@link NettyProxyServer}.
 *
 * <p>The transport (epoll or NIO) is selected by {@code MqttConfig.epollEnabled()}.
 */
public class NettyProxyFactory implements ProxyFactory {
    private final MqttConfig config;

    /**
     * Creates a factory that reads all of its settings from the given configuration.
     *
     * @param mqttConfig configuration supplying host, port, thread counts and the epoll flag
     */
    public NettyProxyFactory(MqttConfig mqttConfig) {
        this.config = mqttConfig;
    }

    /**
     * Builds a proxy server whose child channels are populated by the given pipeline factory.
     *
     * <p>Two event loop groups are created: one that is handed to the bootstrap, and a second
     * one that is passed to the pipeline together with the Kafka publish handler. The bootstrap
     * is only configured here; this method does not bind it.
     *
     * @param pipelineFactory source of the per-channel handlers
     * @return a {@link NettyProxyServer} wrapping the configured bootstrap
     */
    @Override // io.confluent.mqtt.ProxyFactory
    public ProxyServer newProxy(final PipelineFactory pipelineFactory) {
        MultithreadEventLoopGroup networkGroup = newEventLoopGroup(
                this.config.networkThreads(),
                NetworkConfig.NETWORK_THREADS_PREFIX,
                this.config.epollEnabled());
        final MultithreadEventLoopGroup streamGroup = newEventLoopGroup(
                this.config.streamThreads(),
                StreamConfig.KAFKA_PRODUCER_GROUPS_PREFIX,
                this.config.epollEnabled());

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap
                .group(networkGroup)
                .channel(channelClass(this.config.epollEnabled()))
                .localAddress(this.config.host(), this.config.port())
                // Same pooled allocator for the server channel and for every accepted channel.
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel socketChannel) throws Exception {
                        // Publish handlers go in first; the Kafka publish handler is appended
                        // afterwards and is registered against the dedicated stream group.
                        socketChannel.pipeline()
                                .addLast(pipelineFactory.newPublishPipelineHandlers(socketChannel))
                                .addLast(streamGroup, pipelineFactory.newKafkaPublishHandler(socketChannel));
                    }
                });
        return new NettyProxyServer(bootstrap);
    }

    /**
     * Creates an event loop group backed by daemon threads named after the given prefix.
     *
     * @param threadCount number of threads passed to the group constructor
     * @param poolName    pool name handed to the thread factory
     * @param useEpoll    {@code true} for an epoll group, {@code false} for an NIO group
     * @return the newly created event loop group
     */
    private MultithreadEventLoopGroup newEventLoopGroup(int threadCount, String poolName, boolean useEpoll) {
        DefaultThreadFactory threadFactory = new DefaultThreadFactory(poolName, true);
        if (useEpoll) {
            return new EpollEventLoopGroup(threadCount, threadFactory);
        }
        return new NioEventLoopGroup(threadCount, threadFactory);
    }

    /**
     * Picks the server channel implementation that matches the chosen transport.
     *
     * @param useEpoll {@code true} for the epoll server channel, {@code false} for the NIO one
     * @return the server channel class to give to the bootstrap
     */
    private Class<? extends ServerChannel> channelClass(boolean useEpoll) {
        if (useEpoll) {
            return EpollServerSocketChannel.class;
        }
        return NioServerSocketChannel.class;
    }
}
