package cn.wizzer.iot.mqtt.server.broker.protocol;

import cn.hutool.core.util.StrUtil;
import cn.wizzer.iot.mqtt.server.broker.config.BrokerProperties;
import cn.wizzer.iot.mqtt.server.common.auth.IAuthService;
import cn.wizzer.iot.mqtt.server.common.message.IDupPubRelMessageStoreService;
import cn.wizzer.iot.mqtt.server.common.message.IDupPublishMessageStoreService;
import cn.wizzer.iot.mqtt.server.common.session.ISessionStoreService;
import cn.wizzer.iot.mqtt.server.common.session.SessionStore;
import cn.wizzer.iot.mqtt.server.common.subscribe.ISubscribeStoreService;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttIdentifierRejectedException;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttUnacceptableProtocolVersionException;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/wizzer/iot/mqtt/server/broker/protocol/Connect.class */
public class Connect {
    private static final Logger LOGGER = LoggerFactory.getLogger(Connect.class);
    private ISessionStoreService sessionStoreService;
    private ISubscribeStoreService subscribeStoreService;
    private IDupPublishMessageStoreService dupPublishMessageStoreService;
    private IDupPubRelMessageStoreService dupPubRelMessageStoreService;
    private IAuthService authService;
    private BrokerProperties brokerProperties;
    private ChannelGroup channelGroup;
    private Map<String, ChannelId> channelIdMap;

    public Connect(ISessionStoreService iSessionStoreService, ISubscribeStoreService iSubscribeStoreService, IDupPublishMessageStoreService iDupPublishMessageStoreService, IDupPubRelMessageStoreService iDupPubRelMessageStoreService, IAuthService iAuthService, BrokerProperties brokerProperties, ChannelGroup channelGroup, Map<String, ChannelId> map) {
        this.sessionStoreService = iSessionStoreService;
        this.subscribeStoreService = iSubscribeStoreService;
        this.dupPublishMessageStoreService = iDupPublishMessageStoreService;
        this.dupPubRelMessageStoreService = iDupPubRelMessageStoreService;
        this.authService = iAuthService;
        this.brokerProperties = brokerProperties;
        this.channelGroup = channelGroup;
        this.channelIdMap = map;
    }

    public void processConnect(Channel channel, MqttConnectMessage mqttConnectMessage) {
        Channel find;
        if (mqttConnectMessage.decoderResult().isFailure()) {
            Throwable cause = mqttConnectMessage.decoderResult().cause();
            if (cause instanceof MqttUnacceptableProtocolVersionException) {
                channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), (Object) null));
                channel.close();
                return;
            } else if (!(cause instanceof MqttIdentifierRejectedException)) {
                channel.close();
                return;
            } else {
                channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), (Object) null));
                channel.close();
                return;
            }
        }
        if (StrUtil.isBlank(mqttConnectMessage.payload().clientIdentifier())) {
            channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), (Object) null));
            channel.close();
            return;
        }
        if (this.brokerProperties.getMqttPasswordMust()) {
            if (!this.authService.checkValid(mqttConnectMessage.payload().userName(), mqttConnectMessage.payload().passwordInBytes() == null ? null : new String(mqttConnectMessage.payload().passwordInBytes(), CharsetUtil.UTF_8))) {
                channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), (Object) null));
                channel.close();
                return;
            }
        }
        if (this.sessionStoreService.containsKey(mqttConnectMessage.payload().clientIdentifier())) {
            SessionStore sessionStore = this.sessionStoreService.get(mqttConnectMessage.payload().clientIdentifier());
            if (Boolean.valueOf(sessionStore.isCleanSession()).booleanValue()) {
                this.sessionStoreService.remove(mqttConnectMessage.payload().clientIdentifier());
                this.subscribeStoreService.removeForClient(mqttConnectMessage.payload().clientIdentifier());
                this.dupPublishMessageStoreService.removeByClient(mqttConnectMessage.payload().clientIdentifier());
                this.dupPubRelMessageStoreService.removeByClient(mqttConnectMessage.payload().clientIdentifier());
            }
            try {
                ChannelId channelId = this.channelIdMap.get(sessionStore.getChannelId());
                if (channelId != null && (find = this.channelGroup.find(channelId)) != null) {
                    find.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        SessionStore sessionStore2 = new SessionStore(mqttConnectMessage.payload().clientIdentifier(), channel.id().asShortText(), mqttConnectMessage.variableHeader().isCleanSession(), (MqttPublishMessage) null);
        if (mqttConnectMessage.variableHeader().isWillFlag()) {
            sessionStore2.setWillMessage(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(mqttConnectMessage.variableHeader().willQos()), mqttConnectMessage.variableHeader().isWillRetain(), 0), new MqttPublishVariableHeader(mqttConnectMessage.payload().willTopic(), 0), Unpooled.buffer().writeBytes(mqttConnectMessage.payload().willMessageInBytes())));
        }
        if (mqttConnectMessage.variableHeader().keepAliveTimeSeconds() > 0) {
            if (channel.pipeline().names().contains("idle")) {
                channel.pipeline().remove("idle");
            }
            channel.pipeline().addFirst("idle", new IdleStateHandler(0, 0, Math.round(mqttConnectMessage.variableHeader().keepAliveTimeSeconds() * 1.5f)));
        }
        this.sessionStoreService.put(mqttConnectMessage.payload().clientIdentifier(), sessionStore2);
        channel.attr(AttributeKey.valueOf("clientId")).set(mqttConnectMessage.payload().clientIdentifier());
        channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, Boolean.valueOf(this.sessionStoreService.containsKey(mqttConnectMessage.payload().clientIdentifier()) && !mqttConnectMessage.variableHeader().isCleanSession()).booleanValue()), (Object) null));
        LOGGER.debug("CONNECT - clientId: {}, cleanSession: {}", mqttConnectMessage.payload().clientIdentifier(), Boolean.valueOf(mqttConnectMessage.variableHeader().isCleanSession()));
        if (mqttConnectMessage.variableHeader().isCleanSession()) {
            return;
        }
        List list = this.dupPublishMessageStoreService.get(mqttConnectMessage.payload().clientIdentifier());
        List list2 = this.dupPubRelMessageStoreService.get(mqttConnectMessage.payload().clientIdentifier());
        list.forEach(dupPublishMessageStore -> {
            channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()), false, 0), new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(), dupPublishMessageStore.getMessageId()), Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes())));
        });
        list2.forEach(dupPubRelMessageStore -> {
            channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBREL, true, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()), (Object) null));
        });
    }
}
