package io.confluent.connect.replicator.metrics;

import io.confluent.connect.replicator.ReplicatorSourceTaskConfig;
import io.confluent.connect.replicator.exec.ExecutableConfigProviderTest;
import io.confluent.connect.replicator.metrics.ConfluentReplicatorMetrics;
import io.confluent.connect.replicator.util.MockTime;
import io.confluent.connect.replicator.util.Utils;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/connect/replicator/metrics/ConfluentReplicatorTaskMetricsGroupTest.class */
/**
 * Unit tests for {@link ConfluentReplicatorTaskMetricsGroup}.
 *
 * <p>Every test runs against a freshly built metrics group covering {@link #PARTITION_COUNT}
 * partitions of {@link #TOPIC}. End offsets are served by a {@link MockConsumer} through a
 * {@link FetchEndOffsetService} whose scheduler is an EasyMock mock, so end offsets only change
 * when a test calls {@code updateEndOffsets()} explicitly.
 */
public class ConfluentReplicatorTaskMetricsGroupTest {
    /** Number of source partitions (and therefore metric groups) exercised by each test. */
    private static final int PARTITION_COUNT = 500;

    private static final String TASK_ID = "replicator-0";
    private static final String REPLICATOR_NAME = "my-replicator";
    private static final String SOURCE_CLUSTER_ID = "source-cluster";
    private static final String DEST_CLUSTER_ID = "dest-cluster";
    private static final String TOPIC = "test-replicator-task-metrics-group";
    /** {@link #TOPIC} after applying the "${topic}.replica" rename format configured in setup. */
    private static final String DEST_TOPIC = "test-replicator-task-metrics-group.replica";

    private static final long INITIAL_END_OFFSET = 100L;
    private static final long INITIAL_CONNECT_OFFSET = 50L;
    private static final long INITIAL_TIMESTAMP = 100L;

    /** Shared generator for the randomized offsets/timestamps used by the "updated data" test. */
    private final Random random = new Random();

    private ConcurrentMap<TopicPartition, Long> endOffsetMap;
    private Map<SourceRecord, RecordMetadata> records;
    private MockConsumer<byte[], byte[]> endOffsetConsumer;
    private ConfluentReplicatorTaskMetricsGroup metricGroup;
    private FetchEndOffsetService fetchEndOffsets;
    private Map<TopicPartition, ConfluentReplicatorMetrics.ReplicatorMetricGroup> topicPartitionReplicatorGroupMap;

    /**
     * Builds the task configuration, seeds the mock consumer with the initial end offsets and
     * creates the metrics group under test with its per-partition metrics registered.
     */
    @Before
    public void setup() {
        Map<String, String> props = new HashMap<>();
        props.put(ExecutableConfigProviderTest.SRC_KAFKA_BOOTSTRAP_SERVERS, "foo:9092");
        props.put(ExecutableConfigProviderTest.DEST_KAFKA_BOOTSTRAP_SERVERS, "bar:9092");
        props.put("topic.rename.format", "${topic}.replica");
        props.put("task.id", TASK_ID);
        props.put("partition.assignment", null);
        ReplicatorSourceTaskConfig taskConfig = new ReplicatorSourceTaskConfig(props);

        MockTime mockTime = new MockTime();
        this.endOffsetMap = new ConcurrentHashMap<>();
        this.records = new HashMap<>();
        this.endOffsetConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        initializeSourceAssignmentAndEndOffsets();

        this.fetchEndOffsets = new FetchEndOffsetService(
                this.endOffsetConsumer,
                this.endOffsetMap.keySet(),
                TASK_ID,
                EasyMock.createMock(ScheduledExecutorService.class));
        this.fetchEndOffsets.updateEndOffsets();

        this.metricGroup = new ConfluentReplicatorTaskMetricsGroup(
                taskConfig,
                TASK_ID,
                this.endOffsetMap.keySet(),
                new ConfluentReplicatorMetrics(TASK_ID, mockTime),
                SOURCE_CLUSTER_ID,
                DEST_CLUSTER_ID,
                this.endOffsetConsumer,
                this.fetchEndOffsets,
                REPLICATOR_NAME);
        this.metricGroup.setupMetrics();
        this.topicPartitionReplicatorGroupMap = this.metricGroup.getTopicPartitionReplicatorGroupMap();
    }

    /**
     * Creates one record/metadata pair per partition at the initial connect offset and publishes
     * the initial end offset for every partition to the mock consumer.
     */
    private void initializeSourceAssignmentAndEndOffsets() {
        for (int partition = 0; partition < PARTITION_COUNT; partition++) {
            this.records.put(
                    newSourceRecord(partition, Utils.toConnectOffset(INITIAL_CONNECT_OFFSET), Long.valueOf(INITIAL_TIMESTAMP)),
                    newRecordMetadata(partition, INITIAL_CONNECT_OFFSET, INITIAL_TIMESTAMP));
            this.endOffsetMap.put(new TopicPartition(TOPIC, partition), Long.valueOf(INITIAL_END_OFFSET));
        }
        this.endOffsetConsumer.updateEndOffsets(this.endOffsetMap);
    }

    /**
     * Records with no source offset, no timestamp and no producer metadata, combined with end
     * offsets of -1, must leave lag and latency as NaN and byte throughput at zero while the
     * identity metrics are still reported.
     */
    @Test
    public void testMetricsRecordCorrectlyWithInvalidDataPoints() {
        initializeSourceAssignmentAndEndOffsetsWithInvalidValues();
        for (SourceRecord sourceRecord : this.records.keySet()) {
            ConfluentReplicatorMetrics.ReplicatorMetricGroup group =
                    this.topicPartitionReplicatorGroupMap.get(constructCurrentTopicPartition(sourceRecord));

            this.metricGroup.recordTaskTopicPartitionMetrics(sourceRecord, (RecordMetadata) null);

            assertIdentityMetrics(group);
            double lag = doubleMetric(group,
                    group.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskMessageLagTemplate));
            double latency = doubleMetric(group,
                    group.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskLatencyTemplate));
            double byteThroughput = doubleMetric(group,
                    group.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskByteThroughputTemplate));
            Assert.assertTrue("message lag should be NaN but was " + lag, Double.isNaN(lag));
            Assert.assertTrue("latency should be NaN but was " + latency, Double.isNaN(latency));
            Assert.assertEquals(0.0d, byteThroughput, 0.0d);
        }
    }

    /**
     * Replaces every end offset with -1 and swaps the prepared records for ones that carry neither
     * a source offset nor a timestamp, each mapped to {@code null} metadata.
     */
    private void initializeSourceAssignmentAndEndOffsetsWithInvalidValues() {
        this.endOffsetMap.replaceAll((topicPartition, previous) -> -1L);
        this.endOffsetConsumer.updateEndOffsets(this.endOffsetMap);
        this.fetchEndOffsets.updateEndOffsets();
        this.records.clear();
        for (int partition = 0; partition < PARTITION_COUNT; partition++) {
            this.records.put(newSourceRecord(partition, null, null), null);
        }
    }

    /**
     * Each per-partition group must be tagged with the replicator name, topic, partition and task
     * id, and must expose exactly the four task-level sensors.
     */
    @Test
    public void testMetricsSetupCorrectly() {
        for (Map.Entry<TopicPartition, ConfluentReplicatorMetrics.ReplicatorMetricGroup> entry
                : this.topicPartitionReplicatorGroupMap.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            ConfluentReplicatorMetrics.ReplicatorMetricGroup group = entry.getValue();

            Map<?, ?> tags = group.groupId.tags();
            Assert.assertEquals(ConfluentReplicatorMetricsRegistry.CONFLUENT_REPLICATOR_TASK_TAGS.size(), tags.size());
            Assert.assertEquals(ConfluentReplicatorMetricsRegistry.CONFLUENT_REPLICATOR_TASK_TAGS, tags.keySet());
            Assert.assertTrue(tags.containsValue(REPLICATOR_NAME));
            Assert.assertTrue(tags.containsValue(topicPartition.topic()));
            Assert.assertTrue(tags.containsValue(topicPartition.toString()));
            Assert.assertTrue(tags.containsValue(TASK_ID));

            Set<?> sensorNames = group.getSensorNames();
            // JUnit's contract is (expected, actual); keeping that order gives a readable failure message.
            Assert.assertEquals(4, sensorNames.size());
            Assert.assertTrue(sensorNames.contains(group.sensorPrefix + "confluent-replicator-task-topic-partition-throughput"));
            Assert.assertTrue(sensorNames.contains(group.sensorPrefix + "confluent-replicator-task-topic-partition-byte-throughput"));
            Assert.assertTrue(sensorNames.contains(group.sensorPrefix + "confluent-replicator-task-topic-partition-latency"));
            Assert.assertTrue(sensorNames.contains(group.sensorPrefix + "confluent-replicator-task-topic-partition-message-lag"));
        }
    }

    /**
     * Rebuilds the source {@link TopicPartition} from a record's source-partition map.
     *
     * @param sourceRecord record whose source partition holds "topic" and "partition" entries
     * @return the topic partition, or {@code null} when the map or either entry is missing
     */
    private TopicPartition constructCurrentTopicPartition(SourceRecord sourceRecord) {
        Map<String, ?> sourcePartition = sourceRecord.sourcePartition();
        if (sourcePartition == null) {
            return null;
        }
        String topicName = (String) sourcePartition.get("topic");
        Object partition = sourcePartition.get("partition");
        if (topicName == null || partition == null) {
            return null;
        }
        return new TopicPartition(topicName, ((Integer) partition).intValue());
    }

    /**
     * With the initial fixtures, the reported message lag must equal the distance between the
     * partition's end offset and the record's offset.
     */
    @Test
    public void testMetricsRecordCorrectly() {
        for (Map.Entry<SourceRecord, RecordMetadata> entry : this.records.entrySet()) {
            SourceRecord sourceRecord = entry.getKey();
            ConfluentReplicatorMetrics.ReplicatorMetricGroup group =
                    this.topicPartitionReplicatorGroupMap.get(constructCurrentTopicPartition(sourceRecord));

            this.metricGroup.recordTaskTopicPartitionMetrics(sourceRecord, entry.getValue());

            assertIdentityMetrics(group);
            double lag = doubleMetric(group,
                    group.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskMessageLagTemplate));
            Assert.assertEquals(INITIAL_END_OFFSET - INITIAL_CONNECT_OFFSET, lag, 0.0d);
        }
    }

    /**
     * Reads the "offset" entry of a record's source offset.
     *
     * @param sourceRecord record to inspect
     * @return the stored offset, or 0 when the record has no source offset
     */
    private long getCurrentOffset(SourceRecord sourceRecord) {
        Map<String, ?> sourceOffset = sourceRecord.sourceOffset();
        if (sourceOffset == null) {
            return 0L;
        }
        return ((Long) sourceOffset.get("offset")).longValue();
    }

    /**
     * Assigns every partition a new end offset of {@code INITIAL_END_OFFSET + random int} and
     * pushes the result through the mock consumer into the end-offset service.
     *
     * <p>NOTE(review): {@link Random#nextInt()} spans the full int range, so the resulting end
     * offsets may be negative; confirm that is the intended input for the updated-data test.
     */
    private void updateEndOffsets() {
        this.endOffsetMap.replaceAll((topicPartition, previous) -> this.random.nextInt() + INITIAL_END_OFFSET);
        this.endOffsetConsumer.updateEndOffsets(this.endOffsetMap);
        this.fetchEndOffsets.updateEndOffsets();
    }

    /**
     * Replaces the prepared records with the same number of records whose offsets and timestamps
     * are the initial values shifted by a random int (possibly negative).
     */
    private void updateSourceRecords() {
        int recordCount = this.records.size();
        this.records.clear();
        for (int partition = 0; partition < recordCount; partition++) {
            long offset = this.random.nextInt() + INITIAL_CONNECT_OFFSET;
            long timestamp = this.random.nextInt() + INITIAL_TIMESTAMP;
            this.records.put(
                    newSourceRecord(partition, Utils.toConnectOffset(offset), Long.valueOf(timestamp)),
                    newRecordMetadata(partition, offset, timestamp));
        }
    }

    /**
     * After end offsets and records are randomized, the lag must be the end offset minus the
     * record offset, clamped to zero when the record is already past the known end offset.
     */
    @Test
    public void testMetricsRecordCorrectlyWithUpdatedData() {
        updateEndOffsets();
        updateSourceRecords();
        for (Map.Entry<SourceRecord, RecordMetadata> entry : this.records.entrySet()) {
            SourceRecord sourceRecord = entry.getKey();
            TopicPartition topicPartition = constructCurrentTopicPartition(sourceRecord);
            ConfluentReplicatorMetrics.ReplicatorMetricGroup group =
                    this.topicPartitionReplicatorGroupMap.get(topicPartition);

            this.metricGroup.recordTaskTopicPartitionMetrics(sourceRecord, entry.getValue());

            assertIdentityMetrics(group);
            double lag = doubleMetric(group,
                    group.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskMessageLagTemplate));
            long endOffset = this.fetchEndOffsets.getEndOffset(topicPartition);
            long currentOffset = getCurrentOffset(sourceRecord);
            long expectedLag = endOffset < currentOffset ? 0L : endOffset - currentOffset;
            Assert.assertEquals(expectedLag, lag, 0.0d);
        }
    }

    /**
     * Asserts that a group reports the expected destination topic and source/destination cluster
     * ids.
     */
    private void assertIdentityMetrics(ConfluentReplicatorMetrics.ReplicatorMetricGroup group) {
        Assert.assertEquals(DEST_TOPIC,
                metricValue(group, group.metricName(ConfluentReplicatorMetricsRegistry.destTopic)));
        Assert.assertEquals(SOURCE_CLUSTER_ID,
                metricValue(group, group.metricName(ConfluentReplicatorMetricsRegistry.sourceClusterIdTemplate)));
        Assert.assertEquals(DEST_CLUSTER_ID,
                metricValue(group, group.metricName(ConfluentReplicatorMetricsRegistry.destClusterIdTemplate)));
    }

    /** Returns the current value of the named metric in the given group. */
    private static Object metricValue(ConfluentReplicatorMetrics.ReplicatorMetricGroup group, MetricName name) {
        return group.metrics().metric(name).metricValue();
    }

    /** Returns the current value of the named metric, which must be a {@link Double}. */
    private static double doubleMetric(ConfluentReplicatorMetrics.ReplicatorMetricGroup group, MetricName name) {
        return ((Double) metricValue(group, name)).doubleValue();
    }

    /**
     * Builds a key-less, value-less, header-less source record for one partition of {@link #TOPIC}.
     *
     * @param partition source partition number
     * @param sourceOffset connect source offset map, or {@code null} for an invalid record
     * @param timestamp record timestamp, or {@code null} for an invalid record
     */
    private static SourceRecord newSourceRecord(int partition, Map<String, ?> sourceOffset, Long timestamp) {
        return new SourceRecord(
                Utils.toConnectPartition(TOPIC, partition),
                sourceOffset,
                TOPIC,
                Integer.valueOf(partition),
                null,   // key schema
                null,   // key
                null,   // value schema
                null,   // value
                timestamp,
                null);  // headers
    }

    /**
     * Builds producer metadata for one partition of {@link #TOPIC}, using a base offset of 0 so
     * the relative offset passed here is the offset the metadata reports.
     */
    private static RecordMetadata newRecordMetadata(int partition, long offset, long timestamp) {
        return new RecordMetadata(new TopicPartition(TOPIC, partition), 0L, offset, timestamp, 0L, 0, 0);
    }
}
