package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.streamnative.pulsar.handlers.kop.utils.OffsetSearchPredicate;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.class */
public class KafkaTopicConsumerManager implements Closeable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaTopicConsumerManager.class);
    private static final AtomicIntegerFieldUpdater<KafkaTopicConsumerManager> NUM_CREATED_CURSORS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(KafkaTopicConsumerManager.class, "numCreatedCursors");
    private final PersistentTopic topic;
    private final KafkaRequestHandler requestHandler;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile int numCreatedCursors = 0;
    private final Map<Long, CompletableFuture<Pair<ManagedCursor, Long>>> cursors = new ConcurrentHashMap();
    private final Map<String, ManagedCursor> createdCursors = new ConcurrentHashMap();
    private final Map<Long, Long> lastAccessTimes = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaTopicConsumerManager(KafkaRequestHandler kafkaRequestHandler, PersistentTopic persistentTopic) {
        this.topic = persistentTopic;
        this.requestHandler = kafkaRequestHandler;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deleteExpiredCursor(long j, long j2) {
        this.lastAccessTimes.forEach((l, l2) -> {
            if ((j - l2.longValue()) - j2 > 0) {
                deleteOneExpiredCursor(l.longValue());
            }
        });
    }

    void deleteOneExpiredCursor(long j) {
        if (this.closed.get()) {
            return;
        }
        CompletableFuture<Pair<ManagedCursor, Long>> remove = this.cursors.remove(Long.valueOf(j));
        this.lastAccessTimes.remove(Long.valueOf(j));
        if (remove != null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}", this.requestHandler.ctx.channel(), Long.valueOf(j), Integer.valueOf(this.cursors.size()));
            }
            remove.whenComplete((pair, th) -> {
                if (th != null || pair == null) {
                    return;
                }
                deleteOneCursorAsync((ManagedCursor) pair.getKey(), "cursor expired");
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deleteOneCursorAsync(final ManagedCursor managedCursor, final String str) {
        if (this.closed.get() || managedCursor == null) {
            return;
        }
        this.topic.getManagedLedger().asyncDeleteCursor(managedCursor.getName(), new AsyncCallbacks.DeleteCursorCallback() { // from class: io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.1
            public void deleteCursorComplete(Object obj) {
                if (KafkaTopicConsumerManager.log.isDebugEnabled()) {
                    KafkaTopicConsumerManager.log.debug("[{}] Cursor {} for topic {} deleted successfully for reason: {}.", KafkaTopicConsumerManager.this.requestHandler.ctx.channel(), managedCursor.getName(), KafkaTopicConsumerManager.this.topic.getName(), str);
                }
            }

            public void deleteCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                KafkaTopicConsumerManager.log.warn("[{}] Error deleting cursor {} for topic {} for reason: {}.", KafkaTopicConsumerManager.this.requestHandler.ctx.channel(), managedCursor.getName(), KafkaTopicConsumerManager.this.topic.getName(), str, managedLedgerException);
            }
        }, (Object) null);
        this.createdCursors.remove(managedCursor.getName());
    }

    public CompletableFuture<Pair<ManagedCursor, Long>> removeCursorFuture(long j) {
        if (this.closed.get()) {
            return null;
        }
        this.lastAccessTimes.remove(Long.valueOf(j));
        CompletableFuture<Pair<ManagedCursor, Long>> remove = this.cursors.remove(Long.valueOf(j));
        if (remove == null) {
            return asyncCreateCursorIfNotExists(j);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}", this.requestHandler.ctx.channel(), Long.valueOf(j), Integer.valueOf(this.cursors.size()));
        }
        return remove;
    }

    private CompletableFuture<Pair<ManagedCursor, Long>> asyncCreateCursorIfNotExists(long j) {
        if (this.closed.get()) {
            return null;
        }
        this.cursors.putIfAbsent(Long.valueOf(j), asyncGetCursorByOffset(j));
        this.lastAccessTimes.remove(Long.valueOf(j));
        return this.cursors.remove(Long.valueOf(j));
    }

    public void add(long j, Pair<ManagedCursor, Long> pair) {
        Preconditions.checkArgument(j == pair.getRight().longValue(), "offset not equal. key: " + j + " value: " + pair.getRight());
        if (this.closed.get()) {
            deleteOneCursorAsync(pair.getLeft(), "A race - add cursor back but tcm already closed");
            return;
        }
        if (this.cursors.putIfAbsent(Long.valueOf(j), CompletableFuture.completedFuture(pair)) != null) {
            deleteOneCursorAsync(pair.getLeft(), "reason: A race - same cursor already cached");
        }
        this.lastAccessTimes.put(Long.valueOf(j), Long.valueOf(System.currentTimeMillis()));
        if (log.isDebugEnabled()) {
            log.debug("[{}] Add cursor back {} for offset: {}", this.requestHandler.ctx.channel(), pair.getLeft().getName(), Long.valueOf(j));
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Close TCM for topic {}.", this.requestHandler.ctx.channel(), this.topic.getName());
            }
            NUM_CREATED_CURSORS_UPDATER.set(this, 0);
            ArrayList arrayList = new ArrayList();
            this.cursors.forEach((l, completableFuture) -> {
                arrayList.add(completableFuture);
            });
            this.cursors.clear();
            this.lastAccessTimes.clear();
            ArrayList arrayList2 = new ArrayList();
            this.createdCursors.forEach((str, managedCursor) -> {
                arrayList2.add(managedCursor);
            });
            this.createdCursors.clear();
            arrayList.forEach(completableFuture2 -> {
                completableFuture2.whenComplete((pair, th) -> {
                    if (th != null || pair == null) {
                        return;
                    }
                    deleteOneCursorAsync((ManagedCursor) pair.getLeft(), "TopicConsumerManager close");
                });
            });
            arrayList.clear();
            arrayList2.forEach(managedCursor2 -> {
                deleteOneCursorAsync(managedCursor2, "TopicConsumerManager close but cursor is still outstanding");
            });
            arrayList2.clear();
        }
    }

    private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long j) {
        if (this.closed.get()) {
            return CompletableFuture.completedFuture(null);
        }
        ManagedLedger managedLedger = this.topic.getManagedLedger();
        return managedLedger.asyncFindPosition(new OffsetSearchPredicate(j)).thenApply(position -> {
            String str = "kop-consumer-cursor-" + this.topic.getName() + "-" + position.getLedgerId() + "-" + position.getEntryId() + "-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
            PositionImpl previousPosition = ((ManagedLedgerImpl) managedLedger).getPreviousPosition((PositionImpl) position);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Create cursor {} for offset: {}. position: {}, previousPosition: {}", this.requestHandler.ctx.channel(), str, Long.valueOf(j), position, previousPosition);
            }
            try {
                ManagedCursor newNonDurableCursor = managedLedger.newNonDurableCursor(previousPosition, str);
                NUM_CREATED_CURSORS_UPDATER.incrementAndGet(this);
                this.createdCursors.putIfAbsent(newNonDurableCursor.getName(), newNonDurableCursor);
                this.lastAccessTimes.put(Long.valueOf(j), Long.valueOf(System.currentTimeMillis()));
                return Pair.of(newNonDurableCursor, Long.valueOf(j));
            } catch (ManagedLedgerException e) {
                log.error("[{}] Error new cursor for topic {} at offset {} - {}. will cause fetch data error.", this.requestHandler.ctx.channel(), this.topic.getName(), Long.valueOf(j), previousPosition, e);
                return null;
            }
        });
    }

    public ManagedLedger getManagedLedger() {
        return this.topic.getManagedLedger();
    }

    @VisibleForTesting
    public int getNumCreatedCursors() {
        return this.numCreatedCursors;
    }

    public Map<Long, CompletableFuture<Pair<ManagedCursor, Long>>> getCursors() {
        return this.cursors;
    }

    public Map<String, ManagedCursor> getCreatedCursors() {
        return this.createdCursors;
    }

    public Map<Long, Long> getLastAccessTimes() {
        return this.lastAccessTimes;
    }
}
