package org.springframework.cloud.stream.binder.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.mockito.BDDMockito;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.boot.actuate.metrics.Metric;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.class */
/**
 * Unit tests for the consumer-lag metrics reported by {@link KafkaBinderMetrics}.
 *
 * <p>The Kafka consumer, its factory and the binder are Mockito mocks. Each test registers topics in
 * {@link #topicsInUse}, the map the mocked binder returns from {@code getTopicsInUse()}, and then
 * inspects the collection returned by {@code metrics()}.
 */
public class KafkaBinderMetricsTest {
    private static final String TEST_TOPIC = "test";
    private static final String TEST_GROUP = "group";

    /** Name expected for the single lag metric of {@link #TEST_GROUP} on {@link #TEST_TOPIC}. */
    private static final String EXPECTED_LAG_METRIC_NAME =
            String.format("%s.%s.%s.lag", "spring.cloud.stream.binder.kafka", TEST_GROUP, TEST_TOPIC);

    private KafkaBinderMetrics metrics;

    // NOTE(review): the consumer factory and consumer mocks are left raw. Their key/value types are
    // not exercised by these tests, and the signatures they are passed to are not visible here, so
    // adding type arguments could break the BDDMockito stubbing calls.
    @Mock
    private DefaultKafkaConsumerFactory consumerFactory;

    @Mock
    private KafkaConsumer consumer;

    @Mock
    private KafkaMessageChannelBinder binder;

    /** Backing map returned by the mocked binder; tests mutate it to declare the topics in use. */
    private final Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();

    @Mock
    private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties;

    /**
     * Wires the mocks together and creates the object under test.
     *
     * <p>By default the consumer reports an end offset of 1000 for partition 0 of
     * {@link #TEST_TOPIC}; individual tests may override that stub.
     */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        // The factory and binder are stubbed before the metrics object is constructed from them.
        BDDMockito.given(this.consumerFactory.createConsumer()).willReturn(this.consumer);
        BDDMockito.given(this.binder.getTopicsInUse()).willReturn(this.topicsInUse);
        this.metrics = new KafkaBinderMetrics(this.binder, this.kafkaBinderConfigurationProperties, this.consumerFactory);
        BDDMockito.given(this.consumer.endOffsets(Matchers.anyCollectionOf(TopicPartition.class)))
                .willReturn(Collections.singletonMap(new TopicPartition(TEST_TOPIC, 0), 1000L));
    }

    /** End offset 1000 and committed offset 500 on a single partition must yield a lag of 500. */
    @Test
    public void shouldIndicateLag() {
        stubCommittedOffset(500L);
        registerConsumedTopic(partitions(new Node(0, null, 0)));

        assertSingleLagMetric(this.metrics.metrics(), 500L);
    }

    /** Two partitions, each 500 behind, must be reported as one metric with a lag of 1000. */
    @Test
    public void shouldSumUpPartitionsLags() {
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(new TopicPartition(TEST_TOPIC, 0), 1000L);
        endOffsets.put(new TopicPartition(TEST_TOPIC, 1), 1000L);
        BDDMockito.given(this.consumer.endOffsets(Matchers.anyCollectionOf(TopicPartition.class))).willReturn(endOffsets);
        stubCommittedOffset(500L);
        registerConsumedTopic(partitions(new Node(0, null, 0), new Node(0, null, 0)));

        assertSingleLagMetric(this.metrics.metrics(), 1000L);
    }

    /**
     * Without a committed offset the whole end offset (1000) must be reported as lag.
     *
     * <p>{@code committed(...)} is deliberately not stubbed here, so the consumer mock answers with
     * Mockito's default return value for it.
     */
    @Test
    public void shouldIndicateFullLagForNotCommittedGroups() {
        registerConsumedTopic(partitions(new Node(0, null, 0)));

        assertSingleLagMetric(this.metrics.metrics(), 1000L);
    }

    /** A topic registered with a {@code null} group must not produce any metric. */
    @Test
    public void shouldNotCalculateLagForProducerTopics() {
        // A typed local keeps the null argument unambiguous and names what is being left out.
        String noConsumerGroup = null;
        this.topicsInUse.put(TEST_TOPIC,
                new KafkaMessageChannelBinder.TopicInformation(noConsumerGroup, partitions(new Node(0, null, 0))));

        Assertions.assertThat(this.metrics.metrics()).isEmpty();
    }

    /**
     * Stubs the consumer mock to report the given committed offset for any partition.
     *
     * @param offset the committed offset to return
     */
    private void stubCommittedOffset(long offset) {
        BDDMockito.given(this.consumer.committed(Matchers.any(TopicPartition.class)))
                .willReturn(new OffsetAndMetadata(offset));
    }

    /**
     * Registers {@link #TEST_TOPIC} as used by {@link #TEST_GROUP} and makes the consumer mock return
     * the same partitions from {@code partitionsFor}.
     *
     * @param partitions the partitions of the test topic
     */
    private void registerConsumedTopic(List<PartitionInfo> partitions) {
        this.topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation(TEST_GROUP, partitions));
        BDDMockito.given(this.consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
    }

    /**
     * Asserts that exactly one metric was collected and that it is the lag metric of the test group
     * and topic with the expected value.
     *
     * @param collected the collection returned by {@code metrics()}
     * @param expectedLag the lag value the single metric must carry
     */
    private static void assertSingleLagMetric(Collection<?> collected, long expectedLag) {
        Assertions.assertThat(collected).hasSize(1);
        Metric<?> lagMetric = (Metric<?>) collected.iterator().next();
        Assertions.assertThat(lagMetric.getName()).isEqualTo(EXPECTED_LAG_METRIC_NAME);
        Number reportedLag = lagMetric.getValue();
        Assertions.assertThat(reportedLag).isEqualTo(expectedLag);
    }

    /**
     * Builds one {@link PartitionInfo} of {@link #TEST_TOPIC} per given node.
     *
     * @param leaders the node passed as third constructor argument of each partition; the array index
     *     becomes the partition number
     * @return a mutable list with {@code leaders.length} partitions, numbered from 0
     */
    private List<PartitionInfo> partitions(Node... leaders) {
        List<PartitionInfo> result = new ArrayList<>(leaders.length);
        for (int partition = 0; partition < leaders.length; partition++) {
            // The two trailing node arrays are not needed by these tests and are left null.
            result.add(new PartitionInfo(TEST_TOPIC, partition, leaders[partition], null, null));
        }
        return result;
    }
}
