package pl.allegro.tech.hermes.frontend.buffer;

import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.core.ConditionTimeoutException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.callbacks.BrokerListenersPublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.callbacks.MetricsPublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
import pl.allegro.tech.hermes.tracker.frontend.Trackers;

/* loaded from: input_file:pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoader.class */
/**
 * Re-sends messages read from a backup {@link MessageRepository} through the
 * {@link BrokerMessageProducer}.
 *
 * <p>A backup message is sent only when it is younger than the configured maximum age and
 * its topic can be found in the {@link TopicsCache} within the configured wait time; every
 * other message is discarded and logged.
 */
public class BackupMessagesLoader {
    private static final Logger logger = LoggerFactory.getLogger(BackupMessagesLoader.class);

    private final BrokerMessageProducer brokerMessageProducer;
    private final HermesMetrics hermesMetrics;
    private final BrokerListeners brokerListeners;
    private final TopicsCache topicsCache;
    private final Trackers trackers;
    private final int secondsToWaitForTopicsCache;
    private final int messageMaxAgeHours;

    /**
     * Creates the loader.
     *
     * @param brokerMessageProducer producer used to send the restored messages
     * @param hermesMetrics metrics handed to the {@link MetricsPublishingCallback}
     * @param brokerListeners listeners handed to the {@link BrokerListenersPublishingCallback}
     * @param topicsCache cache in which the topics of restored messages are looked up
     * @param trackers trackers notified about the outcome of every send
     * @param configFactory source of {@link Configs#MESSAGES_LOADING_WAIT_FOR_TOPICS_CACHE}
     *     (seconds) and {@link Configs#MESSAGES_LOCAL_STORAGE_MAX_AGE_HOURS} (hours)
     */
    @Inject
    public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer, HermesMetrics hermesMetrics, BrokerListeners brokerListeners, TopicsCache topicsCache, Trackers trackers, ConfigFactory configFactory) {
        this.brokerMessageProducer = brokerMessageProducer;
        this.hermesMetrics = hermesMetrics;
        this.brokerListeners = brokerListeners;
        this.topicsCache = topicsCache;
        this.trackers = trackers;
        this.secondsToWaitForTopicsCache = configFactory.getIntProperty(Configs.MESSAGES_LOADING_WAIT_FOR_TOPICS_CACHE);
        this.messageMaxAgeHours = configFactory.getIntProperty(Configs.MESSAGES_LOCAL_STORAGE_MAX_AGE_HOURS);
    }

    /**
     * Reads all messages from the given repository and sends the ones that are still fresh
     * and whose topic is available; the rest are discarded with a warning.
     *
     * @param messageRepository backup storage to read the messages from
     */
    public void loadMessages(MessageRepository messageRepository) {
        List<BackupMessage> backupMessages = messageRepository.findAll();
        logger.info("Loading {} messages from backup storage.", backupMessages.size());

        int sentCount = 0;
        int discardedCount = 0;
        for (BackupMessage backupMessage : backupMessages) {
            // Staleness is checked first: it is free, whereas resolving the topic may block
            // for up to secondsToWaitForTopicsCache seconds.
            if (!isNotStale(backupMessage)) {
                discardedCount++;
                logger.warn("Not sending stale message {} {} {}",
                        backupMessage.getMessageId(), backupMessage.getQualifiedTopicName(), payloadAsText(backupMessage));
                continue;
            }

            Optional<Topic> topic = loadTopic(TopicName.fromQualifiedName(backupMessage.getQualifiedTopicName()));
            if (!topic.isPresent()) {
                discardedCount++;
                logger.warn("Not sending message {} {} {} because its topic could not be loaded",
                        backupMessage.getMessageId(), backupMessage.getQualifiedTopicName(), payloadAsText(backupMessage));
                continue;
            }

            sentCount++;
            sendMessage(new Message(backupMessage.getMessageId(), backupMessage.getData(), backupMessage.getTimestamp()), topic.get());
        }

        logger.info("Loaded and sent {} messages and discarded {} messages from the backup storage", sentCount, discardedCount);
    }

    /**
     * Tells whether the message is younger than {@code messageMaxAgeHours}.
     *
     * <p>The comparison is done on instants rather than on local date-times, because local
     * wall-clock values are ambiguous or skipped around daylight-saving transitions.
     *
     * @param backupMessage message whose timestamp (epoch milliseconds) is checked
     * @return {@code true} when the message timestamp is after "now minus max age"
     */
    private boolean isNotStale(BackupMessage backupMessage) {
        Instant oldestAccepted = Instant.now().minusMillis(TimeUnit.HOURS.toMillis(this.messageMaxAgeHours));
        return Instant.ofEpochMilli(backupMessage.getTimestamp()).isAfter(oldestAccepted);
    }

    /**
     * Decodes the message payload for logging only.
     *
     * <p>UTF-8 is used so the logged text does not depend on the platform default charset;
     * NOTE(review): assumes payloads are UTF-8 text - confirm against the producers.
     */
    private static String payloadAsText(BackupMessage backupMessage) {
        return new String(backupMessage.getData(), StandardCharsets.UTF_8);
    }

    /**
     * Sends the message to the broker and reports the outcome to metrics, broker listeners
     * and the trackers registered for the topic.
     *
     * @param message message to send
     * @param topic topic the message is sent to
     */
    private void sendMessage(Message message, Topic topic) {
        PublishingCallback trackingCallback = new PublishingCallback() {
            @Override
            public void onUnpublished(Message failedMessage, Topic failedTopic, Exception exception) {
                trackers.get(failedTopic).logError(failedMessage.getId(), failedTopic.getName(), exception.getMessage());
            }

            @Override
            public void onPublished(Message publishedMessage, Topic publishedTopic) {
                trackers.get(publishedTopic).logPublished(publishedMessage.getId(), publishedTopic.getName());
            }
        };

        this.brokerMessageProducer.send(message, topic, new SimpleExecutionCallback(
                new MetricsPublishingCallback(this.hermesMetrics, topic),
                new BrokerListenersPublishingCallback(this.brokerListeners),
                trackingCallback));
    }

    /**
     * Waits up to {@code secondsToWaitForTopicsCache} seconds for the topic to appear in the
     * topics cache.
     *
     * @param topicName name of the topic to look up
     * @return the topic as returned by the cache, or an empty {@link Optional} when it did
     *     not appear before the timeout
     */
    private Optional<Topic> loadTopic(TopicName topicName) {
        try {
            Awaitility.await()
                    .pollDelay(1L, TimeUnit.NANOSECONDS)
                    .atMost(this.secondsToWaitForTopicsCache, TimeUnit.SECONDS)
                    .until(() -> this.topicsCache.getTopic(topicName).isPresent());
            return this.topicsCache.getTopic(topicName);
        } catch (ConditionTimeoutException e) {
            logger.error("Could not read topic {} from topics cache after {} seconds", topicName, this.secondsToWaitForTopicsCache);
            return Optional.empty();
        }
    }
}
