package pl.allegro.tech.hermes.frontend.buffer;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.charset.Charset;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.metric.timer.StartedTimersPair;
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.message.JsonMessage;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
import pl.allegro.tech.hermes.tracker.frontend.Trackers;

/* loaded from: input_file:pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoader.class */
public class BackupMessagesLoader {
    private static final Logger logger = LoggerFactory.getLogger(BackupMessagesLoader.class);
    private final BrokerMessageProducer brokerMessageProducer;
    private final BrokerListeners brokerListeners;
    private final TopicsCache topicsCache;
    private final Trackers trackers;
    private final Duration messageMaxAgeHours;
    private final int maxResendRetries;
    private final Duration resendSleep;
    private final Duration readTopicInfoSleep;
    private final Set<Topic> topicsAvailabilityCache = new HashSet();
    private final AtomicReference<ConcurrentLinkedQueue<Pair<Message, CachedTopic>>> toResend = new AtomicReference<>();

    public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer, BrokerListeners brokerListeners, TopicsCache topicsCache, Trackers trackers, BackupMessagesLoaderParameters backupMessagesLoaderParameters) {
        this.brokerMessageProducer = brokerMessageProducer;
        this.brokerListeners = brokerListeners;
        this.topicsCache = topicsCache;
        this.trackers = trackers;
        this.messageMaxAgeHours = backupMessagesLoaderParameters.getMaxAge();
        this.resendSleep = backupMessagesLoaderParameters.getLoadingPauseBetweenResend();
        this.readTopicInfoSleep = backupMessagesLoaderParameters.getLoadingWaitForBrokerTopicInfo();
        this.maxResendRetries = backupMessagesLoaderParameters.getMaxResendRetries();
    }

    public void loadMessages(List<BackupMessage> list) {
        logger.info("Loading {} messages from backup storage.", Integer.valueOf(list.size()));
        this.toResend.set(new ConcurrentLinkedQueue<>());
        sendMessages(list);
        if (this.toResend.get().size() == 0) {
            logger.info("No messages to resend.");
            return;
        }
        int i = 0;
        do {
            if (i > 0) {
                resendMessages(Lists.newArrayList(this.toResend.getAndSet(new ConcurrentLinkedQueue<>())), i);
            }
            try {
                Thread.sleep(this.resendSleep.toMillis());
            } catch (InterruptedException e) {
                logger.warn("Sleep interrupted", e);
            }
            i++;
            if (this.toResend.get().size() <= 0) {
                break;
            }
        } while (i <= this.maxResendRetries);
        logger.info("Finished resending messages from backup storage after retry #{} with {} unsent messages.", Integer.valueOf(i - 1), Integer.valueOf(this.toResend.get().size()));
    }

    public void loadFromTemporaryBackupV2File(File file) {
        try {
            FileInputStream fileInputStream = new FileInputStream(file);
            try {
                ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
                try {
                    List<BackupMessage> list = (List) objectInputStream.readObject();
                    logger.info("Loaded {} messages from temporary v2 backup file: {}", Integer.valueOf(list.size()), file);
                    loadMessages(list);
                    objectInputStream.close();
                    fileInputStream.close();
                } catch (Throwable th) {
                    try {
                        objectInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (IOException | ClassNotFoundException e) {
            logger.error("Error reading temporary backup v2 files from path {}.", file.getAbsolutePath(), e);
        }
    }

    public void clearTopicsAvailabilityCache() {
        this.topicsAvailabilityCache.clear();
    }

    private void sendMessages(List<BackupMessage> list) {
        logger.info("Sending {} messages from backup storage.", Integer.valueOf(list.size()));
        int i = 0;
        int i2 = 0;
        for (BackupMessage backupMessage : list) {
            JsonMessage jsonMessage = new JsonMessage(backupMessage.getMessageId(), backupMessage.getData(), backupMessage.getTimestamp(), backupMessage.getPartitionKey());
            String qualifiedTopicName = backupMessage.getQualifiedTopicName();
            if (sendMessageIfNeeded(jsonMessage, qualifiedTopicName, this.topicsCache.getTopic(qualifiedTopicName), "sending")) {
                i++;
            } else {
                i2++;
            }
        }
        logger.info("Loaded and sent {} messages and discarded {} messages from the backup storage.", Integer.valueOf(i), Integer.valueOf(i2));
    }

    private void resendMessages(List<Pair<Message, CachedTopic>> list, int i) {
        logger.info("Resending {} messages from backup storage retry {}.", Integer.valueOf(list.size()), Integer.valueOf(i));
        int i2 = 0;
        int i3 = 0;
        for (Pair<Message, CachedTopic> pair : list) {
            Message message = (Message) pair.getKey();
            Optional<CachedTopic> of = Optional.of((CachedTopic) pair.getValue());
            if (sendMessageIfNeeded(message, of.get().getQualifiedName(), of, "resending")) {
                i2++;
            } else {
                i3++;
            }
        }
        logger.info("Resent {}/{} messages and discarded {} messages from the backup storage retry {}.", new Object[]{Integer.valueOf(i2), Integer.valueOf(list.size()), Integer.valueOf(i3), Integer.valueOf(i)});
    }

    private boolean sendMessageIfNeeded(Message message, String str, Optional<CachedTopic> optional, String str2) {
        if (!optional.isPresent()) {
            logger.error("Topic {} not present. Not {} message {} {}", new Object[]{str, str2, message.getId(), new String(message.getData(), Charset.defaultCharset())});
            return false;
        }
        if (!isNotStale(message)) {
            logger.warn("Not {} stale message {} {} {}", new Object[]{str2, message.getId(), str, new String(message.getData(), Charset.defaultCharset())});
            return false;
        }
        waitOnBrokerTopicAvailability(optional.get());
        sendMessage(message, optional.get());
        return true;
    }

    private void waitOnBrokerTopicAvailability(CachedTopic cachedTopic) {
        int i = 0;
        while (!isBrokerTopicAvailable(cachedTopic)) {
            try {
                i++;
                logger.info("Broker topic {} is not available, checked {} times.", cachedTopic.getTopic().getQualifiedName(), Integer.valueOf(i));
                Thread.sleep(this.readTopicInfoSleep.toMillis());
            } catch (InterruptedException e) {
                logger.warn("Waiting for broker topic availability interrupted. Topic: {}", cachedTopic.getTopic().getQualifiedName());
            }
        }
    }

    private boolean isBrokerTopicAvailable(CachedTopic cachedTopic) {
        if (this.topicsAvailabilityCache.contains(cachedTopic.getTopic())) {
            return true;
        }
        if (!this.brokerMessageProducer.isTopicAvailable(cachedTopic)) {
            return false;
        }
        this.topicsAvailabilityCache.add(cachedTopic.getTopic());
        logger.info("Broker topic {} is available.", cachedTopic.getTopic().getQualifiedName());
        return true;
    }

    private boolean isNotStale(Message message) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(message.getTimestamp()), ZoneId.systemDefault()).isAfter(LocalDateTime.now().minusHours(this.messageMaxAgeHours.toHours()));
    }

    private void sendMessage(Message message, final CachedTopic cachedTopic) {
        final StartedTimersPair startBrokerLatencyTimers = cachedTopic.startBrokerLatencyTimers();
        this.brokerMessageProducer.send(message, cachedTopic, new PublishingCallback() { // from class: pl.allegro.tech.hermes.frontend.buffer.BackupMessagesLoader.1
            @Override // pl.allegro.tech.hermes.frontend.publishing.PublishingCallback
            public void onUnpublished(Message message2, Topic topic, Exception exc) {
                startBrokerLatencyTimers.close();
                BackupMessagesLoader.this.brokerListeners.onError(message2, topic, exc);
                BackupMessagesLoader.this.trackers.get(topic).logError(message2.getId(), topic.getName(), exc.getMessage(), "", Collections.emptyMap());
                BackupMessagesLoader.this.toResend.get().add(ImmutablePair.of(message2, cachedTopic));
            }

            @Override // pl.allegro.tech.hermes.frontend.publishing.PublishingCallback
            public void onPublished(Message message2, Topic topic) {
                startBrokerLatencyTimers.close();
                cachedTopic.incrementPublished();
                BackupMessagesLoader.this.brokerListeners.onAcknowledge(message2, topic);
                BackupMessagesLoader.this.trackers.get(topic).logPublished(message2.getId(), topic.getName(), "", Collections.emptyMap());
            }
        });
    }
}
