package no.nav.common.kafka.producer.util;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import no.nav.common.kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:no/nav/common/kafka/producer/util/KafkaProducerWithClientMetricsTest.class */
/**
 * Integration tests for the Micrometer metrics published by {@link KafkaProducerClientWithMetrics}.
 *
 * <p>Each test runs against the Kafka broker started by the {@link #kafka} class rule and uses its
 * own {@link SimpleMeterRegistry}, so meters registered by one test are not visible to another.
 *
 * <p>NOTE(review): {@code KafkaProducerClientWithMetrics} is referenced without type arguments
 * because its declaration is not visible from this file; parameterize it here if it is generic.
 */
public class KafkaProducerWithClientMetricsTest {
    private static final String TEST_TOPIC_A = "test-topic-a";
    private static final String TEST_TOPIC_B = "test-topic-b";

    /** Kafka broker shared by every test in this class. */
    @ClassRule
    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse(TestUtils.KAFKA_IMAGE));

    /**
     * Requests deletion and re-creation of both test topics (one partition, replication factor 1)
     * before each test.
     *
     * <p>The results of the admin requests are not inspected; the admin client is closed by
     * try-with-resources, also when one of the requests throws.
     */
    @Before
    public void setup() {
        try (AdminClient admin = KafkaAdminClient.create(Map.of("bootstrap.servers", kafka.getBootstrapServers()))) {
            admin.deleteTopics(List.of(TEST_TOPIC_A, TEST_TOPIC_B));
            admin.createTopics(List.of(
                    new NewTopic(TEST_TOPIC_A, 1, (short) 1),
                    new NewTopic(TEST_TOPIC_B, 1, (short) 1)));
        }
    }

    /**
     * Sends three records to topic A and two to topic B, then expects the {@code ok} status
     * counter of each topic to equal the number of records sent to it.
     *
     * @throws InterruptedException if the thread is interrupted while waiting before the assertions
     */
    @Test
    public void should_register_status_counter_ok_metric() throws InterruptedException {
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        KafkaProducerClientWithMetrics producerClient = newProducerClient(registry);
        try {
            sendValues(producerClient, TEST_TOPIC_A, 3);
            sendValues(producerClient, TEST_TOPIC_B, 2);

            // Wait before reading the counters; presumably they are updated asynchronously
            // once the broker acknowledges each record.
            Thread.sleep(1000L);

            Assert.assertEquals(3.0d, getStatusCount(registry, TEST_TOPIC_A, false), 0.0d);
            Assert.assertEquals(2.0d, getStatusCount(registry, TEST_TOPIC_B, false), 0.0d);
        } finally {
            // Release the producer even when an assertion fails.
            producerClient.close();
        }
    }

    /**
     * Closes the producer before sending, then expects every attempted send to be counted under
     * the {@code failed} status of its topic.
     */
    @Test
    public void should_register_status_counter_failed_metric() {
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        KafkaProducerClientWithMetrics producerClient = newProducerClient(registry);

        // Closing first is deliberate: the sends below are the failure scenario under test.
        producerClient.close();

        sendValues(producerClient, TEST_TOPIC_A, 3);
        sendValues(producerClient, TEST_TOPIC_B, 2);

        Assert.assertEquals(3.0d, getStatusCount(registry, TEST_TOPIC_A, true), 0.0d);
        Assert.assertEquals(2.0d, getStatusCount(registry, TEST_TOPIC_B, true), 0.0d);
    }

    /**
     * Sends three records to topic A and two to topic B, closes the producer, and expects the
     * {@code kafka.producer.current-offset} gauges to report 2 and 1 (presumably the zero-based
     * offset of the last record written to each topic).
     */
    @Test
    public void should_register_current_offset_metric() {
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        KafkaProducerClientWithMetrics producerClient = newProducerClient(registry);
        try {
            sendValues(producerClient, TEST_TOPIC_A, 3);
            sendValues(producerClient, TEST_TOPIC_B, 2);
        } finally {
            // Closed before the gauges are read, and also when a send throws.
            producerClient.close();
        }

        // Sort the gauge values so the assertion does not depend on the order in which the
        // registry happens to return its meters.
        double[] offsets = registry.get("kafka.producer.current-offset").gauges().stream()
                .mapToDouble(Gauge::value)
                .sorted()
                .toArray();

        Assert.assertArrayEquals(new double[] {1.0d, 2.0d}, offsets, 0.0d);
    }

    /**
     * Creates a producer client that targets the test broker and reports to the given registry.
     *
     * @param registry the meter registry the client should publish its metrics to
     * @return a new, open producer client
     */
    private static KafkaProducerClientWithMetrics newProducerClient(SimpleMeterRegistry registry) {
        return new KafkaProducerClientWithMetrics(
                TestUtils.kafkaTestProducerProperties(kafka.getBootstrapServers()), registry);
    }

    /**
     * Sends {@code count} key-less records with the payload {@code "value"} to a topic.
     *
     * @param producerClient the client to send with
     * @param topic the destination topic
     * @param count how many records to send
     */
    private static void sendValues(KafkaProducerClientWithMetrics producerClient, String topic, int count) {
        for (int i = 0; i < count; i++) {
            producerClient.send(new ProducerRecord<>(topic, "value"));
        }
    }

    /**
     * Reads the {@code kafka.producer.status} counter for a topic and status.
     *
     * @param meterRegistry the registry to read the counter from
     * @param topic the value of the {@code topic} tag
     * @param failed {@code true} to read the {@code failed} status, {@code false} for {@code ok}
     * @return the current count of the matching counter
     */
    private double getStatusCount(MeterRegistry meterRegistry, String topic, boolean failed) {
        String status = failed ? "failed" : "ok";
        return meterRegistry.counter("kafka.producer.status", "topic", topic, "status", status).count();
    }
}
