package no.nav.common.kafka.consumer;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import no.nav.common.kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:no/nav/common/kafka/consumer/KafkaConsumerClientTest.class */
public class KafkaConsumerClientTest {
    private static final String TEST_TOPIC_1 = "test-topic-a";
    private static final String TEST_TOPIC_2 = "test-topic-b";
    private KafkaProducer<String, String> producer;
    private KafkaConsumer<String, String> commitChecker;

    @ClassRule
    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse(TestUtils.KAFKA_IMAGE));

    @Before
    public void setup() {
        String bootstrapServers = kafka.getBootstrapServers();
        this.producer = new KafkaProducer<>(TestUtils.kafkaTestProducerProperties(bootstrapServers));
        this.commitChecker = new KafkaConsumer<>(TestUtils.kafkaTestConsumerProperties(bootstrapServers));
        AdminClient create = KafkaAdminClient.create(Map.of("bootstrap.servers", bootstrapServers));
        create.deleteTopics(List.of(TEST_TOPIC_1, TEST_TOPIC_2));
        create.createTopics(List.of(new NewTopic(TEST_TOPIC_1, 1, (short) 1), new NewTopic(TEST_TOPIC_2, 1, (short) 1)));
        create.close();
    }

    @After
    public void cleanup() {
        this.producer.close();
        this.commitChecker.close();
    }

    @Test
    public void should_consume_and_commit_offsets_and_start_consuming_again_on_excepted_offset() throws InterruptedException {
        KafkaConsumerClientConfig kafkaConsumerClientConfig = new KafkaConsumerClientConfig(TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()), Map.of(TEST_TOPIC_1, consumerWithCounter(new AtomicInteger(), 0L)));
        this.producer.send(new ProducerRecord(TEST_TOPIC_1, "key1", "value1"));
        this.producer.send(new ProducerRecord(TEST_TOPIC_1, "key1", "value2"));
        this.producer.send(new ProducerRecord(TEST_TOPIC_1, "key2", "value3"));
        this.producer.flush();
        KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient(kafkaConsumerClientConfig);
        kafkaConsumerClient.start();
        Thread.sleep(1000L);
        kafkaConsumerClient.stop();
        Assert.assertEquals(3L, getCommittedOffsets(TEST_TOPIC_1, 0).offset());
        Assert.assertEquals(3L, r0.get());
        this.producer.send(new ProducerRecord(TEST_TOPIC_1, "key1", "value4"));
        this.producer.send(new ProducerRecord(TEST_TOPIC_1, "key2", "value5"));
        this.producer.flush();
        kafkaConsumerClient.start();
        Thread.sleep(1000L);
        kafkaConsumerClient.stop();
        Assert.assertEquals(5L, getCommittedOffsets(TEST_TOPIC_1, 0).offset());
        Assert.assertEquals(5L, r0.get());
    }

    @Test
    public void should_commit_consumed_tasks_when_closed_gracefully() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        for (int i = 0; i < 15; i++) {
            this.producer.send(new ProducerRecord(TEST_TOPIC_1, "key1", "value" + i));
        }
        this.producer.flush();
        KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient(new KafkaConsumerClientConfig(TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()), Map.of(TEST_TOPIC_1, consumerWithCounter(atomicInteger, 100L)), 10L));
        kafkaConsumerClient.start();
        Thread.sleep(1000L);
        kafkaConsumerClient.stop();
        OffsetAndMetadata committedOffsets = getCommittedOffsets(TEST_TOPIC_1, 0);
        Assert.assertTrue(committedOffsets.offset() > 4);
        Assert.assertTrue(committedOffsets.offset() < 12);
        Assert.assertEquals(atomicInteger.get(), committedOffsets.offset());
    }

    @Test
    public void should_consume_from_several_partitions() throws InterruptedException {
        AdminClient create = KafkaAdminClient.create(Map.of("bootstrap.servers", kafka.getBootstrapServers()));
        create.createTopics(List.of(new NewTopic("multi-partition-topic", 2, (short) 1)));
        create.close();
        KafkaConsumerClientConfig kafkaConsumerClientConfig = new KafkaConsumerClientConfig(TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()), Map.of("multi-partition-topic", consumerWithCounter(new AtomicInteger(), 0L)));
        this.producer.send(new ProducerRecord("multi-partition-topic", 0, "key1", "value1"));
        this.producer.send(new ProducerRecord("multi-partition-topic", 0, "key1", "value2"));
        this.producer.send(new ProducerRecord("multi-partition-topic", 0, "key1", "value3"));
        this.producer.send(new ProducerRecord("multi-partition-topic", 1, "key2", "value1"));
        this.producer.send(new ProducerRecord("multi-partition-topic", 1, "key2", "value2"));
        this.producer.send(new ProducerRecord("multi-partition-topic", 1, "key2", "value3"));
        this.producer.send(new ProducerRecord("multi-partition-topic", 1, "key2", "value4"));
        this.producer.flush();
        KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient(kafkaConsumerClientConfig);
        kafkaConsumerClient.start();
        Thread.sleep(1000L);
        kafkaConsumerClient.stop();
        OffsetAndMetadata committedOffsets = getCommittedOffsets("multi-partition-topic", 0);
        OffsetAndMetadata committedOffsets2 = getCommittedOffsets("multi-partition-topic", 1);
        Assert.assertEquals(7L, r0.get());
        Assert.assertEquals(3L, committedOffsets.offset());
        Assert.assertEquals(4L, committedOffsets2.offset());
    }

    @Test
    public void should_stop_consumption_of_topic_on_failure() throws InterruptedException {
        KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient(new KafkaConsumerClientConfig(TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()), Map.of(TEST_TOPIC_1, consumerWithCounter(new AtomicInteger(), 0L), TEST_TOPIC_2, failOnCountConsumer(new AtomicInteger(), 3))));
        kafkaConsumerClient.start();
        for (int i = 0; i < 5; i++) {
            this.producer.send(new ProducerRecord(TEST_TOPIC_1, "key1", "value" + i));
            this.producer.send(new ProducerRecord(TEST_TOPIC_2, "key1", "value" + i));
        }
        this.producer.flush();
        Thread.sleep(1000L);
        kafkaConsumerClient.stop();
        OffsetAndMetadata committedOffsets = getCommittedOffsets(TEST_TOPIC_1, 0);
        OffsetAndMetadata committedOffsets2 = getCommittedOffsets(TEST_TOPIC_2, 0);
        Assert.assertEquals(5L, r0.get());
        Assert.assertEquals(5L, committedOffsets.offset());
        Assert.assertEquals(3L, r0.get());
        Assert.assertEquals(2L, committedOffsets2.offset());
    }

    @Test
    public void should_commit_on_both_clients() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        KafkaConsumerClientConfig kafkaConsumerClientConfig = new KafkaConsumerClientConfig(TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()), Map.of(TEST_TOPIC_1, consumerWithCounter(atomicInteger, 100L)));
        KafkaConsumerClientConfig kafkaConsumerClientConfig2 = new KafkaConsumerClientConfig(TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()), Map.of(TEST_TOPIC_1, consumerWithCounter(atomicInteger2, 100L)));
        KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient(kafkaConsumerClientConfig);
        KafkaConsumerClient kafkaConsumerClient2 = new KafkaConsumerClient(kafkaConsumerClientConfig2);
        for (int i = 0; i < 100; i++) {
            this.producer.send(new ProducerRecord(TEST_TOPIC_1, "key1", "value" + i));
        }
        this.producer.flush();
        kafkaConsumerClient.start();
        Thread.sleep(1000L);
        kafkaConsumerClient2.start();
        Thread.sleep(1000L);
        kafkaConsumerClient.stop();
        Thread.sleep(1000L);
        kafkaConsumerClient2.stop();
        Assert.assertTrue(getCommittedOffsets(TEST_TOPIC_1, 0).offset() > 10);
        Assert.assertTrue(atomicInteger.get() > 5);
        Assert.assertTrue(atomicInteger2.get() > 5);
    }

    @Test
    public void should_consume_on_1_thread_pr_partition() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        KafkaConsumerClientConfig kafkaConsumerClientConfig = new KafkaConsumerClientConfig(TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()), Map.of(TEST_TOPIC_1, consumerWithCounter(atomicInteger, 100L), TEST_TOPIC_2, consumerWithCounter(atomicInteger2, 100L)), 10L);
        for (int i = 0; i < 5; i++) {
            this.producer.send(new ProducerRecord(TEST_TOPIC_1, "key1", "value" + i));
            this.producer.send(new ProducerRecord(TEST_TOPIC_2, "key1", "value" + i));
        }
        this.producer.flush();
        KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient(kafkaConsumerClientConfig);
        kafkaConsumerClient.start();
        Thread.sleep(750L);
        Assert.assertTrue(8 < atomicInteger.get() + atomicInteger2.get());
        kafkaConsumerClient.stop();
    }

    private OffsetAndMetadata getCommittedOffsets(String str, int i) {
        TopicPartition topicPartition = new TopicPartition(str, i);
        return (OffsetAndMetadata) this.commitChecker.committed(Set.of(topicPartition), Duration.ofSeconds(1L)).get(topicPartition);
    }

    private TopicConsumer<String, String> consumerWithCounter(AtomicInteger atomicInteger, long j) {
        return consumerRecord -> {
            try {
                Thread.sleep(j);
                atomicInteger.incrementAndGet();
            } catch (Exception e) {
            }
            return ConsumeStatus.OK;
        };
    }

    private TopicConsumer<String, String> failOnCountConsumer(AtomicInteger atomicInteger, int i) {
        return consumerRecord -> {
            return atomicInteger.incrementAndGet() == i ? ConsumeStatus.FAILED : ConsumeStatus.OK;
        };
    }
}
