package org.springframework.integration.mqtt.inbound;

import java.util.concurrent.ScheduledFuture;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

/* loaded from: input_file:org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.class */
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter implements MqttCallback {
    private final MqttPahoClientFactory clientFactory;
    private volatile MqttClient client;
    private volatile ScheduledFuture<?> reconnectFuture;
    private volatile boolean connected;

    public MqttPahoMessageDrivenChannelAdapter(String str, String str2, MqttPahoClientFactory mqttPahoClientFactory, String... strArr) {
        super(str, str2, strArr);
        this.clientFactory = mqttPahoClientFactory;
    }

    public MqttPahoMessageDrivenChannelAdapter(String str, String str2, String... strArr) {
        this(str, str2, new DefaultMqttPahoClientFactory(), strArr);
    }

    protected void doStart() {
        super.doStart();
        try {
            connectAndSubscribe();
        } catch (Exception e) {
            this.logger.error("Exception while connecting and subscribing, retrying", e);
            scheduleReconnect();
        }
    }

    protected void doStop() {
        super.doStop();
        try {
            this.client.unsubscribe(getTopic());
            this.client.disconnect();
            this.client.close();
            this.connected = false;
            this.client = null;
        } catch (MqttException e) {
            this.logger.error("Exception while unsubscribing and disconnecting", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void connectAndSubscribe() throws MqttException {
        this.client = this.clientFactory.getClientInstance(getUrl(), getClientId());
        this.client.connect(this.clientFactory.getConnectionOptions());
        try {
            this.client.subscribe(getTopic());
            if (this.client.isConnected()) {
                this.client.setCallback(this);
                this.connected = true;
                if (this.reconnectFuture != null) {
                    cancelReconnect();
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Connected and subscribed to " + getTopic());
                }
            }
        } catch (MqttException e) {
            this.client.disconnect();
            throw e;
        }
    }

    private synchronized void cancelReconnect() {
        if (this.reconnectFuture != null) {
            this.reconnectFuture.cancel(false);
            this.reconnectFuture = null;
        }
    }

    private void scheduleReconnect() {
        try {
            this.reconnectFuture = getTaskScheduler().scheduleWithFixedDelay(new Runnable() { // from class: org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (MqttPahoMessageDrivenChannelAdapter.this.logger.isDebugEnabled()) {
                            MqttPahoMessageDrivenChannelAdapter.this.logger.debug("Attempting reconnect");
                        }
                        if (!MqttPahoMessageDrivenChannelAdapter.this.connected) {
                            MqttPahoMessageDrivenChannelAdapter.this.connectAndSubscribe();
                        }
                    } catch (MqttException e) {
                        MqttPahoMessageDrivenChannelAdapter.this.logger.error("Exception while connecting and subscribing", e);
                    }
                }
            }, 10000L);
        } catch (Exception e) {
            this.logger.error("Failed to schedule reconnect", e);
        }
    }

    public void connectionLost(Throwable th) {
        this.logger.error("Lost connection:" + th.getMessage() + "; retrying...");
        this.connected = false;
        scheduleReconnect();
    }

    public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
        sendMessage(getConverter().toMessage(str, mqttMessage));
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}
