package no.nav.common.kafka.producer;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import no.nav.common.kafka.consumer.ConsumeStatus;
import no.nav.common.kafka.consumer.KafkaConsumerClient;
import no.nav.common.kafka.consumer.KafkaConsumerClientConfig;
import no.nav.common.kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:no/nav/common/kafka/producer/KafkaProducerClientImplIntegrationTest.class */
/**
 * Integration tests for {@link KafkaProducerClientImpl} running against a Kafka broker started
 * through Testcontainers.
 *
 * <p>The container is (re)started in {@link #setup()} and stopped in {@link #cleanup()} for every
 * test, because some tests stop the broker themselves to provoke send failures.
 */
public class KafkaProducerClientImplIntegrationTest {
    private static final String TEST_TOPIC = "test-topic";

    /** Upper bound on how long a test waits for an asynchronous outcome before asserting. */
    private static final long AWAIT_TIMEOUT_MS = 10_000L;

    /** Pause between two checks of an awaited condition. */
    private static final long POLL_INTERVAL_MS = 50L;

    private KafkaProducerClientImpl<String, String> producerClient;

    @ClassRule
    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse(TestUtils.KAFKA_IMAGE));

    /**
     * Starts the broker, creates the producer under test and (re)creates the test topic with one
     * partition and replication factor one.
     */
    @Before
    public void setup() {
        kafka.start();
        String bootstrapServers = kafka.getBootstrapServers();
        this.producerClient = new KafkaProducerClientImpl<>(TestUtils.kafkaTestProducerProperties(bootstrapServers));
        // try-with-resources guarantees the admin client is closed even if a topic request throws.
        try (AdminClient adminClient = KafkaAdminClient.create(Map.of("bootstrap.servers", bootstrapServers))) {
            // The returned futures are intentionally not awaited, exactly as before: deleting a
            // topic that does not exist yet would otherwise fail the setup.
            adminClient.deleteTopics(List.of(TEST_TOPIC));
            adminClient.createTopics(List.of(new NewTopic(TEST_TOPIC, 1, (short) 1)));
        }
    }

    /** Closes the producer (if it was created) and always stops the broker. */
    @After
    public void cleanup() {
        try {
            // JUnit runs @After even when @Before failed, in which case the producer may be null.
            if (this.producerClient != null) {
                this.producerClient.close();
            }
        } finally {
            kafka.stop();
        }
    }

    /** A record sent asynchronously must reach a consumer with topic, key and value intact. */
    @Test
    public void should_produce_record() throws InterruptedException {
        AtomicReference<ConsumerRecord<?, ?>> received = new AtomicReference<>();
        KafkaConsumerClient consumerClient = new KafkaConsumerClient(new KafkaConsumerClientConfig(TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()), Map.of(TEST_TOPIC, consumerRecord -> {
            received.set(consumerRecord);
            return ConsumeStatus.OK;
        })));
        this.producerClient.send(testRecord());
        consumerClient.start();
        try {
            awaitCondition(() -> received.get() != null);
        } finally {
            consumerClient.stop();
        }
        ConsumerRecord<?, ?> record = received.get();
        Assert.assertNotNull("No record was consumed within " + AWAIT_TIMEOUT_MS + " ms", record);
        Assert.assertEquals(TEST_TOPIC, record.topic());
        Assert.assertEquals("key", record.key());
        Assert.assertEquals("value", record.value());
    }

    /** Three records sent asynchronously must all be consumed. */
    @Test
    public void should_produce_multiple_records() throws InterruptedException {
        AtomicInteger consumedCount = new AtomicInteger();
        KafkaConsumerClient consumerClient = new KafkaConsumerClient(new KafkaConsumerClientConfig(TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()), Map.of(TEST_TOPIC, consumerRecord -> {
            consumedCount.incrementAndGet();
            return ConsumeStatus.OK;
        })));
        this.producerClient.send(testRecord());
        this.producerClient.send(testRecord());
        this.producerClient.send(testRecord());
        consumerClient.start();
        try {
            awaitCondition(() -> consumedCount.get() >= 3);
        } finally {
            consumerClient.stop();
        }
        Assert.assertEquals(3L, consumedCount.get());
    }

    /** A record sent with {@code sendSync} must be consumed by an already running consumer. */
    @Test
    public void should_produce_record_sync() throws InterruptedException {
        AtomicInteger consumedCount = new AtomicInteger();
        KafkaConsumerClient consumerClient = new KafkaConsumerClient(new KafkaConsumerClientConfig(TestUtils.kafkaTestConsumerProperties(kafka.getBootstrapServers()), Map.of(TEST_TOPIC, consumerRecord -> {
            consumedCount.incrementAndGet();
            return ConsumeStatus.OK;
        })));
        consumerClient.start();
        try {
            // Inside the try so the consumer is stopped even when the synchronous send throws.
            this.producerClient.sendSync(testRecord());
            awaitCondition(() -> consumedCount.get() >= 1);
        } finally {
            consumerClient.stop();
        }
        Assert.assertEquals(1L, consumedCount.get());
    }

    /** With the broker stopped, {@code sendSync} must fail with an {@link ExecutionException}. */
    @Test
    public void should_throw_exception_when_fail_to_send_sync() {
        kafka.stop();
        Assert.assertThrows(ExecutionException.class, () -> this.producerClient.sendSync(testRecord()));
    }

    /** With the broker stopped, the send callback must receive a {@link TimeoutException}. */
    @Test
    public void should_execute_callback_with_exception_when_fail_to_send() throws InterruptedException {
        kafka.stop();
        AtomicReference<Exception> failure = new AtomicReference<>();
        this.producerClient.send(testRecord(), (recordMetadata, exc) -> failure.set(exc));
        awaitCondition(() -> failure.get() != null);
        Exception observed = failure.get();
        Assert.assertNotNull("Callback did not report an exception within " + AWAIT_TIMEOUT_MS + " ms", observed);
        Assert.assertEquals(TimeoutException.class, observed.getClass());
    }

    /** Sending on a closed producer must invoke the callback with an exception for every send. */
    @Test
    public void should_execute_callback_with_exception_when_sending_after_closed() {
        AtomicInteger failureCount = new AtomicInteger();
        Callback countFailures = (recordMetadata, exc) -> {
            if (exc != null) {
                failureCount.incrementAndGet();
            }
        };
        this.producerClient.close();
        this.producerClient.send(testRecord(), countFailures);
        this.producerClient.send(testRecord(), countFailures);
        // No waiting here: the assertion expects both callbacks to have run by the time send returns.
        Assert.assertEquals(2L, failureCount.get());
    }

    /** {@code sendSync} on a closed producer must fail with an {@link ExecutionException}. */
    @Test
    public void should_throw_exception_when_sending_after_closed() {
        this.producerClient.close();
        Assert.assertThrows(ExecutionException.class, () -> this.producerClient.sendSync(testRecord()));
    }

    /** Builds the record every test sends: topic {@code test-topic}, key {@code "key"}, value {@code "value"}. */
    private static ProducerRecord<String, String> testRecord() {
        return new ProducerRecord<>(TEST_TOPIC, "key", "value");
    }

    /**
     * Polls {@code condition} until it holds or {@link #AWAIT_TIMEOUT_MS} has elapsed.
     *
     * <p>Returns silently on timeout; callers assert on the observed state afterwards so that a
     * timeout surfaces as a regular assertion failure.
     *
     * @param condition check evaluated repeatedly on the calling thread
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void awaitCondition(BooleanSupplier condition) throws InterruptedException {
        long deadlineNanos = System.nanoTime() + AWAIT_TIMEOUT_MS * 1_000_000L;
        while (!condition.getAsBoolean() && System.nanoTime() - deadlineNanos < 0) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }
}
