package no.nav.common.kafka.consumer.feilhandtering;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import no.nav.common.kafka.consumer.ConsumeStatus;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:no/nav/common/kafka/consumer/feilhandtering/KafkaConsumerRecordProcessor.class */
/**
 * Background processor for Kafka consumer records that have been stored in a
 * {@link KafkaConsumerRepository}.
 *
 * <p>A single worker thread repeatedly asks the repository for topic partitions that have stored
 * records for the configured topics, takes a per-partition lock through the {@link LockProvider},
 * hands each stored record to the {@link StoredRecordConsumer} registered for its topic, and
 * deletes the records that were consumed with {@link ConsumeStatus#OK}.
 *
 * <p>{@link #start()} and {@link #close()} are synchronized against each other. Once closed, an
 * instance cannot be started again.
 */
public class KafkaConsumerRecordProcessor {
    /** Upper bound for how long a partition lock is held if it is never released explicitly. */
    private static final Duration LOCK_AT_MOST_FOR = Duration.ofMinutes(5L);

    /** Minimum time a partition lock is kept, even if processing finishes earlier. */
    private static final Duration LOCK_AT_LEAST_FOR = Duration.ofSeconds(5L);

    private final Logger log = LoggerFactory.getLogger(getClass());
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final LockProvider lockProvider;
    private final KafkaConsumerRepository kafkaConsumerRepository;
    private final Map<String, StoredRecordConsumer> recordConsumers;
    private final KafkaConsumerRecordProcessorConfig config;
    private volatile boolean isRunning;
    private volatile boolean isClosed;

    /**
     * Creates the processor and registers a JVM shutdown hook that calls {@link #close()}.
     *
     * @param lockProvider provider used to take a lock per topic partition before processing it
     * @param kafkaConsumerRepository repository the stored records are read from and deleted in
     * @param map stored record consumers keyed by topic name
     * @param kafkaConsumerRecordProcessorConfig poll timeout, error timeout and record batch size
     */
    public KafkaConsumerRecordProcessor(LockProvider lockProvider, KafkaConsumerRepository kafkaConsumerRepository, Map<String, StoredRecordConsumer> map, KafkaConsumerRecordProcessorConfig kafkaConsumerRecordProcessorConfig) {
        this.lockProvider = lockProvider;
        this.kafkaConsumerRepository = kafkaConsumerRepository;
        this.recordConsumers = map;
        this.config = kafkaConsumerRecordProcessorConfig;
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    /**
     * Starts the background record handler loop. Does nothing if the loop is already running.
     *
     * @throws IllegalStateException if the processor has been closed
     */
    public synchronized void start() {
        if (isClosed) {
            throw new IllegalStateException("Cannot start closed consumer record forwarder");
        }
        if (isRunning) {
            return;
        }
        // Mark as running before submitting so that a second start() cannot queue another loop
        // and a close() racing with the worker start-up is never overwritten by the worker.
        isRunning = true;
        executorService.submit(this::recordHandlerLoop);
    }

    /**
     * Stops the record handler loop and releases the worker thread. The loop exits after its
     * current iteration (or sleep) completes; the call does not wait for that to happen.
     */
    public synchronized void close() {
        isRunning = false;
        isClosed = true;
        // shutdown() (not shutdownNow()) lets an in-flight batch finish instead of interrupting it.
        executorService.shutdown();
    }

    /**
     * Worker loop: polls the repository for partitions with stored records and processes them
     * until {@link #close()} is called or an unexpected exception escapes the error handling.
     */
    private void recordHandlerLoop() {
        List<String> topics = new ArrayList<>(recordConsumers.keySet());
        try {
            while (isRunning) {
                try {
                    List<TopicPartition> topicPartitions = kafkaConsumerRepository.getTopicPartitions(topics);
                    if (topicPartitions.isEmpty()) {
                        Thread.sleep(config.pollTimeout.toMillis());
                    } else if (!consumeFromTopicPartitions(topicPartitions)) {
                        // Back off when something failed so a broken record is not retried in a tight loop.
                        Thread.sleep(config.errorTimeout.toMillis());
                    }
                } catch (InterruptedException e) {
                    // Interruption is a request to stop, not a processing failure.
                    throw e;
                } catch (Exception e) {
                    log.error("Failed to consume stored kafka records", e);
                    Thread.sleep(config.errorTimeout.toMillis());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Stored consumer record handler loop was interrupted");
        } catch (Exception e) {
            log.error("Unexpected exception caught in stored consumer record handler loop", e);
        } finally {
            // Reset the flag so start() can launch a new loop if this one died without close().
            isRunning = false;
            log.info("Closing kafka consumer record processor...");
        }
    }

    /**
     * Processes the stored records of every given partition, stopping early once the processor
     * is no longer running.
     *
     * @param list partitions that have stored records
     * @return {@code true} if no record or partition failed, {@code false} otherwise
     */
    private boolean consumeFromTopicPartitions(List<TopicPartition> list) {
        boolean allSucceeded = true;
        for (TopicPartition topicPartition : list) {
            if (!isRunning) {
                break;
            }
            if (!consumeFromTopicPartition(topicPartition)) {
                allSucceeded = false;
            }
        }
        return allSucceeded;
    }

    /**
     * Locks a single partition, consumes one batch of its stored records and deletes those that
     * were consumed successfully. A partition whose lock cannot be acquired is skipped.
     *
     * @param topicPartition the partition to process
     * @return {@code false} if a record failed or an exception was thrown, {@code true} otherwise
     *     (including when the lock could not be acquired)
     */
    private boolean consumeFromTopicPartition(TopicPartition topicPartition) {
        Optional<SimpleLock> lock = Optional.empty();
        try {
            lock = acquireLock(topicPartition);
            if (lock.isEmpty()) {
                return true;
            }

            List<StoredConsumerRecord> records = kafkaConsumerRepository.getRecords(
                    topicPartition.topic(), topicPartition.partition(), config.recordBatchSize);
            StoredRecordConsumer recordConsumer = recordConsumers.get(topicPartition.topic());

            List<Long> consumedRecordIds = new ArrayList<>();
            // Keys that have failed in this batch; later records with the same key are skipped.
            Set<Bytes> failedKeys = new HashSet<>();
            boolean allSucceeded = true;

            for (StoredConsumerRecord storedRecord : records) {
                // Bytes.wrap yields null for a null key; such records are never skipped.
                Bytes key = Bytes.wrap(storedRecord.getKey());
                if (key != null && failedKeys.contains(key)) {
                    continue;
                }

                if (consumeRecord(recordConsumer, storedRecord)) {
                    consumedRecordIds.add(storedRecord.getId());
                } else {
                    allSucceeded = false;
                    kafkaConsumerRepository.incrementRetries(storedRecord.getId());
                    if (key != null) {
                        failedKeys.add(key);
                    }
                }
            }

            if (!consumedRecordIds.isEmpty()) {
                kafkaConsumerRepository.deleteRecords(consumedRecordIds);
                log.info("Stored consumer records deleted {}", consumedRecordIds);
            }
            return allSucceeded;
        } catch (Exception e) {
            log.error("Unexpected exception caught while processing stored consumer records", e);
            // Report the failure so the handler loop backs off before trying again.
            return false;
        } finally {
            lock.ifPresent(SimpleLock::unlock);
        }
    }

    /**
     * Passes one stored record to its consumer and logs the outcome. An exception thrown by the
     * consumer is treated as a failed consumption.
     *
     * @param recordConsumer consumer registered for the record's topic
     * @param storedRecord the record to consume
     * @return {@code true} if the consumer returned {@link ConsumeStatus#OK}
     */
    private boolean consumeRecord(StoredRecordConsumer recordConsumer, StoredConsumerRecord storedRecord) {
        ConsumeStatus status;
        Exception failure = null;
        try {
            status = recordConsumer.consume(storedRecord);
        } catch (Exception e) {
            failure = e;
            status = ConsumeStatus.FAILED;
        }

        if (status == ConsumeStatus.OK) {
            log.info("Successfully process stored record topic={} partition={} offset={} dbId={}",
                    storedRecord.getTopic(), storedRecord.getPartition(), storedRecord.getOffset(), storedRecord.getId());
            return true;
        }

        String message = String.format(
                "Failed to process stored consumer record topic=%s partition=%d offset=%d dbId=%d",
                storedRecord.getTopic(), storedRecord.getPartition(), storedRecord.getOffset(), storedRecord.getId());
        log.error(message, failure);
        return false;
    }

    /**
     * Tries to take the lock named {@code kcrp-<topic>-<partition>} for the given partition.
     *
     * @param topicPartition the partition to lock
     * @return the acquired lock, or an empty Optional if the provider did not grant it
     */
    private Optional<SimpleLock> acquireLock(TopicPartition topicPartition) {
        String lockName = "kcrp-" + topicPartition.topic() + "-" + topicPartition.partition();
        return lockProvider.lock(new LockConfiguration(Instant.now(), lockName, LOCK_AT_MOST_FOR, LOCK_AT_LEAST_FOR));
    }
}
