package no.nav.common.kafka.producer.feilhandtering;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import no.nav.common.job.leader_election.LeaderElectionClient;
import no.nav.common.kafka.producer.KafkaProducerClient;
import no.nav.common.kafka.producer.util.ProducerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:no/nav/common/kafka/producer/feilhandtering/KafkaProducerRecordProcessor.class */
/**
 * Background processor that re-publishes producer records stored in a
 * {@link KafkaProducerRepository}.
 *
 * <p>A single worker thread repeatedly checks {@link LeaderElectionClient#isLeader()}; while this
 * instance is the leader it fetches stored records in batches, sends them through the
 * {@link KafkaProducerClient} and deletes the ones the producer callback reported as successful.
 * Records whose send failed stay in the repository and are picked up again by a later batch.
 *
 * <p>The processor is single-use: once {@link #close()} has been called, {@link #start()} throws.
 * The producer client is closed by the worker thread when the processing loop exits.
 */
public class KafkaProducerRecordProcessor {
    /** Pause after an iteration that failed with an exception. */
    private static final long ERROR_TIMEOUT_MS = 5000;
    /** Pause when the last fetch returned fewer records than a full batch. */
    private static final long POLL_TIMEOUT_MS = 3000;
    /** Pause between leadership checks while this instance is not the leader. */
    private static final long WAITING_FOR_LEADER_TIMEOUT_MS = 10000;
    /** Maximum number of stored records fetched and published per iteration. */
    private static final int RECORDS_BATCH_SIZE = 100;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final KafkaProducerRepository producerRepository;
    private final KafkaProducerClient<byte[], byte[]> producerClient;
    private final LeaderElectionClient leaderElectionClient;
    /** Set by {@link #start()}, cleared by {@link #close()}; the worker loop runs while it is true. */
    private volatile boolean isRunning;
    /** Set once by {@link #close()}; a closed processor cannot be started again. */
    private volatile boolean isClosed;

    /**
     * Creates the processor and registers a JVM shutdown hook that calls {@link #close()}.
     *
     * @param kafkaProducerRepository repository the stored records are read from and deleted from
     * @param kafkaProducerClient     client used to publish the records; closed when the loop exits
     * @param leaderElectionClient    decides whether this instance is allowed to process records
     */
    public KafkaProducerRecordProcessor(KafkaProducerRepository kafkaProducerRepository, KafkaProducerClient<byte[], byte[]> kafkaProducerClient, LeaderElectionClient leaderElectionClient) {
        this.producerRepository = kafkaProducerRepository;
        this.producerClient = kafkaProducerClient;
        this.leaderElectionClient = leaderElectionClient;
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    /**
     * Starts the background processing loop. Calling it again after a successful start is a no-op.
     *
     * @throws IllegalStateException if the processor has already been closed
     */
    public synchronized void start() {
        if (this.isClosed) {
            throw new IllegalStateException("Cannot start closed producer record processor");
        }
        if (this.isRunning) {
            return;
        }
        // The flag is raised here, under the lock, rather than on the worker thread: otherwise a
        // second start() could queue a second loop, and a close() arriving before the worker
        // begins would be overwritten when the worker set the flag back to true.
        this.isRunning = true;
        this.executorService.submit(this::recordHandlerLoop);
    }

    /**
     * Signals the processing loop to stop and releases the worker thread.
     *
     * <p>The loop notices the signal the next time it evaluates its condition, i.e. after the
     * current batch or sleep has finished; it then closes the producer client. Safe to call more
     * than once.
     */
    public synchronized void close() {
        this.isRunning = false;
        this.isClosed = true;
        // Lets an already submitted loop finish, then allows the worker thread to terminate.
        this.executorService.shutdown();
    }

    /**
     * Worker-thread body: processes batches until stopped, then closes the producer client.
     *
     * <p>An exception inside one iteration is logged and followed by a pause, after which the loop
     * continues. An interrupt, or an exception thrown by the error handling itself, ends the loop.
     * The running flag is not cleared on such an abnormal exit, so a later {@link #start()}
     * remains a no-op instead of restarting with an already closed producer client.
     */
    private void recordHandlerLoop() {
        try {
            while (this.isRunning) {
                try {
                    processNextBatch();
                } catch (InterruptedException e) {
                    // Must not be treated as an ordinary processing failure; handled below.
                    throw e;
                } catch (Exception e) {
                    this.log.error("Failed to process kafka producer records", e);
                    Thread.sleep(ERROR_TIMEOUT_MS);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status for whoever owns the thread and stop processing.
            Thread.currentThread().interrupt();
            this.log.warn("Kafka producer record processor was interrupted", e);
        } catch (Exception e) {
            this.log.error("Unexpected exception caught in record handler loop", e);
        } finally {
            // Close the client exactly once, when the loop is over; closing it inside the loop
            // would make every following send hit a closed producer.
            this.log.info("Closing kafka producer record processor...");
            this.producerClient.close();
        }
    }

    /**
     * Runs a single loop iteration: publishes one batch when this instance is the leader,
     * otherwise waits before the next leadership check.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping or publishing
     */
    private void processNextBatch() throws InterruptedException {
        if (!this.leaderElectionClient.isLeader()) {
            Thread.sleep(WAITING_FOR_LEADER_TIMEOUT_MS);
            return;
        }
        List<StoredProducerRecord> records = this.producerRepository.getRecords(RECORDS_BATCH_SIZE);
        if (!records.isEmpty()) {
            publishStoredRecordsBatch(records);
        }
        // A full batch suggests more records are waiting, so only pause after a partial one.
        if (records.size() < RECORDS_BATCH_SIZE) {
            Thread.sleep(POLL_TIMEOUT_MS);
        }
    }

    /**
     * Sends every record of the batch, waits until all send callbacks have run and deletes the
     * records that were published successfully.
     *
     * @param list stored records to publish
     * @throws InterruptedException if the thread is interrupted while waiting for the callbacks
     */
    private void publishStoredRecordsBatch(List<StoredProducerRecord> list) throws InterruptedException {
        ConcurrentLinkedQueue<Long> publishedIds = new ConcurrentLinkedQueue<>();
        CountDownLatch pendingCallbacks = new CountDownLatch(list.size());
        list.forEach(storedRecord -> {
            this.producerClient.send(ProducerUtils.mapFromStoredRecord(storedRecord), (recordMetadata, exc) -> {
                try {
                    if (exc != null) {
                        this.log.warn(String.format("Failed to resend failed message to topic %s", storedRecord.getTopic()), exc);
                    } else {
                        publishedIds.add(Long.valueOf(storedRecord.getId()));
                    }
                } finally {
                    // Count down last, so the waiting thread never builds the delete list before
                    // this record's id has been added, and always count down even if the
                    // bookkeeping above throws.
                    pendingCallbacks.countDown();
                }
            });
        });
        this.producerClient.getProducer().flush();
        pendingCallbacks.await();
        // Nothing to delete when every send failed; skip the repository call.
        if (!publishedIds.isEmpty()) {
            this.producerRepository.deleteRecords(new ArrayList<>(publishedIds));
        }
    }
}
