package io.eventuate.local.db.log.common;

import io.eventuate.common.eventuate.local.BinlogFileOffset;
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumerConfigurationProperties;
import io.eventuate.messaging.kafka.common.EventuateKafkaConfigurationProperties;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/local/db/log/common/OffsetKafkaStore.class */
/**
 * Base {@link OffsetStore} that recovers the most recent {@link BinlogFileOffset} by replaying
 * a Kafka topic from its earliest offset.
 *
 * <p>Subclasses decide how a single Kafka record maps to an offset by implementing
 * {@link #handleRecord(ConsumerRecord)}.
 */
public abstract class OffsetKafkaStore implements OffsetStore {
    /** Number of consecutive empty polls after which the topic is treated as fully read. */
    private static final int MAX_CONSECUTIVE_EMPTY_POLLS = 20;

    /** Timeout, in milliseconds, passed to each {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /** Number of retries used when looking up the topic's partitions before reading. */
    private static final int PARTITION_LOOKUP_RETRIES = 10;

    /** Pause, in milliseconds, between partition lookup attempts. */
    private static final long PARTITION_LOOKUP_RETRY_DELAY_MS = 100L;

    // Instance-level on purpose: getClass() makes log lines carry the concrete subclass name.
    private final Logger logger = LoggerFactory.getLogger(getClass());
    protected final String dbHistoryTopicName;
    protected EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties;
    private final EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties;

    /**
     * Creates the store.
     *
     * @param str name of the Kafka topic that is read to find the offset
     * @param eventuateKafkaConfigurationProperties supplies the Kafka bootstrap servers
     * @param eventuateKafkaConsumerConfigurationProperties extra consumer properties; these are
     *     applied last and therefore override the defaults set by this class
     */
    public OffsetKafkaStore(String str, EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties, EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties) {
        this.eventuateKafkaConfigurationProperties = eventuateKafkaConfigurationProperties;
        this.dbHistoryTopicName = str;
        this.eventuateKafkaConsumerConfigurationProperties = eventuateKafkaConsumerConfigurationProperties;
    }

    /**
     * Reads the topic from the beginning and returns the last offset produced by
     * {@link #handleRecord(ConsumerRecord)}.
     *
     * <p>Reading stops once {@value #MAX_CONSECUTIVE_EMPTY_POLLS} polls in a row return no
     * records. Records for which {@code handleRecord} returns {@code null} are skipped.
     *
     * @return the offset from the last record that yielded a non-null result, or an empty
     *     {@code Optional} if no record yielded one
     * @throws RuntimeException if the topic's partitions cannot be looked up
     */
    @Override // io.eventuate.local.db.log.common.OffsetStore
    public Optional<BinlogFileOffset> getLastBinlogFileOffset() {
        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            // The result is not used; the call only verifies that the topic metadata can be
            // fetched (with retries) before subscribing.
            getPartitionsForTopicRetryOnFail(consumer, PARTITION_LOOKUP_RETRIES);
            consumer.subscribe(Arrays.asList(this.dbHistoryTopicName));

            BinlogFileOffset lastOffset = null;
            int emptyPollsLeft = MAX_CONSECUTIVE_EMPTY_POLLS;
            while (emptyPollsLeft > 0) {
                // NOTE(review): poll(long) is deprecated in newer Kafka clients in favour of
                // poll(Duration); kept as-is because the client version is not visible here.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                if (records.isEmpty()) {
                    emptyPollsLeft--;
                    continue;
                }
                // Any data resets the countdown: only *consecutive* empty polls end the read.
                emptyPollsLeft = MAX_CONSECUTIVE_EMPTY_POLLS;
                for (ConsumerRecord<String, String> record : records) {
                    BinlogFileOffset candidate = handleRecord(record);
                    if (candidate != null) {
                        lastOffset = candidate;
                    }
                }
            }
            return Optional.ofNullable(lastOffset);
        }
    }

    /**
     * Looks up the partitions of the topic, retrying when the lookup throws.
     *
     * <p>Each failure is logged at error level. Between attempts the calling thread sleeps for
     * {@value #PARTITION_LOOKUP_RETRY_DELAY_MS} ms. If the thread is interrupted while waiting,
     * its interrupt flag is restored before the failure is reported.
     *
     * @param kafkaConsumer consumer used for the lookup
     * @param i number of retries allowed after the first attempt; zero or a negative value
     *     means a single attempt
     * @return the value returned by {@code kafkaConsumer.partitionsFor} for the topic
     * @throws RuntimeException wrapping the last lookup failure when retries are exhausted or
     *     the wait is interrupted (the interruption is attached as a suppressed exception)
     */
    public List<PartitionInfo> getPartitionsForTopicRetryOnFail(KafkaConsumer<String, String> kafkaConsumer, int i) {
        int retriesLeft = i;
        while (true) {
            try {
                return kafkaConsumer.partitionsFor(this.dbHistoryTopicName);
            } catch (Exception e) {
                this.logger.error(e.getMessage(), e);
                if (retriesLeft <= 0) {
                    throw new RuntimeException(e);
                }
                retriesLeft--;
                try {
                    Thread.sleep(PARTITION_LOOKUP_RETRY_DELAY_MS);
                } catch (InterruptedException interrupted) {
                    // Restore the flag so callers further up can observe the interruption.
                    Thread.currentThread().interrupt();
                    RuntimeException failure = new RuntimeException(e);
                    failure.addSuppressed(interrupted);
                    throw failure;
                }
            }
        }
    }

    /**
     * Builds a consumer with a fresh random group id that starts from the earliest offset and
     * has auto-commit disabled. User-supplied consumer properties are applied last.
     */
    private KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.eventuateKafkaConfigurationProperties.getBootstrapServers());
        properties.put("auto.offset.reset", "earliest");
        // A random group id means there are no previously committed offsets for this consumer.
        properties.put("group.id", UUID.randomUUID().toString());
        properties.put("enable.auto.commit", "false");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.putAll(this.eventuateKafkaConsumerConfigurationProperties.getProperties());
        return new KafkaConsumer<>(properties);
    }

    /**
     * Converts one Kafka record into a binlog offset.
     *
     * @param consumerRecord record read from the topic
     * @return the offset carried by the record, or {@code null} if the record should be
     *     ignored when determining the last offset
     */
    protected abstract BinlogFileOffset handleRecord(ConsumerRecord<String, String> consumerRecord);
}
