package ch.squaredesk.nova.comm.jms;

import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.comm.retrieving.MessageReceiver;
import ch.squaredesk.nova.comm.retrieving.MessageUnmarshaller;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/squaredesk/nova/comm/jms/JmsMessageReceiver.class */
/**
 * Receives JMS text messages and exposes them, per destination, as a shared
 * {@link Flowable} of unmarshalled {@link IncomingMessage}s.
 *
 * <p>One stream is cached per destination ID (as computed by the
 * {@link JmsObjectRepository}). The stream is removed from the cache again when
 * its underlying message consumer is disposed, so a later call to
 * {@link #messages(Destination)} creates a fresh one.
 *
 * @param <InternalMessageType> the type incoming message texts are unmarshalled to
 */
public class JmsMessageReceiver<InternalMessageType> extends MessageReceiver<Destination, InternalMessageType, String, JmsSpecificInfo> {
    private static final Logger logger = LoggerFactory.getLogger(JmsMessageReceiver.class);
    private final JmsObjectRepository jmsObjectRepository;
    /** Cache of the currently active message streams, keyed by destination ID. */
    private final Map<String, Flowable<IncomingMessage<InternalMessageType, Destination, JmsSpecificInfo>>> mapDestinationIdToMessageStream;
    private final JmsMessageDetailsCreator messageDetailsCreator;

    /**
     * Creates a new receiver.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier of this
     * constructor was changed from package-private; confirm against the original sources.
     *
     * @param str                 identifier handed to the {@link MessageReceiver} super constructor
     * @param jmsObjectRepository used to compute destination IDs and to create / destroy message consumers
     * @param messageUnmarshaller converts the text of an incoming message to the internal message type
     * @param metrics             handed to the {@link MessageReceiver} super constructor
     */
    public JmsMessageReceiver(String str, JmsObjectRepository jmsObjectRepository, MessageUnmarshaller<String, InternalMessageType> messageUnmarshaller, Metrics metrics) {
        super(str, messageUnmarshaller, metrics);
        this.mapDestinationIdToMessageStream = new ConcurrentHashMap<>();
        this.messageDetailsCreator = new JmsMessageDetailsCreator();
        this.jmsObjectRepository = jmsObjectRepository;
    }

    /**
     * Returns the stream of messages arriving on the passed destination.
     *
     * <p>The stream is created lazily on first request and cached under the destination's ID.
     * Messages are pulled with a blocking {@code receive()} call; the subscription is performed
     * on {@link Schedulers#io()} and the resulting stream is shared between all subscribers.
     * Messages that are not {@link TextMessage}s, cannot be read or cannot be unmarshalled are
     * logged, counted as unparsable and skipped. The stream completes as soon as no message
     * can be received from the consumer anymore.
     *
     * @param destination the destination to receive messages from, must not be {@code null}
     * @return the (shared) stream of incoming messages for the destination
     * @throws NullPointerException if {@code destination} is {@code null}
     */
    public Flowable<IncomingMessage<InternalMessageType, Destination, JmsSpecificInfo>> messages(Destination destination) {
        Objects.requireNonNull(destination, "destination must not be null");
        String destinationId = this.jmsObjectRepository.idFor(destination);
        return this.mapDestinationIdToMessageStream.computeIfAbsent(destinationId, key -> {
            // Assigning to a typed local gives Flowable.generate() a target type, so the element
            // type is inferred as IncomingMessage<...> instead of Object.
            Flowable<IncomingMessage<InternalMessageType, Destination, JmsSpecificInfo>> stream = Flowable.generate(
                () -> {
                    logger.info("Opening connection to destination {}", destinationId);
                    this.metricsCollector.subscriptionCreated(destinationId);
                    return this.jmsObjectRepository.createMessageConsumer(destination);
                },
                (messageConsumer, emitter) -> {
                    IncomingMessage<InternalMessageType, Destination, JmsSpecificInfo> incomingMessage = null;
                    // Keep pulling until one message could be converted; unparsable ones are skipped.
                    while (incomingMessage == null) {
                        Message message = null;
                        try {
                            message = messageConsumer.receive();
                        } catch (Exception e) {
                            // Deliberately best-effort: a failing receive() is handled like "no message"
                            // below and ends the stream. Log the cause instead of dropping it silently.
                            logger.debug("Error receiving message from consumer for destination {}", destinationId, e);
                        }
                        if (message == null) {
                            logger.info("Unable to receive message from consumer for destination {}. Closing the connection...", destinationId);
                            emitter.onComplete();
                            return;
                        }
                        incomingMessage = toIncomingMessage(message, destinationId);
                    }
                    emitter.onNext(incomingMessage);
                },
                messageConsumer -> {
                    this.metricsCollector.subscriptionDestroyed(destinationId);
                    this.jmsObjectRepository.destroyConsumer(messageConsumer);
                    // Evict the cached stream so that the next request opens a new consumer.
                    this.mapDestinationIdToMessageStream.remove(destinationId);
                    logger.info("Closed connection to destination {}", destinationId);
                });
            return stream.subscribeOn(Schedulers.io()).share();
        });
    }

    /**
     * Converts a raw JMS message to an {@link IncomingMessage} and updates the receive metrics.
     *
     * @param message       the message received from the consumer, never {@code null}
     * @param destinationId the ID of the destination the message was received on (used for metrics)
     * @return the converted message, or {@code null} if the message is not a {@link TextMessage},
     *         its text could not be read, or it could not be unmarshalled
     */
    private IncomingMessage<InternalMessageType, Destination, JmsSpecificInfo> toIncomingMessage(Message message, String destinationId) {
        if (!(message instanceof TextMessage)) {
            logger.error("Unsupported type of incoming message {}", message);
            this.metricsCollector.unparsableMessageReceived(destinationId);
            return null;
        }

        String text;
        try {
            text = ((TextMessage) message).getText();
        } catch (Exception e) {
            logger.error("Unable to read incoming message {}", message, e);
            this.metricsCollector.unparsableMessageReceived(destinationId);
            return null;
        }

        IncomingMessage<InternalMessageType, Destination, JmsSpecificInfo> incomingMessage = null;
        try {
            incomingMessage = new IncomingMessage<>(this.messageUnmarshaller.unmarshal(text), this.messageDetailsCreator.createMessageDetailsFor(message));
            this.metricsCollector.messageReceived(destinationId);
        } catch (Exception e) {
            logger.error("Unable to unmarshal incoming message {}", text, e);
            this.metricsCollector.unparsableMessageReceived(destinationId);
        }
        return incomingMessage;
    }
}
