package cn.leancloud.kafka.consumer;

import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cn/leancloud/kafka/consumer/AbstractCommitPolicy.class */
public abstract class AbstractCommitPolicy<K, V> implements CommitPolicy<K, V> {
    static SleepFunction sleepFunction;
    final Map<TopicPartition, Long> topicOffsetHighWaterMark = new HashMap();
    final Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets = new HashMap();
    protected final Consumer<K, V> consumer;
    private final long syncCommitRetryIntervalMs;
    private final int maxAttemptsForEachSyncCommit;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cn/leancloud/kafka/consumer/AbstractCommitPolicy$RetryContext.class */
    public static class RetryContext {
        private final long retryInterval;
        private final int maxAttempts;
        private int numOfAttempts;

        private RetryContext(long j, int i) {
            this.retryInterval = j;
            this.maxAttempts = i;
            this.numOfAttempts = 0;
        }

        void onError(RetriableException retriableException) {
            int i = this.numOfAttempts + 1;
            this.numOfAttempts = i;
            if (i >= this.maxAttempts) {
                throw retriableException;
            }
            try {
                AbstractCommitPolicy.sleepFunction.sleep(this.retryInterval);
            } catch (InterruptedException e) {
                retriableException.addSuppressed(e);
                Thread.currentThread().interrupt();
                throw retriableException;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cn/leancloud/kafka/consumer/AbstractCommitPolicy$SleepFunction.class */
    public interface SleepFunction {
        void sleep(long j) throws InterruptedException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractCommitPolicy(Consumer<K, V> consumer, Duration duration, int i) {
        this.consumer = consumer;
        this.syncCommitRetryIntervalMs = duration.toMillis();
        this.maxAttemptsForEachSyncCommit = i;
    }

    @Override // cn.leancloud.kafka.consumer.CommitPolicy
    public void markPendingRecord(ConsumerRecord<K, V> consumerRecord) {
        this.topicOffsetHighWaterMark.merge(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), Long.valueOf(consumerRecord.offset() + 1), (v0, v1) -> {
            return Math.max(v0, v1);
        });
    }

    @Override // cn.leancloud.kafka.consumer.CommitPolicy
    public void markCompletedRecord(ConsumerRecord<K, V> consumerRecord) {
        this.completedTopicOffsets.merge(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1), BinaryOperator.maxBy(Comparator.comparing((v0) -> {
            return v0.offset();
        })));
    }

    @Override // cn.leancloud.kafka.consumer.CommitPolicy
    public Set<TopicPartition> syncPartialCommit() {
        commitSync(this.completedTopicOffsets);
        Set<TopicPartition> checkCompletedPartitions = checkCompletedPartitions();
        this.completedTopicOffsets.clear();
        Iterator<TopicPartition> it = checkCompletedPartitions.iterator();
        while (it.hasNext()) {
            this.topicOffsetHighWaterMark.remove(it.next());
        }
        return checkCompletedPartitions;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<TopicPartition> getCompletedPartitions(boolean z) {
        Set<TopicPartition> checkCompletedPartitions;
        if (!z) {
            checkCompletedPartitions = checkCompletedPartitions();
        } else {
            if (!$assertionsDisabled && !checkCompletedPartitions().equals(this.topicOffsetHighWaterMark.keySet())) {
                throw new AssertionError("expect: " + checkCompletedPartitions() + " actual: " + this.topicOffsetHighWaterMark.keySet());
            }
            checkCompletedPartitions = new HashSet(this.topicOffsetHighWaterMark.keySet());
        }
        return checkCompletedPartitions;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clearCachedCompletedPartitionsRecords(Set<TopicPartition> set, boolean z) {
        this.completedTopicOffsets.clear();
        if (z) {
            this.topicOffsetHighWaterMark.clear();
            return;
        }
        Iterator<TopicPartition> it = set.iterator();
        while (it.hasNext()) {
            this.topicOffsetHighWaterMark.remove(it.next());
        }
    }

    @VisibleForTesting
    Map<TopicPartition, Long> topicOffsetHighWaterMark() {
        return this.topicOffsetHighWaterMark;
    }

    @VisibleForTesting
    Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets() {
        return this.completedTopicOffsets;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void commitSync() {
        RetryContext context = context();
        while (true) {
            try {
                this.consumer.commitSync();
                return;
            } catch (RetriableException e) {
                context.onError(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        RetryContext context = context();
        while (true) {
            try {
                this.consumer.commitSync(map);
                return;
            } catch (RetriableException e) {
                context.onError(e);
            }
        }
    }

    private Set<TopicPartition> checkCompletedPartitions() {
        return (Set) this.completedTopicOffsets.entrySet().stream().filter(entry -> {
            return topicOffsetMeetHighWaterMark((TopicPartition) entry.getKey(), (OffsetAndMetadata) entry.getValue());
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toSet());
    }

    private boolean topicOffsetMeetHighWaterMark(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        Long l = this.topicOffsetHighWaterMark.get(topicPartition);
        return l == null || offsetAndMetadata.offset() >= l.longValue();
    }

    private RetryContext context() {
        return new RetryContext(this.syncCommitRetryIntervalMs, this.maxAttemptsForEachSyncCommit);
    }

    static {
        $assertionsDisabled = !AbstractCommitPolicy.class.desiredAssertionStatus();
        sleepFunction = Thread::sleep;
    }
}
