package at.ac.ait.lablink.core.connection.mqtt.impl;

import at.ac.ait.lablink.core.connection.IConnectionHandler;
import at.ac.ait.lablink.core.connection.ex.LowLevelCommRuntimeException;
import at.ac.ait.lablink.core.connection.mqtt.IMqttConnectionListener;
import at.ac.ait.lablink.core.connection.mqtt.IMqttPublisher;
import at.ac.ait.lablink.core.connection.mqtt.IMqttReceiverCallback;
import at.ac.ait.lablink.core.connection.mqtt.IMqttSubscriber;
import at.ac.ait.lablink.core.ex.LlCoreRuntimeException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:at/ac/ait/lablink/core/connection/mqtt/impl/MqttClientSync.class */
public class MqttClientSync implements MqttCallback, IMqttPublisher, IConnectionHandler, IMqttSubscriber {
    /** Logger shared by this client and its nested worker threads. */
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) MqttClientSync.class);
    /** Connection timeout handed to the Paho connect options in {@code connect()}. */
    private int mqttConnectionTimeout;
    /** MQTT client identifier passed to the Paho client. */
    private final String clientId;
    /** Broker URI in the form {@code protocol://host:port}. */
    private final String brokerAddress;
    /** Callback that receives raw messages; guarded by {@link #receiveCallbackSyncMonitor}. */
    private IMqttReceiverCallback receiveCallback;
    /** Worker that drives reconnection attempts depending on {@link #currentClientState}. */
    private final ReconnectionThread reconnectionThread;
    /** Worker that hands queued messages to the receive callback. */
    private final ReceivedMessageConsumer receivedMessageConsumerThread;
    /** Quality of service used for published messages. */
    private final int qualityOfService = 0;
    /** Default broker host (the constructor falls back to the same value). */
    private final String defaultBrokerAddress = "localhost";
    /** Default broker port (the constructor falls back to the same value). */
    private final int defaultBrokerPort = 1883;
    /** Default connection protocol (the constructor falls back to the same value). */
    private final String defaultConnectionProtocol = "tcp";
    /** Default for enabling reconnection (the constructor falls back to the same value). */
    private final boolean defaultEnableReconnection = true;
    /** Default reconnect interval; the constructor multiplies the configured value by 1000. */
    private final int defaultReconnectInterval = 10;
    /** Default number of reconnection tries; -1 means unlimited. */
    private final int defaultReconnectNumberOfTries = -1;
    /** Default capacity of the received-message queue. */
    private final int defaultReceivedMessagesQueueSize = 2048;
    /**
     * Paho client, created lazily by {@code connect()}. Volatile because it is written by
     * whichever thread connects first and read without a lock by {@code isConnected()}.
     */
    private volatile MqttClient mqttClient = null;
    private final Object receiveCallbackSyncMonitor = new Object();
    /** Registered connection listeners; guarded by {@link #connectionListenersSyncMonitor}. */
    private final List<IMqttConnectionListener> connectionListeners = new ArrayList<>();
    private final Object connectionListenersSyncMonitor = new Object();
    /**
     * Current connection state. Volatile because {@code getCurrentClientState()} reads it
     * without holding the lock under which it is written.
     */
    private volatile ELlClientState currentClientState = ELlClientState.DISCONNECTED_FROM_BROKER;
    /** Serializes calls to the Paho {@code publish} method. */
    private final Object publishMonitor = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:at/ac/ait/lablink/core/connection/mqtt/impl/MqttClientSync$ELlClientState.class */
    /**
     * Connection states of the low-level MQTT client. The state is stored in
     * {@code currentClientState} and is read by the reconnection thread to decide
     * whether it has to attempt a reconnect.
     */
    public enum ELlClientState {
        /**
         * Initial state; also set by {@code disconnect()} and by the reconnection thread when
         * reconnection is disabled or the maximum number of tries has been used up.
         */
        DISCONNECTED_FROM_BROKER,
        /** Set by {@code connect()} after the Paho client connected successfully. */
        CONNECTED_TO_BROKER,
        /** Set when a connect attempt fails or when Paho reports a lost connection. */
        TRY_TO_RECONNECT
    }

    /* loaded from: input_file:at/ac/ait/lablink/core/connection/mqtt/impl/MqttClientSync$ReceivedMessageConsumer.class */
    private class ReceivedMessageConsumer extends Thread {
        private final BlockingQueue<ReceivedMessage> receivedMsgQueue;
        private IMqttReceiverCallback receiverCallback;
        private boolean isRunning = true;
        private final Object syncMonitor = new Object();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:at/ac/ait/lablink/core/connection/mqtt/impl/MqttClientSync$ReceivedMessageConsumer$ReceivedMessage.class */
        public class ReceivedMessage {
            public String topic;
            public byte[] payload;

            ReceivedMessage() {
            }
        }

        ReceivedMessageConsumer(int i, String str) {
            setDaemon(true);
            setName("ReceivedMessageConsumer: " + str);
            this.receivedMsgQueue = new ArrayBlockingQueue(i);
        }

        void setReceiveCallback(IMqttReceiverCallback iMqttReceiverCallback) {
            synchronized (this.syncMonitor) {
                this.receiverCallback = iMqttReceiverCallback;
            }
        }

        void addNewMessage(String str, byte[] bArr) {
            ReceivedMessage receivedMessage = new ReceivedMessage();
            receivedMessage.topic = str;
            receivedMessage.payload = bArr;
            try {
                this.receivedMsgQueue.put(receivedMessage);
            } catch (InterruptedException e) {
            }
        }

        void shutdown() {
            this.isRunning = false;
            interrupt();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.isRunning) {
                try {
                    ReceivedMessage take = this.receivedMsgQueue.take();
                    MqttClientSync.logger.trace("Process received message (Topic: {} IPayload: {}) No of waiting objects: {}", take.topic, new String(take.payload), Integer.valueOf(this.receivedMsgQueue.size()));
                    synchronized (this.syncMonitor) {
                        if (this.receiverCallback == null) {
                            MqttClientSync.logger.warn("No ReceiverCallback is set in ReceivedMessageConsumerThread.");
                        } else {
                            this.receiverCallback.handleRawMqttMessage(take.topic, take.payload);
                        }
                    }
                } catch (InterruptedException e) {
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:at/ac/ait/lablink/core/connection/mqtt/impl/MqttClientSync$ReconnectionThread.class */
    /**
     * Daemon thread that tries to re-establish the broker connection.
     *
     * <p>The thread sleeps on its own monitor and is woken up by
     * {@code setCurrentClientStateAndTriggerReconnect}. While the parent is in state
     * {@code TRY_TO_RECONNECT} it calls {@code connect()} once per reconnection interval until
     * the connection succeeds or the configured number of tries is used up.
     */
    public class ReconnectionThread extends Thread {
        private final MqttClientSync parent;
        private boolean enableReconnection;
        /** Waiting time between two tries; passed to {@code wait(long)}, i.e. milliseconds. */
        private int reconnectionInterval;
        /** Maximum number of tries, or -1 for unlimited tries. */
        private int reconnectionTries;
        /** Cleared by {@link #shutdown()} from another thread. */
        private volatile boolean keepRunning = true;
        /** Number of the current reconnection try, starting at 1. */
        private int actualTry = 1;

        /**
         * Creates the (not yet started) thread with reconnection enabled, an interval of 10
         * and unlimited tries.
         *
         * @param mqttClientSync client whose state is observed and whose {@code connect()} is called
         */
        ReconnectionThread(MqttClientSync mqttClientSync) {
            this.parent = mqttClientSync;
            setDaemon(true);
            setName("ReconnectionThread: " + mqttClientSync.getClientId());
            this.enableReconnection = true;
            this.reconnectionInterval = 10;
            this.reconnectionTries = -1;
        }

        /** Requests termination of the thread and interrupts a pending {@code wait}. */
        synchronized void shutdown() {
            this.keepRunning = false;
            interrupt();
        }

        /**
         * Main loop: evaluates the parent's state, triggers a reconnection try if required and
         * then waits until it is notified or, while reconnecting, until the interval elapsed.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            MqttClientSync.logger.trace("Reconnection timer thread started (activated: {}, Interval: {}, Tries: {})", Boolean.valueOf(this.enableReconnection), Integer.valueOf(this.reconnectionInterval), Integer.valueOf(this.reconnectionTries));
            while (this.keepRunning) {
                // Snapshot of the state this iteration works on.
                ELlClientState currentClientState = this.parent.getCurrentClientState();
                MqttClientSync.logger.trace("ReconnectionTimerThread activated: {}", currentClientState);
                ELlClientState eLlClientState = currentClientState;
                if (!this.enableReconnection && eLlClientState == ELlClientState.TRY_TO_RECONNECT) {
                    // Reconnection is switched off: fall back to the disconnected state.
                    this.parent.setCurrentClientStateAndTriggerReconnect(ELlClientState.DISCONNECTED_FROM_BROKER);
                    eLlClientState = this.parent.getCurrentClientState();
                }
                if (eLlClientState == ELlClientState.CONNECTED_TO_BROKER || eLlClientState == ELlClientState.DISCONNECTED_FROM_BROKER) {
                    this.actualTry = 1;
                }
                if (this.enableReconnection && eLlClientState == ELlClientState.TRY_TO_RECONNECT) {
                    handleReconnection();
                }
                try {
                    synchronized (this) {
                        // Only wait if the state did not change since the snapshot was taken;
                        // otherwise a notification may already have been missed.
                        if (currentClientState == this.parent.getCurrentClientState()) {
                            if (eLlClientState == ELlClientState.TRY_TO_RECONNECT) {
                                wait(this.reconnectionInterval);
                            } else {
                                wait();
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    // shutdown() interrupts the thread; the loop condition decides whether to stop.
                }
            }
        }

        /**
         * Performs one reconnection try.
         *
         * <p>A successful {@code connect()} ends the reconnection cycle immediately. Only a
         * failed try is counted against the configured maximum; once that maximum is reached the
         * parent is switched to {@code DISCONNECTED_FROM_BROKER}.
         */
        private void handleReconnection() {
            MqttClientSync.logger.trace("Reconnection try: {}", Integer.valueOf(this.actualTry));
            try {
                this.parent.connect();
                // Success must not be counted as a used-up try: otherwise a success on the last
                // allowed try would force the state back to DISCONNECTED_FROM_BROKER.
                this.actualTry = 1;
                return;
            } catch (LowLevelCommRuntimeException e) {
                MqttClientSync.logger.debug("Reconnection try ({}) was not successful.", Integer.valueOf(this.actualTry));
            }
            if (this.reconnectionTries == -1 || this.actualTry < this.reconnectionTries) {
                this.actualTry++;
                return;
            }
            MqttClientSync.logger.warn("Maximum number of reconnection tries exceeds. Stop to reconnect");
            this.parent.setCurrentClientStateAndTriggerReconnect(ELlClientState.DISCONNECTED_FROM_BROKER);
            this.actualTry = 1;
        }

        /**
         * Enables or disables automatic reconnection.
         *
         * @param z {@code true} to enable reconnection
         */
        void setEnableReconnection(boolean z) {
            this.enableReconnection = z;
        }

        /**
         * Sets the waiting time between two reconnection tries.
         *
         * @param i interval in milliseconds, must be greater than 0
         * @throws IllegalArgumentException if {@code i} is not greater than 0
         */
        void setReconnectionInterval(int i) {
            if (i <= 0) {
                throw new IllegalArgumentException(String.format("False reconnection interval in milliseconds (%d) was set. The parameter should be greater than 0.", Integer.valueOf(i)));
            }
            this.reconnectionInterval = i;
        }

        /**
         * Sets the maximum number of reconnection tries.
         *
         * @param i -1 for unlimited tries or a value greater than 0
         * @throws IllegalArgumentException if {@code i} is neither -1 nor greater than 0
         */
        void setReconnectionTries(int i) {
            if (i != -1 && i <= 0) {
                throw new IllegalArgumentException(String.format("False reconnection tries (%d) want to be set. The parameter should be -1 for infinite tries or greater than 0", Integer.valueOf(i)));
            }
            this.reconnectionTries = i;
        }

        /** @return the maximum number of reconnection tries, -1 for unlimited */
        int getReconnectNumberOfTries() {
            return this.reconnectionTries;
        }

        /** @return the waiting time between two tries in milliseconds */
        int getReconnectionInterval() {
            return this.reconnectionInterval;
        }

        /** @return {@code true} if automatic reconnection is enabled */
        boolean isEnableReconnection() {
            return this.enableReconnection;
        }
    }

    /**
     * Creates the client, reads its settings from the configuration and starts the two worker
     * threads. No broker connection is established here; call {@code connect()} for that.
     *
     * <p>Evaluated keys (all below {@code lowLevelComm.}): {@code brokerAddress},
     * {@code brokerPort}, {@code connectionProtocol}, {@code mqttConnectionTimeout},
     * {@code enableReconnection}, {@code reconnectInterval} (multiplied by 1000 before it is
     * handed to the reconnection thread), {@code reconnectNumberOfTries} and
     * {@code receivedMessagesQueueSize}.
     *
     * @param str MQTT client identifier
     * @param configuration settings of the client; {@code null} selects the defaults
     * @throws IllegalArgumentException if the reconnect interval or the number of tries is invalid
     */
    public MqttClientSync(String str, Configuration configuration) {
        Configuration effectiveConfig = configuration;
        if (effectiveConfig == null) {
            logger.info("No configuration is set for low-level MQTT client. Use default configuration.");
            effectiveConfig = new BaseConfiguration();
        }
        logger.info("Initialize low-level MQTT client '{}'.", str);
        this.clientId = str;
        this.brokerAddress = createMqttBrokerAddress(
                effectiveConfig.getString("lowLevelComm.brokerAddress", this.defaultBrokerAddress),
                effectiveConfig.getInt("lowLevelComm.brokerPort", this.defaultBrokerPort),
                effectiveConfig.getString("lowLevelComm.connectionProtocol", this.defaultConnectionProtocol));
        logger.info("BrokerAddress: {}", this.brokerAddress);
        this.mqttConnectionTimeout = effectiveConfig.getInt("lowLevelComm.mqttConnectionTimeout", 30);
        logger.info("Connection Timeout: {}", this.mqttConnectionTimeout + "s");

        this.reconnectionThread = new ReconnectionThread(this);
        this.reconnectionThread.setEnableReconnection(effectiveConfig.getBoolean("lowLevelComm.enableReconnection", this.defaultEnableReconnection));
        this.reconnectionThread.setReconnectionInterval(effectiveConfig.getInt("lowLevelComm.reconnectInterval", this.defaultReconnectInterval) * 1000);
        this.reconnectionThread.setReconnectionTries(effectiveConfig.getInt("lowLevelComm.reconnectNumberOfTries", this.defaultReconnectNumberOfTries));
        logger.info("Reconnection Settings: Enabled: {} Interval: {}ms NoOfTries: {}", Boolean.valueOf(this.reconnectionThread.isEnableReconnection()), Integer.valueOf(this.reconnectionThread.getReconnectionInterval()), Integer.valueOf(this.reconnectionThread.getReconnectNumberOfTries()));

        int queueSize = effectiveConfig.getInt("lowLevelComm.receivedMessagesQueueSize", this.defaultReceivedMessagesQueueSize);
        this.receivedMessageConsumerThread = new ReceivedMessageConsumer(queueSize, this.clientId);
        logger.info("ReceivedMessageConsumer: Queue Size: {}", Integer.valueOf(queueSize));

        // Start the workers only after every field is assigned, so that neither thread can
        // observe a partially constructed client.
        this.reconnectionThread.start();
        this.receivedMessageConsumerThread.start();
    }

    /**
     * Returns a short description consisting of the client identifier and the broker address.
     *
     * @return text in the form {@code MqttClientSync(clientId, brokerAddress)}
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("MqttClientSync(");
        text.append(this.clientId);
        text.append(", ");
        text.append(this.brokerAddress);
        text.append(')');
        return text.toString();
    }

    /**
     * Builds the broker URI from its parts.
     *
     * <p>Plain concatenation is used instead of {@code String.format}: the {@code %d}
     * conversion localizes digits for the default locale, which could put non-ASCII digits
     * into the URI.
     *
     * @param str host name or IP address of the broker
     * @param i port of the broker
     * @param str2 connection protocol (URI scheme)
     * @return URI in the form {@code protocol://host:port}
     */
    private String createMqttBrokerAddress(String str, int i, String str2) {
        return str2 + "://" + str + ":" + i;
    }

    /**
     * Creates the Paho client for the given broker.
     *
     * @param str broker URI
     * @param str2 MQTT client identifier
     * @return the new, not yet connected client
     * @throws MqttException if Paho cannot create the client
     */
    private static MqttClient createMqttClient(String str, String str2) throws MqttException {
        // The third constructor argument (message persistence) is deliberately passed as null.
        final MqttClient client = new MqttClient(str, str2, null);
        return client;
    }

    /**
     * Returns the callback that currently receives incoming messages.
     *
     * @return the registered callback, or {@code null} if none has been set yet
     */
    public IMqttReceiverCallback getReceiveCallback() {
        return receiveCallback;
    }

    /**
     * Registers the callback that receives incoming messages and hands it on to the
     * message consumer thread.
     *
     * @param iMqttReceiverCallback new callback, must not be {@code null}
     * @throws LlCoreRuntimeException if the callback is {@code null}
     */
    public void setReceiveCallback(IMqttReceiverCallback iMqttReceiverCallback) {
        if (iMqttReceiverCallback == null) {
            throw new LlCoreRuntimeException("Set ReceiveCallback failed: Parameter is a null.");
        }
        logger.debug("Set new ReceiveCallback: {}", iMqttReceiverCallback);
        synchronized (receiveCallbackSyncMonitor) {
            // Update the local reference and the consumer thread as one step.
            receiveCallback = iMqttReceiverCallback;
            receivedMessageConsumerThread.setReceiveCallback(iMqttReceiverCallback);
        }
    }

    /**
     * Registers a connection listener. A listener that is already registered is ignored.
     *
     * @param iMqttConnectionListener listener to add, must not be {@code null}
     * @throws LlCoreRuntimeException if the listener is {@code null}
     */
    public void addMqttConnectionListener(IMqttConnectionListener iMqttConnectionListener) {
        if (iMqttConnectionListener == null) {
            throw new LlCoreRuntimeException("Add ConnectionListener failed: Parameter is a null.");
        }
        synchronized (this.connectionListenersSyncMonitor) {
            // The duplicate check and the insertion share one critical section, so two threads
            // cannot both pass the check and register the same listener twice.
            if (this.connectionListeners.contains(iMqttConnectionListener)) {
                return;
            }
            logger.debug("Add connection listener: {}", iMqttConnectionListener);
            this.connectionListeners.add(iMqttConnectionListener);
        }
    }

    /**
     * Removes a connection listener. Unknown listeners and {@code null} are ignored.
     *
     * @param iMqttConnectionListener listener to remove
     */
    public void removeConnectionListener(IMqttConnectionListener iMqttConnectionListener) {
        if (iMqttConnectionListener == null) {
            // null can never be registered (see addMqttConnectionListener), so there is nothing
            // to remove; previously this failed with a NullPointerException in the log call.
            return;
        }
        logger.trace("Remove connection listener: {}", iMqttConnectionListener);
        synchronized (this.connectionListenersSyncMonitor) {
            this.connectionListeners.remove(iMqttConnectionListener);
        }
    }

    /**
     * Returns the internal list of registered connection listeners.
     *
     * <p>NOTE(review): this is the live list, returned without holding the listener monitor;
     * callers should treat it as read-only.
     *
     * @return the registered connection listeners
     */
    List<IMqttConnectionListener> getConnectionListeners() {
        return connectionListeners;
    }

    /**
     * Returns the MQTT client identifier given to the constructor.
     *
     * @return the client identifier
     */
    public String getClientId() {
        return clientId;
    }

    /**
     * Returns the broker URI that was built from the configuration.
     *
     * @return the broker URI
     */
    public String getBrokerAddress() {
        return brokerAddress;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Stores the new connection state and wakes up the reconnection thread so that it can
     * react to the change.
     *
     * @param eLlClientState new connection state
     */
    public synchronized void setCurrentClientStateAndTriggerReconnect(ELlClientState eLlClientState) {
        currentClientState = eLlClientState;
        // The reconnection thread waits on its own monitor, so the notification has to be
        // sent while holding that monitor.
        final ReconnectionThread sleeper = reconnectionThread;
        synchronized (sleeper) {
            sleeper.notify();
        }
    }

    /**
     * Returns the current connection state. The method takes no lock; the reconnection thread
     * calls it while holding its own monitor.
     *
     * @return the current connection state
     */
    ELlClientState getCurrentClientState() {
        return currentClientState;
    }

    /**
     * Disconnects from the broker (if connected) and stops both worker threads.
     *
     * <p>A failing disconnect is only logged. The worker threads are stopped in any case,
     * even if an unexpected exception escapes from the disconnect.
     */
    public void shutdown() {
        try {
            if (isConnected()) {
                disconnect();
            }
        } catch (LowLevelCommRuntimeException e) {
            // Keep the cause in the log; shutdown itself continues.
            logger.warn("Error while disconnecting from broker during shutdown.", e);
        } finally {
            this.reconnectionThread.shutdown();
            this.receivedMessageConsumerThread.shutdown();
        }
    }

    /**
     * Connects to the broker. The Paho client is created on first use; if the client is
     * already connected the call returns without doing anything.
     *
     * <p>After a successful connect the state becomes {@code CONNECTED_TO_BROKER} and all
     * connection listeners are informed. If the connect fails the state becomes
     * {@code TRY_TO_RECONNECT}.
     *
     * @throws LowLevelCommRuntimeException if the client cannot be created or the connect fails
     */
    @Override // at.ac.ait.lablink.core.connection.IConnectionHandler
    public void connect() {
        if (this.mqttClient == null) {
            try {
                this.mqttClient = createMqttClient(this.brokerAddress, this.clientId);
            } catch (MqttException creationError) {
                throw new LowLevelCommRuntimeException(creationError);
            }
        }
        if (!isConnected()) {
            final MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setConnectionTimeout(this.mqttConnectionTimeout);
            this.mqttClient.setCallback(this);
            try {
                this.mqttClient.connect(options);
                setCurrentClientStateAndTriggerReconnect(ELlClientState.CONNECTED_TO_BROKER);
                synchronized (this.connectionListenersSyncMonitor) {
                    for (IMqttConnectionListener listener : this.connectionListeners) {
                        listener.onEstablishedMqttConnection();
                    }
                }
                logger.info("MqttClient connected to broker {}", this.mqttClient.getServerURI());
            } catch (MqttException connectError) {
                // Hand over to the reconnection thread before reporting the failure.
                setCurrentClientStateAndTriggerReconnect(ELlClientState.TRY_TO_RECONNECT);
                throw new LowLevelCommRuntimeException(connectError);
            }
        }
    }

    /**
     * Disconnects from the broker. The state is set to {@code DISCONNECTED_FROM_BROKER} first;
     * if the client is connected, the listeners are informed and the Paho client is
     * disconnected afterwards.
     *
     * @throws LowLevelCommRuntimeException if Paho reports an error while disconnecting
     */
    @Override // at.ac.ait.lablink.core.connection.IConnectionHandler
    public void disconnect() {
        setCurrentClientStateAndTriggerReconnect(ELlClientState.DISCONNECTED_FROM_BROKER);
        if (!isConnected()) {
            return;
        }
        synchronized (this.connectionListenersSyncMonitor) {
            for (IMqttConnectionListener listener : this.connectionListeners) {
                listener.onDisconnectingMqttConnection();
            }
        }
        try {
            this.mqttClient.disconnect();
            logger.info("MqttClient disconnected from broker {}", this.mqttClient.getServerURI());
        } catch (MqttException disconnectError) {
            throw new LowLevelCommRuntimeException(disconnectError);
        }
    }

    /**
     * Reports whether the Paho client exists and is connected to the broker.
     *
     * @return {@code true} if the client is connected
     */
    @Override // at.ac.ait.lablink.core.connection.IConnectionHandler
    public boolean isConnected() {
        if (this.mqttClient == null) {
            // connect() has not created the client yet.
            return false;
        }
        return this.mqttClient.isConnected();
    }

    /**
     * Publishes a message with the client's quality of service.
     *
     * @param str topic to publish to; checked with {@code MqttUtils.validateMqttTopic}
     * @param bArr payload of the message
     * @throws LowLevelCommRuntimeException if the client is not connected or Paho reports an
     *     error while publishing
     */
    @Override // at.ac.ait.lablink.core.connection.mqtt.IMqttPublisher
    public void publish(String str, byte[] bArr) {
        if (!isConnected()) {
            throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
        }
        MqttUtils.validateMqttTopic(str);
        MqttMessage mqttMessage = new MqttMessage(bArr);
        // Use the configured field instead of a duplicated literal.
        mqttMessage.setQos(this.qualityOfService);
        synchronized (this.publishMonitor) {
            try {
                this.mqttClient.publish(str, mqttMessage);
            } catch (MqttException e) {
                throw new LowLevelCommRuntimeException(e);
            }
        }
    }

    /**
     * Subscribes to a single topic filter.
     *
     * @param str topic filter; checked with {@code MqttUtils.validateMqttSubscription}, exactly
     *     like the filters passed to the list variant of this method
     * @throws LowLevelCommRuntimeException if the client is not connected or Paho reports an
     *     error while subscribing
     */
    @Override // at.ac.ait.lablink.core.connection.mqtt.IMqttSubscriber
    public void subscribe(String str) {
        if (!isConnected()) {
            throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
        }
        // Same validation as in subscribe(List), so both variants reject the same filters.
        MqttUtils.validateMqttSubscription(str);
        try {
            this.mqttClient.subscribe(str);
        } catch (MqttException e) {
            throw new LowLevelCommRuntimeException(e);
        }
    }

    /**
     * Subscribes to several topic filters with one request. An empty list is a no-op.
     *
     * @param list topic filters; each one is checked with
     *     {@code MqttUtils.validateMqttSubscription}
     * @throws LowLevelCommRuntimeException if the client is not connected or Paho reports an
     *     error while subscribing
     */
    @Override // at.ac.ait.lablink.core.connection.mqtt.IMqttSubscriber
    public void subscribe(List<String> list) {
        if (!isConnected()) {
            throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
        }
        // Copy once, so that exactly the validated filters are sent to the broker.
        final String[] topicFilters = list.toArray(new String[0]);
        if (topicFilters.length == 0) {
            // A SUBSCRIBE packet without any topic filter violates the MQTT protocol.
            return;
        }
        for (String topicFilter : topicFilters) {
            MqttUtils.validateMqttSubscription(topicFilter);
        }
        try {
            this.mqttClient.subscribe(topicFilters);
        } catch (MqttException e) {
            throw new LowLevelCommRuntimeException(e);
        }
    }

    /**
     * Cancels the subscription of a single topic filter.
     *
     * @param str topic filter; checked with {@code MqttUtils.validateMqttSubscription}, exactly
     *     like the filters passed to the list variant of this method
     * @throws LowLevelCommRuntimeException if the client is not connected or Paho reports an
     *     error while unsubscribing
     */
    @Override // at.ac.ait.lablink.core.connection.mqtt.IMqttSubscriber
    public void unsubscribe(String str) {
        if (!isConnected()) {
            throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
        }
        // Same validation as in unsubscribe(List), so both variants reject the same filters.
        MqttUtils.validateMqttSubscription(str);
        try {
            this.mqttClient.unsubscribe(str);
        } catch (MqttException e) {
            throw new LowLevelCommRuntimeException(e);
        }
    }

    /**
     * Cancels the subscriptions of several topic filters with one request. An empty list is
     * a no-op.
     *
     * @param list topic filters; each one is checked with
     *     {@code MqttUtils.validateMqttSubscription}
     * @throws LowLevelCommRuntimeException if the client is not connected or Paho reports an
     *     error while unsubscribing
     */
    @Override // at.ac.ait.lablink.core.connection.mqtt.IMqttSubscriber
    public void unsubscribe(List<String> list) {
        if (!isConnected()) {
            throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
        }
        // Copy once, so that exactly the validated filters are sent to the broker.
        final String[] topicFilters = list.toArray(new String[0]);
        if (topicFilters.length == 0) {
            // An UNSUBSCRIBE packet without any topic filter violates the MQTT protocol.
            return;
        }
        for (String topicFilter : topicFilters) {
            MqttUtils.validateMqttSubscription(topicFilter);
        }
        try {
            this.mqttClient.unsubscribe(topicFilters);
        } catch (MqttException e) {
            throw new LowLevelCommRuntimeException(e);
        }
    }

    /**
     * Paho callback for a lost connection: informs the connection listeners and then switches
     * to {@code TRY_TO_RECONNECT}, which wakes up the reconnection thread.
     *
     * @param th reason for the connection loss
     */
    @Override // org.eclipse.paho.client.mqttv3.MqttCallback
    public void connectionLost(Throwable th) {
        logger.warn("MQTT connection lost: {}", String.valueOf(th));
        try {
            synchronized (this.connectionListenersSyncMonitor) {
                for (IMqttConnectionListener listener : this.connectionListeners) {
                    listener.onLostMqttConnection();
                }
            }
        } finally {
            // The reconnection has to be triggered even if a listener throws; otherwise the
            // client would stay disconnected without ever trying to reconnect.
            setCurrentClientStateAndTriggerReconnect(ELlClientState.TRY_TO_RECONNECT);
        }
    }

    /**
     * Paho callback for a received message. If a receive callback is registered and the
     * message is not flagged as duplicate, topic and payload are queued for the message
     * consumer thread; otherwise the message is dropped.
     *
     * @param str topic the message was published to
     * @param mqttMessage received message
     * @throws Exception declared by the Paho callback interface
     */
    @Override // org.eclipse.paho.client.mqttv3.MqttCallback
    public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
        // The message object is passed as it is: its string form is only built when trace
        // logging is enabled. The format string now has a placeholder for it.
        logger.trace("New MQTT message received: Topic({}) ReceivedMessage({})", str, mqttMessage);
        final boolean callbackRegistered;
        synchronized (this.receiveCallbackSyncMonitor) {
            callbackRegistered = this.receiveCallback != null;
        }
        // Queueing may block while the queue is full, so it is done outside of the monitor to
        // keep setReceiveCallback() from being blocked as well.
        if (callbackRegistered && !mqttMessage.isDuplicate()) {
            this.receivedMessageConsumerThread.addNewMessage(str, mqttMessage.getPayload());
        }
    }

    /**
     * Paho callback for a completed message delivery. This implementation ignores the
     * notification (empty body).
     *
     * @param iMqttDeliveryToken delivery token of the message; not used
     */
    @Override // org.eclipse.paho.client.mqttv3.MqttCallback
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}
