package co.cask.cdap.metrics.process;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/metrics/process/PersistedMessageCallback.class */
public final class PersistedMessageCallback implements KafkaConsumer.MessageCallback {
    private static final Logger LOG = LoggerFactory.getLogger(PersistedMessageCallback.class);
    private final KafkaConsumer.MessageCallback delegate;
    private final MetricsConsumerMetaTable metaTable;
    private final int persistThreshold;
    private final Map<TopicPartitionMetaKey, Long> offsets = Maps.newConcurrentMap();
    private final AtomicInteger messageCount = new AtomicInteger();

    /* loaded from: input_file:co/cask/cdap/metrics/process/PersistedMessageCallback$OffsetTrackingIterator.class */
    private final class OffsetTrackingIterator implements Iterator<FetchedMessage> {
        private final Iterator<FetchedMessage> delegate;
        private TopicPartitionMetaKey lastTopicPartition;
        private long lastOffset = -1;

        OffsetTrackingIterator(Iterator<FetchedMessage> it) {
            this.delegate = it;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.delegate.hasNext();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public FetchedMessage next() {
            FetchedMessage next = this.delegate.next();
            this.lastTopicPartition = new TopicPartitionMetaKey(next.getTopicPartition());
            this.lastOffset = next.getNextOffset();
            PersistedMessageCallback.this.messageCount.incrementAndGet();
            recordOffset();
            return next;
        }

        @Override // java.util.Iterator
        public void remove() {
            this.delegate.remove();
        }

        private void recordOffset() {
            if (this.lastOffset >= 0) {
                PersistedMessageCallback.this.offsets.put(this.lastTopicPartition, Long.valueOf(this.lastOffset));
            }
        }
    }

    public PersistedMessageCallback(KafkaConsumer.MessageCallback messageCallback, MetricsConsumerMetaTable metricsConsumerMetaTable, int i) {
        this.delegate = messageCallback;
        this.metaTable = metricsConsumerMetaTable;
        this.persistThreshold = i;
    }

    public long onReceived(Iterator<FetchedMessage> it) {
        long onReceived = this.delegate.onReceived(new OffsetTrackingIterator(it));
        if (this.messageCount.get() >= this.persistThreshold) {
            this.messageCount.set(0);
            persistOffsets();
        }
        return onReceived;
    }

    public void finished() {
        try {
            this.delegate.finished();
            persistOffsets();
        } catch (Throwable th) {
            persistOffsets();
            throw th;
        }
    }

    private void persistOffsets() {
        try {
            this.metaTable.save(ImmutableMap.copyOf(this.offsets));
        } catch (Exception e) {
            LOG.error("Failed to persist consumed message offset. {}", e.getMessage(), e);
        }
    }
}
