package no.nav.common.kafka.consumer.feilhandtering;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
import net.javacrumbs.shedlock.core.LockProvider;
import no.nav.common.kafka.consumer.ConsumeStatus;
import no.nav.common.kafka.consumer.feilhandtering.util.KafkaConsumerRecordProcessorBuilder;
import no.nav.common.kafka.utils.LocalH2Database;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:no/nav/common/kafka/consumer/feilhandtering/KafkaConsumerRecordProcessorIntegrationTest.class */
public class KafkaConsumerRecordProcessorIntegrationTest {
    private static final String TEST_TOPIC_A = "test-topic-a";
    private static final String TEST_TOPIC_B = "test-topic-b";
    private DataSource dataSource;
    private KafkaConsumerRepository consumerRepository;

    @Before
    public void setup() {
        this.dataSource = LocalH2Database.createDatabase(LocalH2Database.DatabaseType.POSTGRES);
        LocalH2Database.runScript(this.dataSource, "kafka-consumer-record-postgres.sql");
        this.consumerRepository = new PostgresConsumerRepository(this.dataSource);
    }

    @After
    public void cleanup() {
        LocalH2Database.cleanupConsumer(this.dataSource);
    }

    @Test
    public void should_consume_stored_records() throws InterruptedException {
        LockProvider lockProvider = lockConfiguration -> {
            return Optional.of(() -> {
            });
        };
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        KafkaConsumerRecordProcessor build = KafkaConsumerRecordProcessorBuilder.builder().withLockProvider(lockProvider).withKafkaConsumerRepository(this.consumerRepository).withRecordConsumers(Map.of(TEST_TOPIC_A, storedConsumerRecord -> {
            atomicInteger.incrementAndGet();
            return ConsumeStatus.OK;
        }, TEST_TOPIC_B, storedConsumerRecord2 -> {
            atomicInteger2.incrementAndGet();
            return ConsumeStatus.OK;
        })).build();
        this.consumerRepository.storeRecord(storedRecord(TEST_TOPIC_A, 1, 1L, "key1", "value"));
        this.consumerRepository.storeRecord(storedRecord(TEST_TOPIC_A, 2, 1L, "key2", "value"));
        this.consumerRepository.storeRecord(storedRecord(TEST_TOPIC_A, 1, 2L, "key1", "value"));
        this.consumerRepository.storeRecord(storedRecord(TEST_TOPIC_B, 1, 1L, "key1", "value"));
        this.consumerRepository.storeRecord(storedRecord(TEST_TOPIC_B, 1, 2L, "key2", "value"));
        build.start();
        Thread.sleep(1000L);
        build.close();
        Assert.assertEquals(3L, atomicInteger.get());
        Assert.assertEquals(2L, atomicInteger2.get());
        Assert.assertTrue(this.consumerRepository.getRecords(TEST_TOPIC_A, 1, 5).isEmpty());
        Assert.assertTrue(this.consumerRepository.getRecords(TEST_TOPIC_A, 2, 5).isEmpty());
        Assert.assertTrue(this.consumerRepository.getRecords(TEST_TOPIC_B, 1, 5).isEmpty());
    }

    @Test
    public void should_only_consume_from_topics_when_lock_is_acquired() throws InterruptedException {
        LockProvider lockProvider = lockConfiguration -> {
            return lockConfiguration.getName().contains(TEST_TOPIC_B) ? Optional.of(() -> {
            }) : Optional.empty();
        };
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        KafkaConsumerRecordProcessor build = KafkaConsumerRecordProcessorBuilder.builder().withLockProvider(lockProvider).withKafkaConsumerRepository(this.consumerRepository).withRecordConsumers(Map.of(TEST_TOPIC_A, storedConsumerRecord -> {
            atomicInteger.incrementAndGet();
            return ConsumeStatus.OK;
        }, TEST_TOPIC_B, storedConsumerRecord2 -> {
            atomicInteger2.incrementAndGet();
            return ConsumeStatus.OK;
        })).build();
        this.consumerRepository.storeRecord(storedRecord(TEST_TOPIC_A, 1, 1L, "key1", "value"));
        this.consumerRepository.storeRecord(storedRecord(TEST_TOPIC_A, 2, 1L, "key2", "value"));
        this.consumerRepository.storeRecord(storedRecord(TEST_TOPIC_A, 1, 2L, "key1", "value"));
        this.consumerRepository.storeRecord(storedRecord(TEST_TOPIC_B, 1, 1L, "key1", "value"));
        this.consumerRepository.storeRecord(storedRecord(TEST_TOPIC_B, 1, 2L, "key2", "value"));
        build.start();
        Thread.sleep(1000L);
        build.close();
        Assert.assertEquals(0L, atomicInteger.get());
        Assert.assertEquals(2L, atomicInteger2.get());
        Assert.assertEquals(2L, this.consumerRepository.getRecords(TEST_TOPIC_A, 1, 5).size());
        Assert.assertEquals(1L, this.consumerRepository.getRecords(TEST_TOPIC_A, 2, 5).size());
        Assert.assertTrue(this.consumerRepository.getRecords(TEST_TOPIC_B, 1, 5).isEmpty());
    }

    private StoredConsumerRecord storedRecord(String str, int i, long j, String str2, String str3) {
        return new StoredConsumerRecord(str, i, j, str2.getBytes(), str3.getBytes(), "[]", System.currentTimeMillis());
    }
}
