package se.skltp.ei.intsvc.update.collect;

import java.util.List;
import java.util.UUID;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.skltp.ei.intsvc.EiConstants;

/* loaded from: input_file:se/skltp/ei/intsvc/update/collect/JmsMessageCollectionController.class */
public class JmsMessageCollectionController implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(JmsMessageCollectionController.class);
    private String jmsInputQueue = "skltp.ei.collect";
    private String jmsOutputQueue = "skltp.ei.process";
    private String jmsErrorQueue = "DLQ.skltp.ei.collect";
    private long jmsReceiveTimeoutMillis = 30000;
    private MessageCollectionStrategy messageCollectionStrategy;
    private QueueConnectionFactory qcf;
    private QueueConnection conn;

    public void setJmsInputQueue(String str) {
        this.jmsInputQueue = str;
    }

    public void setJmsOutputQueue(String str) {
        this.jmsOutputQueue = str;
    }

    public void setJmsErrorQueue(String str) {
        this.jmsErrorQueue = str;
    }

    public void setJmsReceiveTimeoutMillis(long j) {
        this.jmsReceiveTimeoutMillis = j;
    }

    public void setMessageCollectionStrategy(MessageCollectionStrategy messageCollectionStrategy) {
        this.messageCollectionStrategy = messageCollectionStrategy;
    }

    public void setQueueConnectionFactory(QueueConnectionFactory queueConnectionFactory) {
        this.qcf = queueConnectionFactory;
    }

    public void init() {
        startMessageCollectionThread();
    }

    private void startMessageCollectionThread() {
        try {
            this.conn = this.qcf.createQueueConnection();
            log.debug("got a JMS connection");
            this.conn.start();
            Thread thread = new Thread(this, getClass().getName());
            thread.setDaemon(true);
            thread.start();
            log.info("started JMS message listener on queue: {}, receiveTimeout: {}, output queue: {}, error queue: {}", new Object[]{this.jmsInputQueue, Long.valueOf(this.jmsReceiveTimeoutMillis), this.jmsOutputQueue, this.jmsErrorQueue});
        } catch (JMSException e) {
            log.error("failed to get connection to JMS broker", e);
            throw new RuntimeException("failed to get connection to JMS broker", e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        runCollectMessagesLoop();
    }

    private void runCollectMessagesLoop() {
        while (true) {
            long j = this.jmsReceiveTimeoutMillis;
            long currentTimeMillis = System.currentTimeMillis();
            Session session = null;
            try {
                try {
                    session = this.conn.createSession(true, 0);
                    MessageConsumer createConsumer = session.createConsumer(session.createQueue(this.jmsInputQueue));
                    boolean z = false;
                    while (!z && !this.messageCollectionStrategy.isCollectedMessagesReadyToBeTransmitted()) {
                        TextMessage receive = createConsumer.receive(this.jmsReceiveTimeoutMillis);
                        if (receive == null) {
                            log.debug("JMS recieve timed out after [ms]: {}", Long.valueOf(this.jmsReceiveTimeoutMillis));
                        } else if (receive instanceof TextMessage) {
                            log.debug("JMS receive returned a TextMessage");
                            try {
                                this.messageCollectionStrategy.collectMessage(receive.getText());
                            } catch (Exception e) {
                                z = true;
                                log.error("could not collect message, will send message to error queue", e);
                                sendMessageToErrorQueue(session, receive);
                            }
                        } else {
                            log.error("JMS receive returned unexpected message type: {}, will send message to error queue", receive.getClass().getName());
                            z = true;
                            sendMessageToErrorQueue(session, receive);
                        }
                    }
                    sendMessagesToOutQueue(session, this.messageCollectionStrategy.getCollectedMessagesAndClearBuffer());
                    session.commit();
                    log.debug("commited JMS transaction");
                    if (session != null) {
                        try {
                            session.close();
                        } catch (JMSException e2) {
                        }
                    }
                } catch (Throwable th) {
                    if (session != null) {
                        try {
                            session.close();
                        } catch (JMSException e3) {
                        }
                    }
                    throw th;
                }
            } catch (JMSException e4) {
                log.error("error occured in message receive loop", e4);
                if (session != null) {
                    try {
                        session.rollback();
                    } catch (JMSException e5) {
                        log.warn("failed to rollback JMS transaction", e4);
                    }
                }
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException e6) {
                    }
                }
            }
            waitIfFastSpinning(j, currentTimeMillis);
        }
    }

    private void waitIfFastSpinning(long j, long j2) {
        if (System.currentTimeMillis() <= j2 + j) {
            try {
                log.debug("entered fast spinning loop deplay ...");
                Thread.sleep(j);
            } catch (InterruptedException e) {
            }
        }
    }

    private void sendMessagesToOutQueue(Session session, List<CollectedMessage> list) throws JMSException {
        if (list.isEmpty()) {
            log.debug("no collected msgs in buffer to transmit");
            return;
        }
        TextMessage createTextMessage = session.createTextMessage();
        MessageProducer createProducer = session.createProducer(session.createQueue(this.jmsOutputQueue));
        createProducer.setDeliveryMode(2);
        for (CollectedMessage collectedMessage : list) {
            log.debug("sending msg to output queue: {}", this.jmsOutputQueue);
            createTextMessage.setText(collectedMessage.getPayload());
            createTextMessage.setStringProperty(EiConstants.EI_ORIGINAL_CONSUMER_ID, "collect");
            createTextMessage.setStringProperty("soitoolkit_correlationId", UUID.randomUUID().toString());
            createTextMessage.setStringProperty(EiConstants.EI_LOG_MESSAGE_TYPE, EiConstants.EI_LOG_MESSAGE_TYPE_UPDATE);
            createTextMessage.setStringProperty(EiConstants.EI_LOG_NUMBER_OF_RECORDS_IN_MESSAGE, String.valueOf(collectedMessage.getStatisticsNrRecords()));
            createTextMessage.setStringProperty(EiConstants.EI_LOG_IS_UPDATE_ROUTED_VIA_COLLECT, String.valueOf(Boolean.TRUE));
            createTextMessage.setStringProperty(EiConstants.EI_LOG_UPDATE_COLLECT_NR_MESSAGES, String.valueOf(collectedMessage.getStatisticsCollectedNrMessages()));
            createTextMessage.setStringProperty(EiConstants.EI_LOG_UPDATE_COLLECT_NR_RECORDS, String.valueOf(collectedMessage.getStatisticsCollectedNrRecords()));
            createTextMessage.setStringProperty(EiConstants.EI_LOG_UPDATE_COLLECT_BUFFER_AGE_MS, String.valueOf(collectedMessage.getStatisticsBufferAgeMs()));
            createProducer.send(createTextMessage);
        }
    }

    private void sendMessageToErrorQueue(Session session, Message message) throws JMSException {
        log.debug("sending message to error queue: {}", this.jmsErrorQueue);
        MessageProducer createProducer = session.createProducer(session.createQueue(this.jmsErrorQueue));
        createProducer.setDeliveryMode(2);
        createProducer.send(message);
    }
}
