package cn.featherfly.common.mqtt;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;

/* loaded from: input_file:cn/featherfly/common/mqtt/EasyMqttClientImpl.class */
public class EasyMqttClientImpl extends ReconnectableClient<EasyMqttClientImpl> {
    private Map<String, Consumers> topicConsumers = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cn/featherfly/common/mqtt/EasyMqttClientImpl$Consumers.class */
    public class Consumers {
        private List<Consumer<MqttMessage>> consumers;
        private List<BiConsumer<String, MqttMessage>> biConsumers;

        private Consumers() {
            this.consumers = new ArrayList(0);
            this.biConsumers = new ArrayList(0);
        }
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient connect() throws MqttException {
        return isConnected() ? this : connect(new AutoDetectionMqttCallBack(this), null);
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient subscribe(String str, Qos qos, Consumer<MqttMessage> consumer) throws MqttException {
        this.logger.debug("subscribe topicFilter -> {}, qos -> {}", str, qos);
        Consumers consumers = getConsumers(str);
        if (consumers == null) {
            consumers = new Consumers();
            this.topicConsumers.put(str, consumers);
            this.client.subscribe(str, qos.ordinal(), (str2, mqttMessage) -> {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("receive topic -> {}, msgId -> {}, qos -> {}, payload -> {}", new Object[]{str2, Integer.valueOf(mqttMessage.getId()), Integer.valueOf(mqttMessage.getQos()), new String(mqttMessage.getPayload(), this.charset)});
                }
                Consumers consumers2 = getConsumers(str);
                if (consumers2 != null) {
                    Iterator it = consumers2.consumers.iterator();
                    while (it.hasNext()) {
                        ((Consumer) it.next()).accept(mqttMessage);
                    }
                }
            });
        }
        consumers.consumers.add(consumer);
        return this;
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient subscribe(String str, Qos qos, BiConsumer<String, MqttMessage> biConsumer) throws MqttException {
        this.logger.debug("subscribe topicFilter -> {}, qos -> {}", str, qos);
        Consumers consumers = getConsumers(str);
        if (consumers == null) {
            consumers = new Consumers();
            this.topicConsumers.put(str, consumers);
            this.client.subscribe(str, qos.ordinal(), (str2, mqttMessage) -> {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("receive topic -> {}, msgId -> {}, qos -> {}, payload -> {}", new Object[]{str2, Integer.valueOf(mqttMessage.getId()), Integer.valueOf(mqttMessage.getQos()), new String(mqttMessage.getPayload(), this.charset)});
                }
                Consumers consumers2 = getConsumers(str);
                if (consumers2 != null) {
                    Iterator it = consumers2.biConsumers.iterator();
                    while (it.hasNext()) {
                        ((BiConsumer) it.next()).accept(str, mqttMessage);
                    }
                }
            });
        }
        consumers.biConsumers.add(biConsumer);
        return this;
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient unsubscribe(String str) throws MqttException {
        this.topicConsumers.remove(str);
        this.client.unsubscribe(str);
        return this;
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient unsubscribeAll() throws MqttException {
        Iterator<String> it = this.topicConsumers.keySet().iterator();
        while (it.hasNext()) {
            this.client.unsubscribe(it.next());
        }
        this.topicConsumers.clear();
        return this;
    }

    private void resubscribeAll() throws MqttException {
        for (Map.Entry<String, Consumers> entry : this.topicConsumers.entrySet()) {
            String key = entry.getKey();
            ArrayList arrayList = new ArrayList(entry.getValue().consumers);
            ArrayList arrayList2 = new ArrayList(entry.getValue().biConsumers);
            unsubscribe(key);
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                subscribe(key, Qos.ONLY_ONCE, (Consumer<MqttMessage>) it.next());
            }
            Iterator it2 = arrayList2.iterator();
            while (it2.hasNext()) {
                subscribe(key, Qos.ONLY_ONCE, (BiConsumer<String, MqttMessage>) it2.next());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // cn.featherfly.common.mqtt.ReconnectableClient
    public void connected() throws MqttException {
        if (this.connectedConsumer == null) {
            resubscribeAll();
        } else {
            unsubscribeAll();
        }
        super.connected();
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient publish(String str, String str2) throws MqttPersistenceException, MqttException {
        return publish(str, str2, Qos.ONLY_ONCE);
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient publish(String str, String str2, Consumer<IMqttDeliveryToken> consumer) throws MqttPersistenceException, MqttException {
        return publish(str, str2, Qos.ONLY_ONCE, consumer);
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient publish(String str, String str2, Qos qos) throws MqttException {
        return publish(str, str2, qos, null);
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient publish(String str, String str2, Qos qos, Consumer<IMqttDeliveryToken> consumer) throws MqttPersistenceException, MqttException {
        return publish(str, str2, qos, this.charset, consumer);
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient publish(String str, String str2, Qos qos, boolean z, Consumer<IMqttDeliveryToken> consumer) throws MqttPersistenceException, MqttException {
        return publish(str, str2, qos, this.charset, z, consumer);
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient publish(String str, String str2, Qos qos, Charset charset, Consumer<IMqttDeliveryToken> consumer) throws MqttPersistenceException, MqttException {
        return publish(str, str2, qos, charset, false, consumer);
    }

    @Override // cn.featherfly.common.mqtt.EasyMqttClient
    public EasyMqttClient publish(String str, String str2, Qos qos, Charset charset, boolean z, Consumer<IMqttDeliveryToken> consumer) throws MqttPersistenceException, MqttException {
        if (consumer != null) {
            ((AutoDetectionMqttCallBack) this.callback).publish(str, consumer);
        }
        this.logger.debug("publish topic -> {}, qos -> {}, msg -> {}", new Object[]{str, Integer.valueOf(qos.ordinal()), str2});
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos.ordinal());
        mqttMessage.setRetained(z);
        mqttMessage.setPayload(str2.getBytes(charset));
        publish(this.client.getTopic(str), mqttMessage);
        return this;
    }

    private Consumers getConsumers(String str) {
        return this.topicConsumers.get(str);
    }
}
