package no.nav.common.kafka.producer.feilhandtering;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
import no.nav.common.kafka.consumer.ConsumeStatus;
import no.nav.common.kafka.consumer.KafkaConsumerClient;
import no.nav.common.kafka.consumer.KafkaConsumerClientConfig;
import no.nav.common.kafka.producer.KafkaProducerClientImpl;
import no.nav.common.kafka.utils.LocalH2Database;
import no.nav.common.kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:no/nav/common/kafka/producer/feilhandtering/KafkaProducerRecordProcessorIntegrationTest.class */
/**
 * Integration test for {@link KafkaProducerRecordProcessor}.
 *
 * <p>Records are stored through a {@link PostgresProducerRepository} backed by a local H2
 * database created with {@code DatabaseType.POSTGRES}; the processor is then run against a
 * Kafka broker started by Testcontainers, and the test verifies that the records arrive on
 * their topics and are removed from the repository.
 */
public class KafkaProducerRecordProcessorIntegrationTest {
    private static final String TEST_TOPIC_A = "test-topic-a";
    private static final String TEST_TOPIC_B = "test-topic-b";

    /** Time the test sleeps while the processor / consumer work in the background. */
    private static final long BACKGROUND_WORK_WAIT_MS = 1000L;

    /** Kafka broker shared by every test in this class. */
    @ClassRule
    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse(TestUtils.KAFKA_IMAGE));
    private DataSource dataSource;
    private KafkaProducerRepository producerRepository;

    /**
     * Creates a fresh database + repository and issues delete/create requests for both test
     * topics so each test starts from empty topics.
     */
    @Before
    public void setup() {
        String bootstrapServers = kafka.getBootstrapServers();
        this.dataSource = LocalH2Database.createDatabase(LocalH2Database.DatabaseType.POSTGRES);
        LocalH2Database.init(this.dataSource, "kafka-producer-record-postgres.sql");
        this.producerRepository = new PostgresProducerRepository(this.dataSource);

        // try-with-resources guarantees the admin client is closed even if a request throws.
        try (AdminClient adminClient = KafkaAdminClient.create(Map.of("bootstrap.servers", bootstrapServers))) {
            // The returned futures are intentionally not awaited (same as before); the delete
            // result in particular may be a failure when the topics do not exist yet.
            adminClient.deleteTopics(List.of(TEST_TOPIC_A, TEST_TOPIC_B));
            adminClient.createTopics(List.of(
                    new NewTopic(TEST_TOPIC_A, 1, (short) 1),
                    new NewTopic(TEST_TOPIC_B, 1, (short) 1)));
        }
    }

    /** Cleans up the producer data in the database after each test. */
    @After
    public void cleanup() {
        LocalH2Database.cleanupProducer(this.dataSource);
    }

    /**
     * Stores three records for topic A and two for topic B, runs the processor, and then
     * consumes both topics to check that every stored record was published and that the
     * repository no longer returns any records.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    public void should_send_stored_records_to_kafka() throws InterruptedException {
        this.producerRepository.storeRecord(storedRecord(TEST_TOPIC_A, "value1", "key1"));
        this.producerRepository.storeRecord(storedRecord(TEST_TOPIC_A, "value2", "key2"));
        this.producerRepository.storeRecord(storedRecord(TEST_TOPIC_A, "value3", "key1"));
        this.producerRepository.storeRecord(storedRecord(TEST_TOPIC_B, "value1", "key1"));
        this.producerRepository.storeRecord(storedRecord(TEST_TOPIC_B, "value2", "key2"));

        // NOTE(review): the third constructor argument always supplies true; presumably a
        // "should this instance process" check - confirm against KafkaProducerRecordProcessor.
        KafkaProducerRecordProcessor recordProcessor = new KafkaProducerRecordProcessor(
                this.producerRepository,
                new KafkaProducerClientImpl(TestUtils.kafkaTestByteProducerProperties(kafka.getBootstrapServers())),
                () -> true);
        recordProcessor.start();
        try {
            Thread.sleep(BACKGROUND_WORK_WAIT_MS);
        } finally {
            // Always stop the processor, even when the sleep is interrupted.
            recordProcessor.close();
        }

        AtomicInteger topicARecordCount = new AtomicInteger();
        AtomicInteger topicBRecordCount = new AtomicInteger();
        KafkaConsumerClient consumerClient = new KafkaConsumerClient(new KafkaConsumerClientConfig(
                TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()),
                Map.of(TEST_TOPIC_A, record -> {
                    topicARecordCount.incrementAndGet();
                    return ConsumeStatus.OK;
                }, TEST_TOPIC_B, record -> {
                    topicBRecordCount.incrementAndGet();
                    return ConsumeStatus.OK;
                })));
        consumerClient.start();
        try {
            Thread.sleep(BACKGROUND_WORK_WAIT_MS);
        } finally {
            // Always stop the consumer so no polling continues after the test.
            consumerClient.stop();
        }

        Assert.assertEquals(3L, topicARecordCount.get());
        Assert.assertEquals(2L, topicBRecordCount.get());
        Assert.assertTrue(this.producerRepository.getRecords(10).isEmpty());
    }

    /**
     * Builds a {@link StoredProducerRecord} for {@code topic} with the two strings encoded as
     * UTF-8 bytes and an empty JSON array ({@code "[]"}) as the last constructor argument.
     *
     * <p>NOTE(review): call sites pass {@code "valueN"} as the second argument and
     * {@code "keyN"} as the third; which of them the constructor treats as key vs. value is
     * not visible here - confirm against {@code StoredProducerRecord}.
     */
    private StoredProducerRecord storedRecord(String topic, String secondArg, String thirdArg) {
        // Explicit charset: String.getBytes() without one depends on the platform default.
        return new StoredProducerRecord(
                topic,
                secondArg.getBytes(StandardCharsets.UTF_8),
                thirdArg.getBytes(StandardCharsets.UTF_8),
                "[]");
    }
}
