package io.eventuate.local.common;

import io.eventuate.common.eventuate.local.BinlogFileOffset;
import io.eventuate.common.eventuate.local.PublishedEvent;
import io.eventuate.common.json.mapper.JSonMapper;
import io.eventuate.messaging.kafka.basic.consumer.ConsumerPropertiesFactory;
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumer;
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumerConfigurationProperties;
import io.eventuate.messaging.kafka.basic.consumer.KafkaConsumerFactory;
import io.eventuate.messaging.kafka.basic.consumer.KafkaMessageConsumer;
import io.eventuate.messaging.kafka.common.EventuateKafkaMultiMessageConverter;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/local/common/DuplicatePublishingDetector.class */
public class DuplicatePublishingDetector implements PublishingFilter {
    private String kafkaBootstrapServers;
    private EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties;
    private KafkaConsumerFactory kafkaConsumerFactory;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private Map<String, Optional<BinlogFileOffset>> maxOffsetsForTopics = new HashMap();
    private boolean okToProcess = false;
    private EventuateKafkaMultiMessageConverter eventuateKafkaMultiMessageConverter = new EventuateKafkaMultiMessageConverter();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/eventuate/local/common/DuplicatePublishingDetector$PartitionOffset.class */
    public class PartitionOffset {
        public final int partition;
        public final long offset;

        public String toString() {
            return "PartitionOffset{partition=" + this.partition + ", offset=" + this.offset + '}';
        }

        public PartitionOffset(int i, long j) {
            this.partition = i;
            this.offset = j;
        }
    }

    public DuplicatePublishingDetector(String str, EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties, KafkaConsumerFactory kafkaConsumerFactory) {
        this.kafkaBootstrapServers = str;
        this.eventuateKafkaConsumerConfigurationProperties = eventuateKafkaConsumerConfigurationProperties;
        this.kafkaConsumerFactory = kafkaConsumerFactory;
    }

    @Override // io.eventuate.local.common.PublishingFilter
    public boolean shouldBePublished(BinlogFileOffset binlogFileOffset, String str) {
        if (this.okToProcess) {
            return true;
        }
        Optional<BinlogFileOffset> computeIfAbsent = this.maxOffsetsForTopics.computeIfAbsent(str, this::fetchMaxOffsetFor);
        this.logger.info("For topic {} max is {}", str, computeIfAbsent);
        binlogFileOffset.getClass();
        this.okToProcess = ((Boolean) computeIfAbsent.map(binlogFileOffset::isSameOrAfter).orElse(true)).booleanValue();
        this.logger.info("max = {}, sourceBinlogFileOffset = {} okToProcess = {}", new Object[]{computeIfAbsent, binlogFileOffset, Boolean.valueOf(this.okToProcess)});
        return this.okToProcess;
    }

    private Optional<BinlogFileOffset> fetchMaxOffsetFor(String str) {
        Properties makeDefaultConsumerProperties = ConsumerPropertiesFactory.makeDefaultConsumerProperties(this.kafkaBootstrapServers, "duplicate-checker-" + str + "-" + System.currentTimeMillis());
        makeDefaultConsumerProperties.putAll(this.eventuateKafkaConsumerConfigurationProperties.getProperties());
        KafkaMessageConsumer makeConsumer = this.kafkaConsumerFactory.makeConsumer((String) null, makeDefaultConsumerProperties);
        List list = (List) EventuateKafkaConsumer.verifyTopicExistsBeforeSubscribing(makeConsumer, str).stream().map(partitionInfo -> {
            return new TopicPartition(str, partitionInfo.partition());
        }).collect(Collectors.toList());
        makeConsumer.assign(list);
        makeConsumer.poll(Duration.ZERO);
        this.logger.info("Seeking to end");
        try {
            makeConsumer.seekToEnd(list);
            List list2 = (List) list.stream().map(topicPartition -> {
                return new PartitionOffset(topicPartition.partition(), makeConsumer.position(topicPartition) - 1);
            }).filter(partitionOffset -> {
                return partitionOffset.offset >= 0;
            }).collect(Collectors.toList());
            this.logger.info("Seeking to positions=" + list2);
            list2.forEach(partitionOffset2 -> {
                makeConsumer.seek(new TopicPartition(str, partitionOffset2.partition), partitionOffset2.offset);
            });
            this.logger.info("Polling for records");
            ArrayList arrayList = new ArrayList();
            while (arrayList.size() < list2.size()) {
                ConsumerRecords poll = makeConsumer.poll(Duration.of(1000L, ChronoUnit.MILLIS));
                arrayList.getClass();
                poll.forEach((v1) -> {
                    r1.add(v1);
                });
            }
            this.logger.info("Got records: {}", Integer.valueOf(arrayList.size()));
            Optional<BinlogFileOffset> max = arrayList.stream().flatMap(consumerRecord -> {
                this.logger.info(String.format("got record: %s %s", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())));
                return this.eventuateKafkaMultiMessageConverter.convertBytesToValues((byte[]) consumerRecord.value()).stream().map(str2 -> {
                    return ((PublishedEvent) JSonMapper.fromJson(str2, PublishedEvent.class)).getBinlogFileOffset();
                });
            }).filter((v0) -> {
                return v0.isPresent();
            }).map((v0) -> {
                return v0.get();
            }).max((binlogFileOffset, binlogFileOffset2) -> {
                return binlogFileOffset.isSameOrAfter(binlogFileOffset2) ? 1 : -1;
            });
            makeConsumer.close();
            return max;
        } catch (IllegalStateException e) {
            this.logger.error("Error seeking " + str, e);
            return Optional.empty();
        }
    }
}
