package io.streamnative.pulsar.handlers.kop;

import com.google.common.base.Preconditions;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.io.Closeable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.class */
public class KafkaTopicConsumerManager implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicConsumerManager.class);
    private final PersistentTopic topic;
    private final KafkaRequestHandler requestHandler;
    private final ConcurrentLongHashMap<Pair<ManagedCursor, Long>> consumers = new ConcurrentLongHashMap<>();
    private final ConcurrentMap<String, ManagedCursor> createdCursors = new ConcurrentHashMap();
    private final ConcurrentLongHashMap<Long> lastAccessTimes = new ConcurrentLongHashMap<>();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private boolean closed = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaTopicConsumerManager(KafkaRequestHandler kafkaRequestHandler, PersistentTopic persistentTopic) {
        this.topic = persistentTopic;
        this.requestHandler = kafkaRequestHandler;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deleteExpiredCursor(long j, long j2) {
        this.lastAccessTimes.forEach((j3, l) -> {
            if ((j - l.longValue()) - j2 > 0) {
                deleteOneExpiredCursor(j3);
            }
        });
    }

    void deleteOneExpiredCursor(long j) {
        this.rwLock.readLock().lock();
        try {
            if (this.closed) {
                return;
            }
            Pair pair = (Pair) this.consumers.remove(j);
            this.lastAccessTimes.remove(j);
            this.rwLock.readLock().unlock();
            if (pair != null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Cursor timed out for offset: {} - {}, cursors cache size: {}", new Object[]{this.requestHandler.ctx.channel(), Long.valueOf(j), MessageIdUtils.getPosition(j), Long.valueOf(this.consumers.size())});
                }
                deleteOneCursorAsync((ManagedCursor) pair.getKey(), "cursor expired");
            }
        } finally {
            this.rwLock.readLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deleteOneCursorAsync(final ManagedCursor managedCursor, final String str) {
        if (managedCursor != null) {
            this.topic.getManagedLedger().asyncDeleteCursor(managedCursor.getName(), new AsyncCallbacks.DeleteCursorCallback() { // from class: io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.1
                public void deleteCursorComplete(Object obj) {
                    if (KafkaTopicConsumerManager.log.isDebugEnabled()) {
                        KafkaTopicConsumerManager.log.debug("[{}] Cursor {} for topic {} deleted successfully for reason: {}.", new Object[]{KafkaTopicConsumerManager.this.requestHandler.ctx.channel(), managedCursor.getName(), KafkaTopicConsumerManager.this.topic.getName(), str});
                    }
                }

                public void deleteCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    KafkaTopicConsumerManager.log.warn("[{}] Error deleting cursor for topic {} for reason: {}.", new Object[]{KafkaTopicConsumerManager.this.requestHandler.ctx.channel(), managedCursor.getName(), KafkaTopicConsumerManager.this.topic.getName(), str, managedLedgerException});
                }
            }, (Object) null);
            this.createdCursors.remove(managedCursor.getName());
        }
    }

    public Pair<ManagedCursor, Long> remove(long j) {
        this.rwLock.readLock().lock();
        try {
            if (this.closed) {
                return null;
            }
            Pair<ManagedCursor, Long> pair = (Pair) this.consumers.remove(j);
            this.lastAccessTimes.remove(j);
            this.rwLock.readLock().unlock();
            if (pair == null) {
                return createCursorIfNotExists(j);
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Get cursor for offset: {} - {} in cache. cache size: {}", new Object[]{this.requestHandler.ctx.channel(), Long.valueOf(j), MessageIdUtils.getPosition(j), Long.valueOf(this.consumers.size())});
            }
            return pair;
        } finally {
            this.rwLock.readLock().unlock();
        }
    }

    private Pair<ManagedCursor, Long> createCursorIfNotExists(long j) {
        long offsetAfterBatchIndex = MessageIdUtils.offsetAfterBatchIndex(j);
        this.rwLock.readLock().lock();
        try {
            if (this.closed) {
                return null;
            }
            this.consumers.computeIfAbsent(offsetAfterBatchIndex, j2 -> {
                PositionImpl position = MessageIdUtils.getPosition(j2);
                String str = "kop-consumer-cursor-" + this.topic.getName() + "-" + position.getLedgerId() + "-" + position.getEntryId() + "-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
                ManagedLedgerImpl managedLedger = this.topic.getManagedLedger();
                PositionImpl previousPosition = managedLedger.getPreviousPosition(position);
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Create cursor {} for offset: {}. position: {}, previousPosition: {}", new Object[]{this.requestHandler.ctx.channel(), str, Long.valueOf(j2), position, previousPosition});
                }
                try {
                    ManagedCursor newNonDurableCursor = managedLedger.newNonDurableCursor(previousPosition, str);
                    this.createdCursors.put(newNonDurableCursor.getName(), newNonDurableCursor);
                    this.lastAccessTimes.put(j2, Long.valueOf(System.currentTimeMillis()));
                    return Pair.of(newNonDurableCursor, Long.valueOf(j2));
                } catch (ManagedLedgerException e) {
                    log.error("[{}] Error new cursor for topic {} at offset {} - {}. will cause fetch data error.", new Object[]{this.requestHandler.ctx.channel(), this.topic.getName(), Long.valueOf(j2), previousPosition, e});
                    return null;
                }
            });
            Pair<ManagedCursor, Long> pair = (Pair) this.consumers.remove(offsetAfterBatchIndex);
            this.lastAccessTimes.remove(offsetAfterBatchIndex);
            this.rwLock.readLock().unlock();
            return pair;
        } finally {
            this.rwLock.readLock().unlock();
        }
    }

    public void add(long j, Pair<ManagedCursor, Long> pair) {
        Preconditions.checkArgument(j == ((Long) pair.getRight()).longValue(), "offset not equal. key: " + j + " value: " + pair.getRight());
        this.rwLock.readLock().lock();
        try {
            if (this.closed) {
                deleteOneCursorAsync((ManagedCursor) pair.getLeft(), "A race - add cursor back but tcm already closed");
                this.rwLock.readLock().unlock();
                return;
            }
            this.rwLock.readLock().unlock();
            if (((Pair) this.consumers.putIfAbsent(j, pair)) != null) {
                deleteOneCursorAsync((ManagedCursor) pair.getLeft(), "reason: A race - same cursor already cached");
            }
            this.lastAccessTimes.put(j, Long.valueOf(System.currentTimeMillis()));
            if (log.isDebugEnabled()) {
                log.debug("[{}] Add cursor back {} for offset: {} - {}", new Object[]{this.requestHandler.ctx.channel(), ((ManagedCursor) pair.getLeft()).getName(), Long.valueOf(j), MessageIdUtils.getPosition(j)});
            }
        } catch (Throwable th) {
            this.rwLock.readLock().unlock();
            throw th;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.rwLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Close TCM for topic {}.", this.requestHandler.ctx.channel(), this.topic.getName());
            }
            ConcurrentLongHashMap concurrentLongHashMap = new ConcurrentLongHashMap();
            this.consumers.forEach((j, pair) -> {
            });
            this.consumers.clear();
            this.lastAccessTimes.clear();
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            this.createdCursors.forEach((str, managedCursor) -> {
            });
            this.createdCursors.clear();
            concurrentLongHashMap.values().forEach(pair2 -> {
                ManagedCursor managedCursor2 = (ManagedCursor) pair2.getLeft();
                deleteOneCursorAsync(managedCursor2, "TopicConsumerManager close");
                if (null != managedCursor2) {
                    concurrentHashMap.remove(managedCursor2.getName());
                }
            });
            concurrentHashMap.values().forEach(managedCursor2 -> {
                deleteOneCursorAsync(managedCursor2, "TopicConsumerManager close but cursor is still outstanding");
            });
            concurrentHashMap.clear();
        } finally {
            this.rwLock.writeLock().unlock();
        }
    }

    public ConcurrentLongHashMap<Pair<ManagedCursor, Long>> getConsumers() {
        return this.consumers;
    }

    public ConcurrentLongHashMap<Long> getLastAccessTimes() {
        return this.lastAccessTimes;
    }
}
