package no.nav.common.kafka.consumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import no.nav.common.kafka.consumer.util.ConsumerUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:no/nav/common/kafka/consumer/KafkaConsumerClientImpl.class */
public class KafkaConsumerClientImpl<K, V> implements KafkaConsumerClient, ConsumerRebalanceListener {
    /** Default poll duration in milliseconds. */
    public static final long DEFAULT_POLL_DURATION_MS = 1000;
    /** How long the poll loop sleeps after a failed poll before trying again. */
    private static final long POLL_ERROR_TIMEOUT_MS = 5000;
    private final KafkaConsumerClientConfig<K, V> config;
    /** Counted down once per record of the current poll batch; awaited by the poll loop. */
    private volatile CountDownLatch processedRecordsLatch;
    /** Counted down when the poll loop has finished cleaning up; awaited by {@link #stop()}. */
    private volatile CountDownLatch shutdownLatch;
    /**
     * Created by the poll loop and read by {@link #stop()} from a different thread, hence
     * {@code volatile} so that the stopping thread sees the current instance.
     */
    private volatile KafkaConsumer<K, V> consumer;
    private final Logger log = LoggerFactory.getLogger(KafkaConsumerClientImpl.class);
    /** Single thread that runs the poll loop. */
    private final ExecutorService pollExecutor = Executors.newSingleThreadExecutor();
    /** Next offset to commit per partition, filled in by the record processing tasks. */
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new ConcurrentHashMap<>();
    /** Partitions revoked since the last poll; queued records for these are skipped. */
    private final Set<TopicPartition> revokedPartitions = ConcurrentHashMap.newKeySet();
    /** Records from the current batch whose consumption did not return OK. */
    private final Set<ConsumerRecord<K, V>> failedRecords = ConcurrentHashMap.newKeySet();
    private volatile ClientState clientState = ClientState.NOT_RUNNING;

    /* loaded from: input_file:no/nav/common/kafka/consumer/KafkaConsumerClientImpl$ClientState.class */
    /** Lifecycle state of the client, toggled by {@code start()} and {@code stop()}. */
    private enum ClientState {
        // The poll loop should keep running (and is restarted if it exits while in this state).
        RUNNING,
        // Initial state, and the state after stop() has been requested.
        NOT_RUNNING
    }

    public KafkaConsumerClientImpl(KafkaConsumerClientConfig<K, V> kafkaConsumerClientConfig) {
        validateConfig(kafkaConsumerClientConfig);
        this.config = kafkaConsumerClientConfig;
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    /**
     * Marks the client as running and submits the poll loop to the poll executor.
     * Does nothing if the client is already running.
     */
    @Override // no.nav.common.kafka.consumer.KafkaConsumerClient
    public void start() {
        final boolean alreadyRunning = ClientState.RUNNING == this.clientState;
        if (!alreadyRunning) {
            this.clientState = ClientState.RUNNING;
            this.log.info("Starting kafka consumer client...");
            this.pollExecutor.submit(this::consumeTopics);
        }
    }

    /**
     * Requests the poll loop to stop and waits up to 10 seconds for it to finish its cleanup.
     * Does nothing if the client is not running.
     *
     * <p>Safe to call before the poll loop has created its consumer: in that case there is
     * nothing to wake up or wait for, and the loop will observe the NOT_RUNNING state on
     * its own. If the calling thread is interrupted while waiting, the interrupt flag is
     * restored before returning.
     */
    @Override // no.nav.common.kafka.consumer.KafkaConsumerClient
    public void stop() {
        if (this.clientState != ClientState.RUNNING) {
            return;
        }
        this.clientState = ClientState.NOT_RUNNING;
        this.log.info("Stopping kafka consumer client...");

        // Snapshot the fields once; the poll thread may not have assigned them yet.
        final CountDownLatch latch = this.shutdownLatch;
        final KafkaConsumer<K, V> activeConsumer = this.consumer;
        try {
            if (activeConsumer != null) {
                // Aborts an ongoing (or the next) consumer.poll() with a WakeupException.
                activeConsumer.wakeup();
            }
            if (latch == null || latch.await(10L, TimeUnit.SECONDS)) {
                this.log.info("Kafka consumer client stopped");
            } else {
                this.log.warn("Timed out while waiting for kafka consumer client to stop");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            this.log.error("Failed to stop kafka consumer client gracefully", e);
        }
    }

    /**
     * @return {@code true} while the client state is RUNNING, i.e. after {@link #start()}
     *     and before {@link #stop()}
     */
    @Override // no.nav.common.kafka.consumer.KafkaConsumerClient
    public boolean isRunning() {
        return ClientState.RUNNING == this.clientState;
    }

    /**
     * Rebalance callback: records the revoked partitions so that queued records for them
     * are skipped, then tries to commit the offsets collected so far. A failed commit is
     * logged and otherwise ignored.
     *
     * @param collection the partitions that were revoked from this consumer
     */
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        final String revoked = Arrays.toString(collection.toArray());
        this.log.info("Partitions has been revoked from consumer: " + revoked);

        this.revokedPartitions.addAll(collection);

        try {
            commitCurrentOffsets();
        } catch (Exception commitFailure) {
            final String message = "Failed to commit offsets when partitions were revoked: " + this.offsetsToCommit;
            this.log.error(message, commitFailure);
        }
    }

    /**
     * Rebalance callback: logs the newly assigned partitions. No other action is taken.
     *
     * @param collection the partitions that were assigned to this consumer
     */
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        final String assigned = Arrays.toString(collection.toArray());
        this.log.info("New partitions has been assigned to the consumer: " + assigned);
    }

    /**
     * Poll loop, executed on {@code pollExecutor}.
     *
     * <p>Creates a consumer, subscribes to the configured topics and, while the client is
     * RUNNING, repeatedly:
     * <ol>
     *   <li>resets the per-batch bookkeeping (revoked partitions, offsets, failed records),</li>
     *   <li>polls for records,</li>
     *   <li>hands each record to a single-threaded executor dedicated to its partition,</li>
     *   <li>waits for the whole batch, commits the collected offsets and seeks back to any
     *       failed record so it is polled again.</li>
     * </ol>
     *
     * <p>A {@link WakeupException} from {@code poll} ends the loop. Any other exception from
     * {@code poll} is logged and retried after {@code POLL_ERROR_TIMEOUT_MS}. Whatever way the
     * loop ends, cleanup runs exactly once; if the client is still RUNNING afterwards the loop
     * is resubmitted.
     */
    private void consumeTopics() {
        final Collection<String> topicNames = new ArrayList<>(this.config.topics.keySet());
        final Map<TopicPartition, ExecutorService> partitionExecutors = new HashMap<>();
        try {
            this.shutdownLatch = new CountDownLatch(1);
            this.consumer = new KafkaConsumer<>(this.config.properties);
            this.consumer.subscribe(topicNames, this);

            while (this.clientState == ClientState.RUNNING) {
                this.revokedPartitions.clear();
                this.offsetsToCommit.clear();
                this.failedRecords.clear();

                final ConsumerRecords<K, V> records;
                try {
                    records = this.consumer.poll(Duration.ofMillis(this.config.pollDurationMs));
                } catch (WakeupException e) {
                    // Must be caught before the generic handler: wakeup() is the stop signal.
                    this.log.info("Polling was cancelled by wakeup(). Stopping kafka consumer client...");
                    return;
                } catch (Exception e) {
                    this.log.error("Exception occurred during polling of records. Waiting before trying again", e);
                    Thread.sleep(POLL_ERROR_TIMEOUT_MS);
                    continue;
                }

                if (records.isEmpty()) {
                    continue;
                }

                // Tasks count down the latch they were submitted with, not whatever the field
                // happens to reference when they run.
                final CountDownLatch batchLatch = new CountDownLatch(records.count());
                this.processedRecordsLatch = batchLatch;
                for (ConsumerRecord<K, V> record : records) {
                    submitRecord(partitionExecutors, record, batchLatch);
                }
                batchLatch.await();

                try {
                    commitCurrentOffsets();
                } catch (Exception e) {
                    this.log.error("Failed to commit offsets: " + this.offsetsToCommit, e);
                }
                seekBackOnFailed();
            }
        } catch (Exception e) {
            this.log.error("Unexpected exception caught from main loop. Shutting down...", e);
        } finally {
            closeAndMaybeRestart(partitionExecutors);
        }
    }

    /** Queues one record on the single-threaded executor that belongs to its partition. */
    private void submitRecord(
            Map<TopicPartition, ExecutorService> partitionExecutors,
            ConsumerRecord<K, V> record,
            CountDownLatch batchLatch) {
        final TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        final TopicConsumer<K, V> topicConsumer = this.config.topics.get(record.topic());
        final ExecutorService executor =
                partitionExecutors.computeIfAbsent(topicPartition, ignored -> Executors.newSingleThreadExecutor());
        executor.submit(() -> processRecord(topicConsumer, topicPartition, record, batchLatch));
    }

    /** Consumes one record (unless it must be skipped) and always counts down the batch latch. */
    private void processRecord(
            TopicConsumer<K, V> topicConsumer,
            TopicPartition topicPartition,
            ConsumerRecord<K, V> record,
            CountDownLatch batchLatch) {
        try {
            // Skip when stopping, when the partition is no longer ours, or when an earlier
            // record of the same partition failed in this batch (it will be re-polled).
            if (this.clientState == ClientState.NOT_RUNNING
                    || this.revokedPartitions.contains(topicPartition)
                    || hasPreviouslyFailedRecord(topicPartition)) {
                return;
            }
            if (ConsumerUtils.safeConsume(topicConsumer, record) == ConsumeStatus.OK) {
                // The committed offset is the offset of the next record to read.
                this.offsetsToCommit.put(topicPartition, new OffsetAndMetadata(record.offset() + 1));
            } else {
                this.failedRecords.add(record);
            }
        } catch (Exception e) {
            this.log.error(String.format("Unexpected error occurred while processing consumer record. topic=%s partition=%d offset=%d", record.topic(), record.partition(), record.offset()), e);
        } finally {
            batchLatch.countDown();
        }
    }

    /**
     * Cleanup after the poll loop has ended: shuts down the partition executors, commits any
     * remaining offsets, closes the consumer, releases {@code shutdownLatch} and, if the
     * client is still RUNNING, resubmits the poll loop.
     */
    private void closeAndMaybeRestart(Map<TopicPartition, ExecutorService> partitionExecutors) {
        try {
            partitionExecutors.values().forEach(ExecutorService::shutdown);
            commitCurrentOffsets();
        } catch (Exception e) {
            this.log.error("Failed to shutdown kafka consumer client properly", e);
        }
        try {
            // Closed in its own try so that a failed final commit cannot leave the consumer open.
            if (this.consumer != null) {
                this.consumer.close(Duration.ofSeconds(3L));
            }
        } catch (Exception e) {
            this.log.error("Failed to shutdown kafka consumer client properly", e);
        } finally {
            this.shutdownLatch.countDown();
            if (this.clientState == ClientState.RUNNING) {
                this.log.warn("Unexpected failure while kafka consumer client was running. Restarting...");
                this.pollExecutor.submit(this::consumeTopics);
            }
        }
    }

    /**
     * Tells whether a record from the given partition is currently registered as failed.
     *
     * @param topicPartition the partition to look for
     * @return {@code true} if any entry in {@code failedRecords} has the same topic and partition
     */
    private boolean hasPreviouslyFailedRecord(TopicPartition topicPartition) {
        for (ConsumerRecord<K, V> failed : this.failedRecords) {
            final boolean sameTopic = failed.topic().equals(topicPartition.topic());
            if (sameTopic && failed.partition() == topicPartition.partition()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Synchronously commits the collected offsets with a 3 second timeout, logs them and
     * then empties the collection. Does nothing when there are no offsets to commit.
     * Exceptions from the commit propagate to the caller, leaving the offsets in place.
     */
    private void commitCurrentOffsets() {
        if (!this.offsetsToCommit.isEmpty()) {
            final Duration commitTimeout = Duration.ofSeconds(3L);
            this.consumer.commitSync(this.offsetsToCommit, commitTimeout);
            this.log.info("Offsets committed: " + this.offsetsToCommit);
            this.offsetsToCommit.clear();
        }
    }

    /**
     * Rewinds the consumer to the offset of every failed record so that the record is
     * returned again by a later poll, then clears the set of failed records.
     */
    private void seekBackOnFailed() {
        for (ConsumerRecord<K, V> failedRecord : this.failedRecords) {
            final TopicPartition topicPartition = new TopicPartition(failedRecord.topic(), failedRecord.partition());
            // Log the partition being rewound (previously the logger instance was printed here).
            this.log.warn("Seeking back to offset " + failedRecord.offset() + " for: " + topicPartition);
            this.consumer.seek(topicPartition, failedRecord.offset());
        }
        this.failedRecords.clear();
    }

    /**
     * Checks that the configuration is usable by this client.
     *
     * <p>Auto commit must be disabled because this client commits offsets itself. The
     * setting is accepted both as {@code Boolean.FALSE} and as the string {@code "false"}
     * (case-insensitive, surrounding whitespace ignored), since consumer properties are
     * commonly supplied as strings.
     *
     * @param kafkaConsumerClientConfig the configuration to check
     * @throws IllegalArgumentException if no topics are configured, if the poll duration is
     *     not positive, or if {@code enable.auto.commit} is not explicitly set to false
     */
    private static void validateConfig(KafkaConsumerClientConfig<?, ?> kafkaConsumerClientConfig) {
        if (kafkaConsumerClientConfig.topics.isEmpty()) {
            throw new IllegalArgumentException("\"topics\" must contain at least 1 topic");
        }
        if (kafkaConsumerClientConfig.pollDurationMs <= 0) {
            throw new IllegalArgumentException("\"pollDurationMs\" must be larger than 0");
        }
        final Object autoCommit = kafkaConsumerClientConfig.properties.get("enable.auto.commit");
        final boolean autoCommitDisabled = Boolean.FALSE.equals(autoCommit)
                || (autoCommit instanceof String && "false".equalsIgnoreCase(((String) autoCommit).trim()));
        if (!autoCommitDisabled) {
            throw new IllegalArgumentException("enable.auto.commit must be false!");
        }
    }
}
