package io.confluent.connect.elasticsearch;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/connect/elasticsearch/OffsetTracker.class */
public class OffsetTracker {
    private static final Logger log = LoggerFactory.getLogger(OffsetTracker.class);
    private final Map<TopicPartition, Map<Long, OffsetState>> offsetsByPartition = new HashMap();
    private final Map<TopicPartition, Long> maxOffsetByPartition = new HashMap();
    private final AtomicLong numEntries = new AtomicLong();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/connect/elasticsearch/OffsetTracker$OffsetState.class */
    public static class OffsetState {
        private final long offset;
        private volatile boolean processed;

        OffsetState(long j) {
            this.offset = j;
        }

        public void markProcessed() {
            this.processed = true;
        }

        public boolean isProcessed() {
            return this.processed;
        }
    }

    public synchronized void closePartitions(Collection<TopicPartition> collection) {
        collection.forEach(topicPartition -> {
            if (this.offsetsByPartition.remove(topicPartition) != null) {
                this.numEntries.getAndAdd(-r0.size());
            }
            this.maxOffsetByPartition.remove(topicPartition);
        });
    }

    public synchronized OffsetState addPendingRecord(SinkRecord sinkRecord) {
        log.trace("Adding pending record");
        TopicPartition topicPartition = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue());
        Long l = this.maxOffsetByPartition.get(topicPartition);
        if (l != null && sinkRecord.kafkaOffset() <= l.longValue()) {
            return new OffsetState(sinkRecord.kafkaOffset());
        }
        this.numEntries.incrementAndGet();
        return this.offsetsByPartition.computeIfAbsent(topicPartition, topicPartition2 -> {
            return new LinkedHashMap();
        }).computeIfAbsent(Long.valueOf(sinkRecord.kafkaOffset()), (v1) -> {
            return new OffsetState(v1);
        });
    }

    public long numOffsetStateEntries() {
        return this.numEntries.get();
    }

    public synchronized void updateOffsets() {
        log.trace("Updating offsets");
        this.offsetsByPartition.forEach((topicPartition, map) -> {
            Long l = this.maxOffsetByPartition.get(topicPartition);
            boolean z = false;
            Iterator it = map.values().iterator();
            while (it.hasNext()) {
                OffsetState offsetState = (OffsetState) it.next();
                if (!offsetState.isProcessed()) {
                    break;
                }
                it.remove();
                this.numEntries.decrementAndGet();
                if (l == null || offsetState.offset > l.longValue()) {
                    l = Long.valueOf(offsetState.offset);
                    z = true;
                }
            }
            if (z) {
                this.maxOffsetByPartition.put(topicPartition, l);
            }
        });
        log.trace("Updated offsets, num entries: {}", this.numEntries);
    }

    public synchronized Map<TopicPartition, OffsetAndMetadata> offsets() {
        return (Map) this.maxOffsetByPartition.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return new OffsetAndMetadata(((Long) entry.getValue()).longValue() + 1);
        }));
    }
}
