package io.confluent.mqtt.protocol.netty;

import io.confluent.mqtt.MqttConfig;
import io.confluent.mqtt.RegexListTopicMapper;
import io.confluent.mqtt.TopicMapper;
import io.confluent.mqtt.protocol.MqttHandler;
import io.confluent.mqtt.protocol.security.SecurityProtocol;
import io.confluent.mqtt.protocol.security.UserPasswordCallbackHandler;
import io.confluent.mqtt.stream.PublishMqttRecord;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import java.util.EnumMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty inbound handler that dispatches decoded MQTT messages to per-message-type handlers.
 *
 * <p>Handlers for CONNECT, DISCONNECT, PINGREQ, PUBLISH and PUBREL are registered in the
 * constructor; any other message type is logged at debug level and ignored. A single shared
 * instance is handed out by {@link #instance(MqttConfig)}.
 */
@ChannelHandler.Sharable
/* loaded from: input_file:io/confluent/mqtt/protocol/netty/NettyMqttHandler.class */
public class NettyMqttHandler extends ChannelInboundHandlerAdapter implements MqttHandler {
    /** Class logger. */
    private static final Logger log = LoggerFactory.getLogger(NettyMqttHandler.class);
    /** Name passed to {@link LoginContext} when CONNECT credentials are authenticated. */
    public static final String LOGIN_CONTEXT_NAME = "ConfluentKafkaMqtt";
    /** Lazily created shared instance; read and written only by the synchronized static accessors. */
    private static NettyMqttHandler instance;
    /** Listener security protocol taken from the configuration; decides whether login is attempted. */
    private final SecurityProtocol securityProtocol;
    /** Dispatch table from MQTT message type to the handler invoked by {@code channelRead}. */
    private final EnumMap<MqttMessageType, BiConsumer<Channel, MqttMessage>> mqttAutomaton = new EnumMap<>(MqttMessageType.class);
    /** Resolves an MQTT topic name to the Kafka topic-partition a PUBLISH is written to. */
    private final TopicMapper topicMapper;

    // Decompiler rendering of the synthetic class backing a switch over MqttQoS.
    // The array is indexed by MqttQoS ordinal and holds the dense case label used by the
    // switch in publishHandler(): EXACTLY_ONCE -> 1, AT_LEAST_ONCE -> 2, AT_MOST_ONCE -> 3.
    // Constants not listed here keep the array default 0 and fall into the switch's default.
    /* renamed from: io.confluent.mqtt.protocol.netty.NettyMqttHandler$1, reason: invalid class name */
    /* loaded from: input_file:io/confluent/mqtt/protocol/netty/NettyMqttHandler$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS = new int[MqttQoS.values().length];

        static {
            // NOTE(review): each NoSuchFieldError is deliberately ignored; this looks like the
            // standard compiler-generated guard for an enum constant missing at runtime - confirm.
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.EXACTLY_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_LEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_MOST_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Returns the shared handler, creating it from {@code mqttConfig} on first use.
     *
     * <p>Once an instance exists the argument is not consulted again until
     * {@link #resetInstance()} clears it.
     *
     * @param mqttConfig configuration used only when a new instance has to be built
     * @return the shared handler instance
     */
    public static synchronized NettyMqttHandler instance(MqttConfig mqttConfig) {
        NettyMqttHandler shared = instance;
        if (shared != null) {
            return shared;
        }
        shared = new NettyMqttHandler(mqttConfig);
        instance = shared;
        return shared;
    }

    /**
     * Drops the shared handler so that the next {@link #instance(MqttConfig)} call builds a
     * fresh one.
     */
    public static synchronized void resetInstance() {
        NettyMqttHandler.instance = null;
    }

    /**
     * Builds a handler wired from the given configuration.
     *
     * <p>Creates the MQTT-to-Kafka topic mapper, registers one handler per supported MQTT
     * message type (CONNECT, DISCONNECT, PINGREQ, PUBLISH, PUBREL) and stores the listener
     * security protocol that {@code login} consults.
     *
     * @param mqttConfig configuration supplying the topic mapping rules and the listener
     *                   security protocol
     */
    protected NettyMqttHandler(MqttConfig mqttConfig) {
        this.topicMapper = new RegexListTopicMapper(mqttConfig);
        // The handler factories are public and therefore overridable: an override runs here,
        // before the subclass's own fields have been initialised.
        this.mqttAutomaton.put(MqttMessageType.CONNECT, connectHandler());
        this.mqttAutomaton.put(MqttMessageType.DISCONNECT, disconnectHandler());
        this.mqttAutomaton.put(MqttMessageType.PINGREQ, pingHandler());
        this.mqttAutomaton.put(MqttMessageType.PUBLISH, publishHandler());
        this.mqttAutomaton.put(MqttMessageType.PUBREL, publishReleaseHandler());
        this.securityProtocol = mqttConfig.listenersSecurityProtocol();
    }

    /**
     * Dispatches one inbound object to the handler registered for its MQTT message type.
     *
     * <p>Objects that are not {@link MqttMessage}s are forwarded unchanged to the next inbound
     * handler. For MQTT messages, a decoder failure or a missing handler is logged at debug
     * level and the message is dropped; any error raised by a handler is logged at debug level
     * and not propagated. The MQTT message is released exactly once on every path.
     *
     * @param channelHandlerContext context of the channel the object was read from
     * @param obj                   the inbound object, normally a decoded {@link MqttMessage}
     */
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        if (!(obj instanceof MqttMessage)) {
            // A blind cast would throw before any release could happen and leak the object;
            // hand it to the rest of the pipeline, which owns it from here on.
            channelHandlerContext.fireChannelRead(obj);
            return;
        }
        MqttMessage mqttMessage = (MqttMessage) obj;
        try {
            if (mqttMessage.decoderResult().isFailure()) {
                log.debug("Decoding MQTT message from {} failed with: ", channelHandlerContext.channel(), mqttMessage.decoderResult().cause());
                return;
            }
            MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
            BiConsumer<Channel, MqttMessage> handler = this.mqttAutomaton.get(messageType);
            if (handler != null) {
                handler.accept(channelHandlerContext.channel(), mqttMessage);
            } else {
                log.debug("No handler available for MQTT message '{}' from {}. Ignoring", messageType, channelHandlerContext.channel().remoteAddress());
            }
        } catch (Throwable th) {
            // Deliberate best effort: a failing handler must not tear down the pipeline.
            log.debug("Handling MQTT message with fixed header '{}' from {} failed with: ", mqttMessage.fixedHeader(), channelHandlerContext.channel(), th);
        } finally {
            // Single release point, so a failure while logging cannot cause a second release.
            ReferenceCountUtil.release(mqttMessage);
        }
    }

    /**
     * Netty callback for a channel that has become inactive; this handler takes no action.
     *
     * <p>The body is empty and does not call {@code super}, so this method itself does not
     * pass the event on to later handlers.
     *
     * @param channelHandlerContext context of the channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    /**
     * Records an exception raised in the pipeline instead of discarding it without a trace.
     *
     * <p>The exception is logged at debug level, matching how {@code channelRead} reports
     * handler failures. It is neither propagated to later handlers nor does it close the
     * channel.
     *
     * @param channelHandlerContext context of the channel on which the exception surfaced
     * @param th                    the exception that was raised
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        log.debug("Unhandled exception on channel {}: ", channelHandlerContext.channel(), th);
    }

    /**
     * Netty callback for a change in channel writability; this handler takes no action.
     *
     * <p>The body is empty and does not call {@code super}, so this method itself does not
     * pass the event on to later handlers.
     *
     * @param channelHandlerContext context of the channel whose writability changed
     */
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
    }

    /**
     * Builds a CONNACK message.
     *
     * @param mqttConnectReturnCode return code to report to the client
     * @param z                     value of the session-present flag
     * @return the assembled CONNACK message
     */
    private MqttConnAckMessage connAck(MqttConnectReturnCode mqttConnectReturnCode, boolean z) {
        MqttMessageBuilders.ConnAckBuilder ackBuilder = MqttMessageBuilders.connAck();
        ackBuilder = ackBuilder.returnCode(mqttConnectReturnCode);
        ackBuilder = ackBuilder.sessionPresent(z);
        return ackBuilder.build();
    }

    /**
     * Creates the handler for CONNECT messages.
     *
     * <p>The handler authenticates the CONNECT payload through {@code login}. On success it
     * answers with an accepting CONNACK. On failure it answers with a
     * bad-user-name-or-password CONNACK and closes the channel once that write has completed,
     * so the refusal is not discarded by an early close.
     *
     * @return a consumer taking the channel and the received CONNECT message
     */
    public BiConsumer<Channel, MqttMessage> connectHandler() {
        return (channel, mqttMessage) -> {
            SocketAddress remoteAddress = channel.remoteAddress();
            log.info("Handling MQTT message '{}' from {}", MqttMessageType.CONNECT, remoteAddress);
            if (login(((MqttConnectMessage) mqttMessage).payload())) {
                log.debug("Authentication successful from {}", remoteAddress);
                channel.writeAndFlush(connAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, false));
                return;
            }
            log.debug("Authentication failed from {}", remoteAddress);
            log.info("Closing connection from {}", remoteAddress);
            // Close from the write's completion listener rather than right after issuing it.
            channel.writeAndFlush(connAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false))
                    .addListener(ChannelFutureListener.CLOSE);
        };
    }

    /**
     * Creates the handler for DISCONNECT messages.
     *
     * <p>The handler logs the request, flushes the channel and then closes it.
     *
     * @return a consumer taking the channel and the received DISCONNECT message
     */
    public BiConsumer<Channel, MqttMessage> disconnectHandler() {
        return (ch, request) -> {
            final SocketAddress peer = ch.remoteAddress();
            log.info("Handling MQTT message '{}' from {}", MqttMessageType.DISCONNECT, peer);
            if (log.isDebugEnabled()) {
                // Metadata is only rendered when it will actually be logged.
                final String metadata = messageMetadata(request);
                log.debug("Message metadata {} from {}", metadata, peer);
            }
            log.info("Closing connection from {}", peer);
            ch.flush();
            final ChannelFuture closing = ch.close();
            closing.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        };
    }

    /**
     * Creates the handler for PINGREQ messages, which answers every request with the message
     * built by {@code pingResp()}.
     *
     * @return a consumer taking the channel and the received PINGREQ message
     */
    public BiConsumer<Channel, MqttMessage> pingHandler() {
        return (ch, request) -> {
            if (log.isDebugEnabled()) {
                final SocketAddress peer = ch.remoteAddress();
                log.debug("Handling MQTT message '{}' from {}", MqttMessageType.PINGREQ, peer);
            }
            final MqttMessage response = pingResp();
            ch.writeAndFlush(response);
        };
    }

    /**
     * Creates the handler for PUBLISH messages.
     *
     * <p>The handler converts the message into a Kafka record, writes that record to the
     * channel and registers the acknowledgement matching the message's QoS on the write:
     * PUBREC for {@code EXACTLY_ONCE}, PUBACK for {@code AT_LEAST_ONCE} and nothing for
     * {@code AT_MOST_ONCE}. Any other QoS value is logged as a warning and gets no
     * acknowledgement.
     *
     * @return a consumer taking the channel and the received PUBLISH message
     */
    public BiConsumer<Channel, MqttMessage> publishHandler() {
        return (channel, mqttMessage) -> {
            MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
            log.debug("Handling MQTT message '{}' from {}", MqttMessageType.PUBLISH, channel.remoteAddress());
            MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
            ChannelFuture write = channel.write(newKafkaRecord(mqttPublishMessage));
            // Switch on the enum itself instead of a hand-maintained ordinal table.
            switch (qosLevel) {
                case EXACTLY_ONCE:
                    write.addListener(listener(channel, mqttPublishMessage, this::pubRec));
                    break;
                case AT_LEAST_ONCE:
                    write.addListener(listener(channel, mqttPublishMessage, this::pubAck));
                    break;
                case AT_MOST_ONCE:
                    // Fire-and-forget: no acknowledgement is sent.
                    break;
                default:
                    log.warn("'{}' message has invalid QoS level '{}'. Ignoring", MqttMessageType.PUBLISH, qosLevel);
                    break;
            }
        };
    }

    /**
     * Creates the handler for PUBREL messages, which answers with a PUBCOMP carrying the
     * message id of the received PUBREL.
     *
     * @return a consumer taking the channel and the received PUBREL message
     */
    public BiConsumer<Channel, MqttMessage> publishReleaseHandler() {
        return (ch, release) -> {
            if (log.isDebugEnabled()) {
                final SocketAddress peer = ch.remoteAddress();
                log.debug("Handling MQTT message '{}' from {}", MqttMessageType.PUBREL, peer);
            }
            final Integer releasedId = messageId(release);
            final MqttMessage completion = pubComp(releasedId);
            ch.writeAndFlush(completion);
        };
    }

    /**
     * Builds a PINGRESP message: a fixed header only, QoS {@code AT_MOST_ONCE}, remaining
     * length 0.
     *
     * @return the PINGRESP message
     */
    private MqttMessage pingResp() {
        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
        // Log the type that is actually sent; this line used to name PINGREQ.
        log.debug("Responding to client with {}", MqttMessageType.PINGRESP);
        return new MqttMessage(mqttFixedHeader);
    }

    /**
     * Builds the PUBACK message acknowledging the given message id.
     *
     * @param num message id of the PUBLISH being acknowledged
     * @return the PUBACK message
     */
    private MqttPubAckMessage pubAck(Integer num) {
        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
        MqttMessageIdVariableHeader from = MqttMessageIdVariableHeader.from(num.intValue());
        log.debug("Responding to client with {}", MqttMessageType.PUBACK);
        // The factory is declared to return MqttMessage, so narrow it explicitly to the
        // declared return type.
        return (MqttPubAckMessage) MqttMessageFactory.newMessage(mqttFixedHeader, from, null);
    }

    /**
     * Builds the PUBREC message for the given message id.
     *
     * @param num message id of the PUBLISH being acknowledged
     * @return the PUBREC message
     */
    private MqttMessage pubRec(Integer num) {
        final MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 2);
        final MqttMessageIdVariableHeader idHeader = MqttMessageIdVariableHeader.from(num);
        log.debug("Responding to client with {}", MqttMessageType.PUBREC);
        final MqttMessage response = MqttMessageFactory.newMessage(header, idHeader, null);
        return response;
    }

    /**
     * Builds the PUBCOMP message for the given message id.
     *
     * @param num message id of the PUBREL being answered
     * @return the PUBCOMP message
     */
    private MqttMessage pubComp(Integer num) {
        final MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 2);
        final MqttMessageIdVariableHeader idHeader = MqttMessageIdVariableHeader.from(num);
        log.debug("Responding to client with '{}'", MqttMessageType.PUBCOMP);
        final MqttMessage response = MqttMessageFactory.newMessage(header, idHeader, null);
        return response;
    }

    /**
     * Creates a completion listener for the write of a PUBLISH message's Kafka record.
     *
     * <p>On success the listener builds a response from the message id via {@code function}
     * and writes it to the channel. Cancellation and failure are logged as warnings and no
     * response is sent.
     *
     * @param channel            channel the response is written to
     * @param mqttPublishMessage the PUBLISH message whose record was written
     * @param function           builds the response message from the message id
     * @return the listener to attach to the write future
     */
    public GenericFutureListener<? extends Future<? super Void>> listener(Channel channel, MqttPublishMessage mqttPublishMessage, Function<Integer, MqttMessage> function) {
        return outcome -> {
            if (!outcome.isSuccess()) {
                if (outcome.isCancelled()) {
                    log.warn("Publishing to Kafka was canceled for message with {} from {}", messageMetadata(mqttPublishMessage), channel.remoteAddress());
                } else {
                    log.warn("Publishing to Kafka failed for message with {} from {} due to {}", messageMetadata(mqttPublishMessage), channel.remoteAddress(), outcome.cause());
                }
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Publishing to Kafka succeeded for message with {} from {}", messageMetadata(mqttPublishMessage), channel.remoteAddress());
            }
            final MqttMessage response = function.apply(messageId(mqttPublishMessage));
            channel.writeAndFlush(response);
        };
    }

    /**
     * Converts a PUBLISH message into the record written towards Kafka.
     *
     * @param mqttPublishMessage the PUBLISH message to convert
     * @return a record targeting the topic-partition the MQTT topic maps to
     * @throws DataException if the topic mapper yields no mapping for the MQTT topic
     */
    private PublishMqttRecord newKafkaRecord(MqttPublishMessage mqttPublishMessage) {
        final String mqttTopic = mqttPublishMessage.variableHeader().topicName();
        final TopicPartition target = this.topicMapper
                .map(mqttTopic)
                .orElseThrow(() -> new DataException("No matching Kafka topic was found for " + mqttTopic));
        log.trace("MQTT topic '{}' mapped to Kafka topic '{}'", mqttTopic, target);
        return PublishMqttRecord.newRecord(target, mqttPublishMessage);
    }

    /**
     * Extracts the message id from a message.
     *
     * <p>PUBLISH messages carry it as the packet id of their variable header; for every other
     * message the variable header is cast to {@link MqttMessageIdVariableHeader}.
     *
     * @param mqttMessage message to read the id from
     * @return the message id
     */
    private Integer messageId(MqttMessage mqttMessage) {
        final int id;
        if (mqttMessage instanceof MqttPublishMessage) {
            final MqttPublishMessage publish = (MqttPublishMessage) mqttMessage;
            id = publish.variableHeader().packetId();
        } else {
            final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
            id = idHeader.messageId();
        }
        return Integer.valueOf(id);
    }

    /**
     * Renders a message's fixed and variable headers for log output.
     *
     * @param mqttMessage message to describe
     * @return text of the form {@code [fixedHeader=..., variableHeader=...]}
     */
    private static String messageMetadata(MqttMessage mqttMessage) {
        final StringBuilder text = new StringBuilder("[fixedHeader=");
        // NOTE(review): CONFLUENT_LICENSE_DEFAULT stands in for a missing header; it looks
        // like a decompiler substitution for an empty-string literal - confirm.
        if (mqttMessage.fixedHeader() != null) {
            text.append(mqttMessage.fixedHeader().toString());
        } else {
            text.append(MqttConfig.CONFLUENT_LICENSE_DEFAULT);
        }
        text.append(", variableHeader=");
        if (mqttMessage.variableHeader() != null) {
            text.append(mqttMessage.variableHeader().toString());
        } else {
            text.append(MqttConfig.CONFLUENT_LICENSE_DEFAULT);
        }
        return text.append("]").toString();
    }

    /**
     * Authenticates the credentials carried by a CONNECT payload.
     *
     * <p>A login is attempted only when the configured security protocol is
     * {@code SASL_TLS}, {@code SASL_SSL} or {@code SASL_PLAINTEXT}; for any other protocol
     * the client is accepted without a check. Every failure is logged as a warning and
     * reported as {@code false}.
     *
     * @param mqttConnectPayload payload of the CONNECT message
     * @return {@code true} if no login is required or the login succeeded, {@code false}
     *         otherwise
     */
    private boolean login(MqttConnectPayload mqttConnectPayload) {
        try {
            final boolean saslConfigured = SecurityProtocol.SASL_TLS.equals(this.securityProtocol)
                    || SecurityProtocol.SASL_SSL.equals(this.securityProtocol)
                    || SecurityProtocol.SASL_PLAINTEXT.equals(this.securityProtocol);
            if (saslConfigured) {
                final LoginContext context = new LoginContext(LOGIN_CONTEXT_NAME, new UserPasswordCallbackHandler(mqttConnectPayload));
                context.login();
            }
            return true;
        } catch (LoginException e) {
            log.warn("Authentication failed for user '{}'", mqttConnectPayload.userName(), e);
            return false;
        } catch (Throwable th) {
            log.warn("Unable to login: ", th);
            return false;
        }
    }
}
