package io.confluent.connect.replicator.metrics;

import io.confluent.connect.replicator.ReplicatorSourceTaskConfig;
import io.confluent.connect.replicator.metrics.ConfluentReplicatorMetrics;
import io.confluent.connect.replicator.util.Utils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/metrics/ConfluentReplicatorTaskMetricsGroup.class */
/**
 * Per-task holder of replicator metrics, keeping one
 * {@link ConfluentReplicatorMetrics.ReplicatorMetricGroup} per assigned source
 * {@link TopicPartition}.
 *
 * <p>Typical usage is {@link #setupMetrics()}, then
 * {@link #recordTaskTopicPartitionMetrics(SourceRecord, RecordMetadata)} for each record,
 * and finally {@link #stopMetrics()}.
 *
 * <p>The metric-group map is a plain {@link HashMap}; this class performs no
 * synchronization of its own.
 */
public class ConfluentReplicatorTaskMetricsGroup {
    private static final Logger log = LoggerFactory.getLogger(ConfluentReplicatorTaskMetricsGroup.class);

    private final ReplicatorSourceTaskConfig config;
    private final String taskId;
    private final Collection<TopicPartition> sourceAssignment;
    private final ConfluentReplicatorMetrics replicatorMetrics;
    private final Map<TopicPartition, ConfluentReplicatorMetrics.ReplicatorMetricGroup> topicPartitionReplicatorGroupMap;
    private final String sourceClusterId;
    private final String destClusterId;
    private final String replicatorName;
    // These two may be supplied by the caller or lazily created in setupMetrics().
    private Consumer<byte[], byte[]> endOffsetConsumer;
    private FetchEndOffsetService endOffsetService;

    /**
     * Creates a metrics group whose end-offset consumer and end-offset service are
     * created later, by {@link #setupMetrics()}.
     *
     * @param config            task configuration (source consumer configs and topic rename format are read from it)
     * @param taskId            identifier of the owning task, used as a metric tag and in log messages
     * @param sourceAssignment  source partitions assigned to the task; must not be {@code null}
     * @param replicatorMetrics metrics registry the per-partition groups are obtained from
     * @param sourceClusterId   value published as the source cluster id metric
     * @param destClusterId     value published as the destination cluster id metric
     * @param replicatorName    value of the {@code confluent-replicator-name} metric tag
     */
    public ConfluentReplicatorTaskMetricsGroup(ReplicatorSourceTaskConfig config, String taskId, Collection<TopicPartition> sourceAssignment, ConfluentReplicatorMetrics replicatorMetrics, String sourceClusterId, String destClusterId, String replicatorName) {
        this(config, taskId, sourceAssignment, replicatorMetrics, sourceClusterId, destClusterId, null, null, replicatorName);
    }

    /**
     * Creates a metrics group with an optional pre-built end-offset consumer and/or
     * end-offset service.
     *
     * @param config            task configuration (source consumer configs and topic rename format are read from it)
     * @param taskId            identifier of the owning task, used as a metric tag and in log messages
     * @param sourceAssignment  source partitions assigned to the task; must not be {@code null}
     * @param replicatorMetrics metrics registry the per-partition groups are obtained from
     * @param sourceClusterId   value published as the source cluster id metric
     * @param destClusterId     value published as the destination cluster id metric
     * @param endOffsetConsumer consumer handed to the end-offset service, or {@code null} to have
     *                          {@link #setupMetrics()} build one
     * @param endOffsetService  service used to look up end offsets, or {@code null} to have
     *                          {@link #setupMetrics()} create one
     * @param replicatorName    value of the {@code confluent-replicator-name} metric tag
     */
    public ConfluentReplicatorTaskMetricsGroup(ReplicatorSourceTaskConfig config, String taskId, Collection<TopicPartition> sourceAssignment, ConfluentReplicatorMetrics replicatorMetrics, String sourceClusterId, String destClusterId, Consumer<byte[], byte[]> endOffsetConsumer, FetchEndOffsetService endOffsetService, String replicatorName) {
        this.config = config;
        this.taskId = taskId;
        this.sourceAssignment = sourceAssignment;
        this.replicatorMetrics = replicatorMetrics;
        this.sourceClusterId = sourceClusterId;
        this.destClusterId = destClusterId;
        this.topicPartitionReplicatorGroupMap = new HashMap<>(sourceAssignment.size());
        this.endOffsetConsumer = endOffsetConsumer;
        this.endOffsetService = endOffsetService;
        this.replicatorName = replicatorName;
    }

    /**
     * Registers one metric group per assigned partition and, when they were not supplied
     * at construction time, builds the end-offset consumer and the end-offset service.
     */
    public void setupMetrics() {
        log.debug("Setting up recording groups for each TopicPartition for this task...");
        setupMetricGroups();
        if (this.endOffsetConsumer == null) {
            log.debug("Building endOffsetConsumer...");
            this.endOffsetConsumer = buildEndOffsetConsumer(this.config);
        }
        if (this.endOffsetService == null) {
            log.debug("Creating EndOffsetService...");
            // NOTE(review): the meaning of 10000L is defined by FetchEndOffsetService's
            // constructor, which is not visible here - confirm before changing it.
            this.endOffsetService = new FetchEndOffsetService(this.endOffsetConsumer, this.sourceAssignment, this.taskId, 10000L);
        }
    }

    /**
     * Obtains a metric group for every assigned partition, attaches the immutable
     * cluster/topic metrics and the lag, latency and throughput sensors, and stores the
     * group in the partition-to-group map.
     */
    private void setupMetricGroups() {
        for (TopicPartition topicPartition : this.sourceAssignment) {
            ConfluentReplicatorMetrics.ReplicatorMetricGroup group = this.replicatorMetrics.group("confluent-replicator-task-metrics", "confluent-replicator-task", this.taskId, "confluent-replicator-task-topic-partition", topicPartition.toString(), "confluent-replicator-name", this.replicatorName, "confluent-replicator-topic-name", topicPartition.topic());
            // NOTE(review): close() is called before any metric is added; presumably this
            // removes metrics left over from an earlier group with the same tags - confirm
            // against ReplicatorMetricGroup.close().
            group.close();
            group.addImmutableValueMetric(ConfluentReplicatorMetricsRegistry.sourceClusterIdTemplate, this.sourceClusterId);
            group.addImmutableValueMetric(ConfluentReplicatorMetricsRegistry.destClusterIdTemplate, this.destClusterId);
            group.addImmutableValueMetric(ConfluentReplicatorMetricsRegistry.destTopic, toDestTopic(topicPartition.topic()));
            group.sensor(ConfluentReplicatorMetricsRegistry.REPLICATOR_TASK_MESSAGE_LAG).add(group.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskMessageLagTemplate), new MetricLagAvg());
            group.sensor(ConfluentReplicatorMetricsRegistry.REPLICATOR_TASK_LATENCY).add(group.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskLatencyTemplate), new Avg());
            group.sensor(ConfluentReplicatorMetricsRegistry.REPLICATOR_TASK_THROUGHPUT).add(group.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskThroughputTemplate), new Rate());
            group.sensor(ConfluentReplicatorMetricsRegistry.REPLICATOR_TASK_BYTE_THROUGHPUT).add(group.metricName(ConfluentReplicatorMetricsRegistry.replicatorTaskByteThroughputTemplate), new Rate());
            log.debug("Initializing MetricGroup for TopicPartition {}", topicPartition);
            this.topicPartitionReplicatorGroupMap.put(topicPartition, group);
        }
    }

    /**
     * Maps a source topic name to its destination name using the configured rename format.
     *
     * @param sourceTopic source topic name
     * @return the result of {@code Utils.renameTopic} for the configured format
     */
    private String toDestTopic(String sourceTopic) {
        return Utils.renameTopic(this.config.getTopicRenameFormat(), sourceTopic);
    }

    /**
     * Builds a byte-array consumer from the task's source consumer configs, overriding
     * the group id, client id, auto-commit and offset-reset settings.
     *
     * @param taskConfig configuration providing the base source consumer properties
     * @return a newly created consumer; this class never closes it directly
     */
    private Consumer<byte[], byte[]> buildEndOffsetConsumer(ReplicatorSourceTaskConfig taskConfig) {
        Map<String, Object> consumerProps = new HashMap<>(taskConfig.getSourceConsumerConfigs());
        consumerProps.put("group.id", "confluent-replicator-end-offsets-consumer-group");
        consumerProps.put("client.id", "confluent-replicator-end-offsets-consumer-client");
        consumerProps.put("enable.auto.commit", false);
        consumerProps.put("auto.offset.reset", "none");
        return new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    /**
     * Reconstructs the source partition from the record's source-partition map.
     *
     * @param sourceRecord record whose {@code "topic"} and {@code "partition"} entries are read
     * @return the source partition, or {@code null} when the map or either entry is missing
     */
    private TopicPartition constructCurrentTopicPartition(SourceRecord sourceRecord) {
        Map<String, ?> sourcePartition = sourceRecord.sourcePartition();
        if (sourcePartition == null) {
            return null;
        }
        String topic = (String) sourcePartition.get("topic");
        Object partition = sourcePartition.get("partition");
        if (topic == null || partition == null) {
            return null;
        }
        return new TopicPartition(topic, ((Integer) partition).intValue());
    }

    /**
     * Reads the {@code "offset"} entry of the record's source-offset map.
     *
     * @param sourceRecord record to inspect
     * @return the offset, or {@code null} when the map is absent or holds no numeric
     *         {@code "offset"} entry
     */
    private Long getCurrentOffset(SourceRecord sourceRecord) {
        Map<String, ?> sourceOffset = sourceRecord.sourceOffset();
        if (sourceOffset == null) {
            return null;
        }
        Object offset = sourceOffset.get("offset");
        // Metrics recording must not fail on a missing or unexpectedly typed offset, so
        // anything that is not a Number is reported as "no offset".
        return offset instanceof Number ? Long.valueOf(((Number) offset).longValue()) : null;
    }

    /**
     * Computes how far the given offset is behind the end offset reported by the
     * end-offset service.
     *
     * @param currentOffset  offset of the record being recorded
     * @param topicPartition source partition of the record
     * @return {@code endOffset - currentOffset}, or {@code 0} when the end offset is not ahead
     */
    private long calculateLag(long currentOffset, TopicPartition topicPartition) {
        log.trace("Calculating lag for current record...");
        long endOffset = this.endOffsetService.getEndOffset(topicPartition);
        return endOffset > currentOffset ? endOffset - currentOffset : 0L;
    }

    /**
     * Computes the difference between the current wall-clock time and the record timestamp.
     *
     * @param sourceRecord record with a non-null timestamp
     * @return elapsed milliseconds, or {@code 0} when the timestamp is not in the past
     */
    private long calculateLatency(SourceRecord sourceRecord) {
        log.trace("Calculating latency for current record...");
        long now = System.currentTimeMillis();
        long recordTimestamp = sourceRecord.timestamp().longValue();
        return now > recordTimestamp ? now - recordTimestamp : 0L;
    }

    /**
     * Sums the serialized key and value sizes, treating negative sizes as zero.
     *
     * @param recordMetadata producer metadata of the written record
     * @return the number of bytes to record for byte throughput
     */
    private double calculateByteBasedThroughput(RecordMetadata recordMetadata) {
        // Widen before adding so the sum cannot overflow int.
        return (double) Math.max(recordMetadata.serializedKeySize(), 0) + Math.max(recordMetadata.serializedValueSize(), 0);
    }

    /**
     * Records throughput, byte throughput, lag and latency for a single replicated record.
     *
     * <p>Nothing is recorded (a warning is logged) when the record's source partition cannot
     * be determined or no metric group is registered for it. Byte throughput is recorded
     * only when {@code recordMetadata} is non-null, lag only when the record carries a
     * numeric {@code "offset"} in its source offset, and latency only when the record has a
     * timestamp.
     *
     * @param sourceRecord   the replicated record
     * @param recordMetadata producer metadata for the record, may be {@code null}
     */
    public void recordTaskTopicPartitionMetrics(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        TopicPartition topicPartition = constructCurrentTopicPartition(sourceRecord);
        if (topicPartition == null) {
            log.warn("Couldn't record metrics for record {} for task {}. Can't find TopicPartition associated with this SourceRecord", sourceRecord, this.taskId);
            return;
        }
        ConfluentReplicatorMetrics.ReplicatorMetricGroup group = this.topicPartitionReplicatorGroupMap.get(topicPartition);
        if (group == null) {
            log.warn("Couldn't record metrics for topic partition {} for task {}. Can't find MetricGroup associated with this TopicPartition.", topicPartition, this.taskId);
            return;
        }
        log.trace("Recording metrics for task {} with TopicPartition {}...", this.taskId, topicPartition);

        String throughputSensor = group.sensorPrefix + ConfluentReplicatorMetricsRegistry.REPLICATOR_TASK_THROUGHPUT;
        log.trace("Recording throughput metric for task {} with TopicPartition {}", this.taskId, topicPartition);
        group.recordMetrics(throughputSensor, 1.0d);

        if (recordMetadata != null) {
            String byteThroughputSensor = group.sensorPrefix + ConfluentReplicatorMetricsRegistry.REPLICATOR_TASK_BYTE_THROUGHPUT;
            double bytes = calculateByteBasedThroughput(recordMetadata);
            log.trace("Recording byte based throughput of {} for task {} with TopicPartition {}", bytes, this.taskId, topicPartition);
            group.recordMetrics(byteThroughputSensor, bytes);
        }

        Long currentOffset = getCurrentOffset(sourceRecord);
        if (currentOffset != null) {
            String lagSensor = group.sensorPrefix + ConfluentReplicatorMetricsRegistry.REPLICATOR_TASK_MESSAGE_LAG;
            long lag = calculateLag(currentOffset.longValue(), topicPartition);
            log.trace("Recording lag of {} messages for task {} with TopicPartition {}", lag, this.taskId, topicPartition);
            group.recordMetrics(lagSensor, lag);
        }

        if (sourceRecord.timestamp() != null) {
            String latencySensor = group.sensorPrefix + ConfluentReplicatorMetricsRegistry.REPLICATOR_TASK_LATENCY;
            long latency = calculateLatency(sourceRecord);
            log.trace("Recording latency of {} ms for task {} with TopicPartition {}", latency, this.taskId, topicPartition);
            group.recordMetrics(latencySensor, latency);
        }
        log.trace("Successfully recorded metrics for task {} with TopicPartition {}", this.taskId, topicPartition);
    }

    /**
     * Returns the live partition-to-metric-group map backing this instance.
     *
     * @return the internal map (not a copy); it is populated by {@link #setupMetrics()}
     */
    public Map<TopicPartition, ConfluentReplicatorMetrics.ReplicatorMetricGroup> getTopicPartitionReplicatorGroupMap() {
        return this.topicPartitionReplicatorGroupMap;
    }

    /**
     * Closes every metric group, stops the replicator metrics and shuts down the
     * end-offset service.
     *
     * <p>Each step runs even if an earlier one throws, so a failure while closing metrics
     * cannot leave the end-offset service running; the exception still propagates.
     */
    public void stopMetrics() {
        log.debug("Closing each MetricGroup for this task...");
        try {
            closeMetricGroups();
        } finally {
            try {
                if (this.replicatorMetrics != null) {
                    log.debug("Shutting down replicatorMetrics for task {}", this.taskId);
                    this.replicatorMetrics.stop();
                }
            } finally {
                if (this.endOffsetService != null) {
                    log.debug("Shutting down endOffsetService for task {}", this.taskId);
                    // NOTE(review): the end-offset consumer is not closed here; presumably
                    // FetchEndOffsetService.shutdown() owns that - confirm.
                    this.endOffsetService.shutdown();
                }
            }
        }
    }

    /** Closes every metric group currently held in the partition-to-group map. */
    private void closeMetricGroups() {
        for (ConfluentReplicatorMetrics.ReplicatorMetricGroup group : this.topicPartitionReplicatorGroupMap.values()) {
            log.debug("Closing MetricGroup {}", group.groupId);
            group.close();
        }
    }
}
