package cn.leancloud.kafka.consumer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.class */
/**
 * Commit policy that normally commits offsets with {@code commitAsync} and falls back to
 * a synchronous commit when either the previous asynchronous commit failed or the number
 * of asynchronous commits still awaiting their callback has reached
 * {@code maxPendingAsyncCommits}.
 *
 * <p>All mutable state is kept in plain, unsynchronized fields.
 * NOTE(review): this presumably relies on the Kafka consumer invoking commit callbacks on
 * the same thread that drives the consumer -- confirm against the caller.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
public final class PartialAsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(PartialAsyncCommitPolicy.class);

    /** Pending-commit count at which {@link #tryCommit(boolean)} switches to a synchronous commit. */
    private final int maxPendingAsyncCommits;
    /** Single callback instance shared by every asynchronous commit issued by this policy. */
    private final OffsetCommitCallback callback;
    /**
     * Offsets already handed to {@code commitAsync}. Used to avoid re-sending the same
     * offsets asynchronously; only cleared when a synchronous commit is performed.
     */
    private final Map<TopicPartition, OffsetAndMetadata> pendingAsyncCommitOffset;
    /** Number of asynchronous commits issued whose callback has not run yet. */
    private int pendingAsyncCommitCounter;
    /** Set when an asynchronous commit fails; forces the next commit to be synchronous. */
    private boolean forceSync;

    /**
     * Creates the policy.
     *
     * @param consumer               the Kafka consumer used to commit offsets
     * @param duration               forwarded unchanged to the superclass constructor (2nd argument);
     *                               its meaning is defined there
     * @param i                      forwarded unchanged to the superclass constructor (3rd argument);
     *                               its meaning is defined there
     * @param duration2              forwarded unchanged to the superclass constructor (4th argument);
     *                               its meaning is defined there
     * @param maxPendingAsyncCommits number of in-flight asynchronous commits at which the
     *                               next commit is performed synchronously instead
     */
    public PartialAsyncCommitPolicy(Consumer<K, V> consumer, Duration duration, int i, Duration duration2,
                                    int maxPendingAsyncCommits) {
        super(consumer, duration, i, duration2);
        this.maxPendingAsyncCommits = maxPendingAsyncCommits;
        this.callback = new AsyncCommitCallback();
        this.pendingAsyncCommitOffset = new HashMap<>();
    }

    /**
     * Commits the offsets selected by {@link #offsetsForPartialCommit(boolean)}, synchronously
     * or asynchronously depending on the current state.
     *
     * @param z passed through to {@code getCompletedPartitions} and
     *          {@code clearCachedCompletedPartitionsRecords}; its meaning is defined by those
     *          inherited methods
     * @return an empty set when there was nothing to commit, otherwise the result of
     *         {@code getCompletedPartitions(z)}
     */
    @Override
    public Set<TopicPartition> tryCommit(boolean z) {
        final boolean syncCommit = forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits;
        final Map<TopicPartition, OffsetAndMetadata> offsets = offsetsForPartialCommit(syncCommit);
        if (offsets.isEmpty()) {
            return Collections.emptySet();
        }

        final Set<TopicPartition> completedPartitions = getCompletedPartitions(z);
        if (syncCommit) {
            commitSync(offsets);
            // A sync commit resets all async bookkeeping in one go.
            pendingAsyncCommitOffset.clear();
            pendingAsyncCommitCounter = 0;
            forceSync = false;
            clearCachedCompletedPartitionsRecords(completedPartitions, z);
        } else {
            // Count the commit before issuing it; the callback decrements the counter.
            ++pendingAsyncCommitCounter;
            consumer.commitAsync(offsets, callback);
            pendingAsyncCommitOffset.putAll(offsets);
        }
        return completedPartitions;
    }

    /** Returns the number of asynchronous commits whose callback has not run yet. */
    @VisibleForTesting
    int pendingAsyncCommitCount() {
        return pendingAsyncCommitCounter;
    }

    /** Returns whether the next commit is forced to be synchronous. */
    @VisibleForTesting
    boolean forceSync() {
        return forceSync;
    }

    /**
     * Selects the offsets for the next commit.
     *
     * <ul>
     *   <li>If {@code needRecommit()} is true: the result of {@code offsetsForRecommit()},
     *       after which {@code updateNextRecommitTime()} is called.</li>
     *   <li>Otherwise, for a synchronous commit: the inherited {@code completedTopicOffsets}
     *       map itself (not a copy).</li>
     *   <li>Otherwise: a copy of {@code completedTopicOffsets} without the entries that were
     *       already sent by an earlier asynchronous commit.</li>
     * </ul>
     *
     * @param syncCommit whether the upcoming commit will be synchronous
     * @return the offsets to commit; may be empty
     */
    private Map<TopicPartition, OffsetAndMetadata> offsetsForPartialCommit(boolean syncCommit) {
        if (needRecommit()) {
            final Map<TopicPartition, OffsetAndMetadata> recommitOffsets = offsetsForRecommit();
            updateNextRecommitTime();
            return recommitOffsets;
        }
        if (syncCommit) {
            return completedTopicOffsets;
        }

        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(completedTopicOffsets);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> pending : pendingAsyncCommitOffset.entrySet()) {
            // Two-argument remove: drop the entry only if the exact same offset is already in flight.
            offsets.remove(pending.getKey(), pending.getValue());
        }
        return offsets;
    }

    /**
     * Callback attached to every asynchronous commit. It decrements the pending counter and,
     * depending on the outcome, either forces the next commit to be synchronous or removes
     * the committed offsets from the inherited caches.
     */
    private class AsyncCommitCallback implements OffsetCommitCallback {
        /**
         * Handles completion of one asynchronous commit.
         *
         * @param offsets   the offsets the completed commit request carried
         * @param exception the failure cause, or {@code null} when the commit succeeded
         */
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            --pendingAsyncCommitCounter;
            assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;

            if (exception != null) {
                // Cached offsets are left untouched on failure; the next tryCommit commits synchronously.
                logger.warn("Failed to commit offset: " + offsets + " asynchronously", exception);
                forceSync = true;
                return;
            }

            // pendingAsyncCommitOffset is deliberately not touched here; it is only cleared
            // by a synchronous commit in tryCommit.
            for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : offsets.entrySet()) {
                topicOffsetHighWaterMark.remove(committed.getKey(), committed.getValue().offset());
                completedTopicOffsets.remove(committed.getKey(), committed.getValue());
            }
        }
    }
}
