package io.streamnative.pulsar.handlers.kop;

import io.netty.util.Recycler;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPMessageMetadataNotFoundException;
import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import io.streamnative.pulsar.handlers.kop.security.auth.Resource;
import io.streamnative.pulsar.handlers.kop.security.auth.ResourceType;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseCallbackWrapper;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/MessageFetchContext.class */
/**
 * Recyclable, per-request state used to serve one Kafka FETCH request.
 *
 * <p>An instance is obtained from a Netty {@link Recycler} via {@link #get}, collects one
 * {@link FetchResponse.PartitionData} per requested partition in {@link #responseData}, and once every
 * partition has reported hands a {@code DelayedFetch} to the fetch purgatory. The purgatory (or the
 * direct fallback used when no purgatory is configured) eventually invokes {@link #complete()}, which
 * completes the result future and returns this object to the recycler.
 */
public final class MessageFetchContext {
    private static final Logger log = LoggerFactory.getLogger(MessageFetchContext.class);

    /** Placeholder used for high watermark / LSO / log start offset in error partition responses. */
    private static final long INVALID_OFFSET = -1L;

    private static final Recycler<MessageFetchContext> RECYCLER = new Recycler<MessageFetchContext>() {
        @Override
        protected MessageFetchContext newObject(Recycler.Handle<MessageFetchContext> handle) {
            return new MessageFetchContext(handle);
        }
    };

    private final Recycler.Handle<MessageFetchContext> recyclerHandle;
    private Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData;
    private ConcurrentLinkedQueue<DecodeResult> decodeResults;
    private KafkaRequestHandler requestHandler;
    private int maxReadEntriesNum;
    private KafkaTopicManager topicManager;
    private RequestStats statsLogger;
    private TransactionCoordinator tc;
    private String clientHost;
    private FetchRequest fetchRequest;
    private RequestHeader header;
    private volatile CompletableFuture<AbstractResponse> resultFuture;
    private AtomicBoolean hasComplete;
    private AtomicLong bytesReadable;
    private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;

    /**
     * Takes a context from the recycler and initialises it for the given request.
     *
     * @param kafkaRequestHandler handler that received the request; source of the topic manager, stats,
     *                            transaction coordinator and read limits
     * @param kafkaHeaderAndRequest the decoded request; its body must be a {@link FetchRequest}
     * @param completableFuture future completed with the fetch response by {@link #complete()}
     * @param delayedOperationPurgatory purgatory the delayed fetch is registered with
     * @return an initialised context
     */
    public static MessageFetchContext get(KafkaRequestHandler kafkaRequestHandler, KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture, DelayedOperationPurgatory<DelayedOperation> delayedOperationPurgatory) {
        final MessageFetchContext context = RECYCLER.get();
        context.responseData = new ConcurrentHashMap<>();
        context.decodeResults = new ConcurrentLinkedQueue<>();
        context.requestHandler = kafkaRequestHandler;
        context.maxReadEntriesNum = kafkaRequestHandler.getMaxReadEntriesNum();
        context.topicManager = kafkaRequestHandler.getTopicManager();
        context.statsLogger = kafkaRequestHandler.requestStats;
        context.tc = kafkaRequestHandler.getTransactionCoordinator();
        context.clientHost = kafkaHeaderAndRequest.getClientHost();
        context.fetchRequest = (FetchRequest) kafkaHeaderAndRequest.getRequest();
        context.header = kafkaHeaderAndRequest.getHeader();
        context.resultFuture = completableFuture;
        context.hasComplete = new AtomicBoolean(false);
        context.bytesReadable = new AtomicLong(0L);
        context.fetchPurgatory = delayedOperationPurgatory;
        return context;
    }

    /**
     * Test-only factory: builds a context that has only a request and a result future.
     *
     * <p>No purgatory is attached, so the context completes directly as soon as every requested
     * partition has a response (see {@link #tryComplete()}).
     *
     * @param fetchRequest the request whose partitions must all be answered
     * @param completableFuture future completed with the fetch response
     * @return an initialised context without handler, stats, or purgatory
     */
    public static MessageFetchContext getForTest(FetchRequest fetchRequest, CompletableFuture<AbstractResponse> completableFuture) {
        final MessageFetchContext context = RECYCLER.get();
        context.responseData = new ConcurrentHashMap<>();
        context.decodeResults = new ConcurrentLinkedQueue<>();
        context.requestHandler = null;
        context.maxReadEntriesNum = 0;
        context.topicManager = null;
        context.statsLogger = null;
        context.tc = null;
        context.clientHost = null;
        context.fetchRequest = fetchRequest;
        context.header = null;
        context.resultFuture = completableFuture;
        context.hasComplete = new AtomicBoolean(false);
        // Every field cleared by recycle() is set here so a reused instance is fully initialised.
        context.bytesReadable = new AtomicLong(0L);
        context.fetchPurgatory = null;
        return context;
    }

    private MessageFetchContext(Recycler.Handle<MessageFetchContext> handle) {
        this.recyclerHandle = handle;
    }

    /** Clears every per-request field and returns this instance to the recycler. */
    private void recycle() {
        this.responseData = null;
        this.decodeResults = null;
        this.requestHandler = null;
        this.maxReadEntriesNum = 0;
        this.topicManager = null;
        this.statsLogger = null;
        this.tc = null;
        this.clientHost = null;
        this.fetchRequest = null;
        this.header = null;
        this.resultFuture = null;
        this.hasComplete = null;
        this.bytesReadable = null;
        this.fetchPurgatory = null;
        this.recyclerHandle.recycle(this);
    }

    /** Builds an empty partition response carrying only the given error code. */
    private static FetchResponse.PartitionData<MemoryRecords> errorPartitionData(Errors errors) {
        return new FetchResponse.PartitionData<>(
                errors, INVALID_OFFSET, INVALID_OFFSET, INVALID_OFFSET, null, MemoryRecords.EMPTY);
    }

    /**
     * Test-only entry point: records an empty response with the given error for the partition and
     * then attempts to complete the request.
     *
     * @param topicPartition partition being answered
     * @param errors error code to report for that partition
     */
    public void addErrorPartitionResponseForTest(TopicPartition topicPartition, Errors errors) {
        this.responseData.put(topicPartition, errorPartitionData(errors));
        tryComplete();
    }

    /** Records an empty response with the given error for the partition, then attempts to complete. */
    private void addErrorPartitionResponse(TopicPartition topicPartition, Errors errors) {
        this.responseData.put(topicPartition, errorPartitionData(errors));
        tryComplete();
    }

    /**
     * Starts completion once every requested partition has a response.
     *
     * <p>The {@code hasComplete} compare-and-set ensures only one caller proceeds. With a purgatory the
     * delayed fetch is registered under one key per requested partition; without one (test contexts)
     * the request is completed immediately.
     */
    private void tryComplete() {
        if (this.resultFuture == null
                || this.responseData.size() < this.fetchRequest.fetchData().size()
                || !this.hasComplete.compareAndSet(false, true)) {
            return;
        }
        if (this.fetchPurgatory == null) {
            // No purgatory to delay on (e.g. contexts built by getForTest): answer right away.
            complete();
            return;
        }
        final List<Object> watchKeys = this.fetchRequest.fetchData().keySet().stream()
                .map(DelayedOperationKey.TopicPartitionOperationKey::new)
                .collect(Collectors.toList());
        final DelayedFetch delayedFetch = new DelayedFetch(
                this.fetchRequest.maxWait(), this.bytesReadable, this.fetchRequest.minBytes(), this::complete);
        this.fetchPurgatory.tryCompleteElseWatch(delayedFetch, watchKeys);
    }

    /**
     * Completes the result future with the collected partition responses and recycles this context.
     *
     * <p>Does nothing if the context has no future or the future is already done. If the future was
     * cancelled, the decode results are released and the context is left un-recycled. Partitions
     * without a collected response are reported as {@link Errors#REQUEST_TIMED_OUT}. The decode
     * results are released by the callback attached to the response wrapper.
     */
    public void complete() {
        final CompletableFuture<AbstractResponse> future = this.resultFuture;
        if (future == null) {
            return;
        }
        if (future.isCancelled()) {
            this.decodeResults.forEach(DecodeResult::release);
            return;
        }
        if (future.isDone()) {
            return;
        }

        // Emit partitions in the order they appear in the request.
        final LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> orderedResponses =
                new LinkedHashMap<>();
        for (TopicPartition topicPartition : this.fetchRequest.fetchData().keySet()) {
            final FetchResponse.PartitionData<MemoryRecords> collected = this.responseData.remove(topicPartition);
            orderedResponses.put(topicPartition,
                    collected != null ? collected : errorPartitionData(Errors.REQUEST_TIMED_OUT));
        }

        // Capture the queue locally: recycle() below nulls the field before the callback may run.
        final ConcurrentLinkedQueue<DecodeResult> resultsToRelease = this.decodeResults;
        final FetchResponse<MemoryRecords> response = new FetchResponse<>(
                Errors.NONE,
                orderedResponses,
                ((Integer) CommonFields.THROTTLE_TIME_MS.defaultValue).intValue(),
                this.fetchRequest.metadata().sessionId());
        future.complete(new ResponseCallbackWrapper(response,
                () -> resultsToRelease.forEach(DecodeResult::release)));
        recycle();
    }

    /**
     * Authorizes and serves every partition of the fetch request.
     *
     * <p>Each partition is checked for {@link AclOperation#READ} on its topic; a failed or denied check
     * yields {@link Errors#TOPIC_AUTHORIZATION_FAILED} for that partition, otherwise the partition is
     * read. Read-committed filtering is enabled only when a transaction coordinator is present and the
     * request asks for {@link IsolationLevel#READ_COMMITTED}.
     */
    public void handleFetch() {
        final boolean readCommitted =
                this.tc != null && this.fetchRequest.isolationLevel().equals(IsolationLevel.READ_COMMITTED);
        this.fetchRequest.fetchData().forEach((topicPartition, partitionData) -> {
            final long startPrepareMetadataNanos = MathUtils.nowInNano();
            final String fullTopicName = KopTopic.toString(topicPartition);
            this.requestHandler.authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName))
                    .whenComplete((isAuthorized, ex) -> {
                        if (ex != null) {
                            log.error("Read topic authorize failed, topic - {}. {}", fullTopicName, ex.getMessage());
                            addErrorPartitionResponse(topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
                            return;
                        }
                        if (!isAuthorized) {
                            addErrorPartitionResponse(topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
                            return;
                        }
                        handlePartitionData(topicPartition, partitionData, fullTopicName,
                                startPrepareMetadataNanos, readCommitted);
                    });
        });
    }

    /**
     * Resolves the consumer manager and cursor for one partition and reads entries from it.
     *
     * @param topicPartition partition being fetched
     * @param partitionData requested fetch offset and limits for the partition
     * @param str full topic name of the partition
     * @param j nano timestamp taken when handling of this partition started (for metadata stats)
     * @param z whether read-committed filtering applies
     */
    private void handlePartitionData(TopicPartition topicPartition, FetchRequest.PartitionData partitionData, String str, long j, boolean z) {
        final long fetchOffset = partitionData.fetchOffset;
        this.topicManager.getTopicConsumerManager(str).thenAccept(tcm -> {
            if (tcm == null) {
                this.statsLogger.getPrepareMetadataStats()
                        .registerFailedEvent(MathUtils.elapsedNanos(j), TimeUnit.NANOSECONDS);
                KafkaTopicConsumerManagerCache.getInstance().removeAndClose(str);
                addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION);
                return;
            }

            final long logEndOffset = MessageIdUtils.getLogEndOffset(tcm.getManagedLedger());
            if (fetchOffset > logEndOffset) {
                log.error("Received request for offset {} for partition {}, but we only have entries less than {}.", fetchOffset, topicPartition, logEndOffset);
                addErrorPartitionResponse(topicPartition, Errors.OFFSET_OUT_OF_RANGE);
                return;
            }

            if (log.isDebugEnabled()) {
                log.debug("Fetch for {}: remove tcm to get cursor for fetch offset: {} .", topicPartition, fetchOffset);
            }
            final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = tcm.removeCursorFuture(fetchOffset);
            if (cursorFuture == null) {
                log.warn("[{}] KafkaTopicConsumerManager is closed, remove TCM of {}", this.requestHandler.ctx, str);
                KafkaTopicConsumerManagerCache.getInstance().removeAndClose(str);
                addErrorPartitionResponse(topicPartition, Errors.NONE);
                return;
            }

            cursorFuture.thenAccept(cursorOffsetPair -> {
                if (cursorOffsetPair == null) {
                    log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. Fetch for topic return error.", fetchOffset, topicPartition);
                    addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION);
                    return;
                }
                final ManagedCursor cursor = cursorOffsetPair.getLeft();
                final AtomicLong cursorOffset = new AtomicLong(cursorOffsetPair.getRight());
                this.statsLogger.getPrepareMetadataStats()
                        .registerSuccessfulEvent(MathUtils.elapsedNanos(j), TimeUnit.NANOSECONDS);
                readEntries(cursor, topicPartition, cursorOffset).whenComplete((entries, ex) -> {
                    if (ex != null) {
                        tcm.deleteOneCursorAsync(cursor, "cursor.readEntry fail. deleteCursor");
                        addErrorPartitionResponse(topicPartition, Errors.forException(ex));
                    } else if (entries == null) {
                        addErrorPartitionResponse(topicPartition,
                                Errors.forException(new ApiException("Cursor is null")));
                    } else {
                        handleEntries(entries, topicPartition, partitionData, str, tcm, cursor, cursorOffset, z);
                    }
                });
            });
        });
    }

    /**
     * Turns the entries read for one partition into a partition response.
     *
     * <p>The cursor is handed back to the consumer manager under its current offset, the entries are
     * decoded into Kafka records for the record format implied by the request's API version, consumer
     * stats are updated, and the partition response is stored before attempting completion.
     *
     * @param list entries read from the cursor
     * @param topicPartition partition being fetched
     * @param partitionData requested fetch offset and limits for the partition
     * @param str full topic name of the partition
     * @param kafkaTopicConsumerManager consumer manager that owns the cursor
     * @param managedCursor cursor the entries were read from
     * @param atomicLong offset associated with the cursor after the read
     * @param z whether read-committed filtering applies
     */
    private void handleEntries(List<Entry> list, TopicPartition topicPartition, FetchRequest.PartitionData partitionData, String str, KafkaTopicConsumerManager kafkaTopicConsumerManager, ManagedCursor managedCursor, AtomicLong atomicLong, boolean z) {
        final long highWatermark = MessageIdUtils.getHighWatermark(managedCursor.getManagedLedger());
        // Return the cursor to the manager keyed by its current offset.
        kafkaTopicConsumerManager.add(atomicLong.get(), Pair.of(managedCursor, atomicLong.get()));
        final long lastStableOffset =
                z ? this.tc.getLastStableOffset(TopicName.get(str), highWatermark) : highWatermark;

        List<Entry> committedEntries = list;
        if (z) {
            committedEntries = getCommittedEntries(list, lastStableOffset);
            if (log.isDebugEnabled()) {
                log.debug("Request {}: read {} entries but only {} entries are committed", this.header, list.size(), committedEntries.size());
            }
        } else if (log.isDebugEnabled()) {
            log.debug("Request {}: read {} entries", this.header, list.size());
        }
        if (committedEntries.isEmpty()) {
            addErrorPartitionResponse(topicPartition, Errors.NONE);
            return;
        }

        // Record-format magic by fetch API version: v0-v1 -> 0, v2-v3 -> 1, otherwise 2.
        final short apiVersion = this.header.apiVersion();
        final byte magic;
        if (apiVersion <= 1) {
            magic = 0;
        } else if (apiVersion <= 3) {
            magic = 1;
        } else {
            magic = 2;
        }

        final String groupName = this.requestHandler.getCurrentConnectedGroup().computeIfAbsent(this.clientHost, host -> {
            String data = ZooKeeperUtils.getData(this.requestHandler.getPulsarService().getZkClient(), this.requestHandler.getGroupIdStoredPath(), ZooKeeperUtils.groupIdPathFormat(this.clientHost, this.header.clientId()));
            log.info("get group name from zk for current connection:{} groupId:{}", this.clientHost, data);
            return data;
        });

        final long startDecodeNanos = MathUtils.nowInNano();
        // NOTE(review): the full entry list is decoded and counted here, not committedEntries, so the
        // read-committed filter only decides whether the partition is answered as empty. Confirm
        // whether this is intended before switching to committedEntries (the cursor has already
        // advanced past every entry in the list).
        final DecodeResult decodeResult = this.requestHandler.getEntryFormatter().decode(list, magic);
        this.requestHandler.requestStats.getFetchDecodeStats()
                .registerSuccessfulEvent(MathUtils.elapsedNanos(startDecodeNanos), TimeUnit.NANOSECONDS);
        this.decodeResults.add(decodeResult);

        final MemoryRecords records = decodeResult.getRecords();
        updateConsumerStats(topicPartition, records, list.size(), groupName);
        this.responseData.put(topicPartition, new FetchResponse.PartitionData<>(Errors.NONE, highWatermark, lastStableOffset, highWatermark, z ? this.tc.getAbortedIndexList(partitionData.fetchOffset) : null, records));
        this.bytesReadable.getAndAdd(records.sizeInBytes());
        tryComplete();
    }

    /**
     * Returns the leading entries whose base offset does not exceed the given last stable offset.
     *
     * <p>Iteration stops at the first entry whose base offset is greater than {@code j}. An entry
     * whose base offset cannot be read is logged and skipped.
     *
     * @param list entries in read order
     * @param j last stable offset
     * @return a new list with the committed entries
     */
    private List<Entry> getCommittedEntries(List<Entry> list, long j) {
        final List<Entry> committedEntries = new ArrayList<>();
        for (Entry entry : list) {
            try {
                if (j < MessageIdUtils.peekBaseOffsetFromEntry(entry)) {
                    break;
                }
                committedEntries.add(entry);
            } catch (KoPMessageMetadataNotFoundException e) {
                log.error("[{}:{}] Failed to peek base offset from entry.", entry.getLedgerId(), entry.getEntryId(), e);
            }
        }
        return committedEntries;
    }

    /**
     * Asynchronously reads up to {@code maxReadEntriesNum} entries from the cursor.
     *
     * <p>On a non-empty read the position of the last entry is mark-deleted on the cursor and
     * {@code atomicLong} is advanced to the offset after the last entry. Read latency is recorded as a
     * success or failure in the message-read stats.
     *
     * @param managedCursor cursor to read from; must be a {@link NonDurableCursorImpl} for the
     *                      mark-delete step, otherwise the returned future fails
     * @param topicPartition partition being read (used for logging)
     * @param atomicLong holder updated with the next offset to read
     * @return future completed with the entries read, or exceptionally on failure
     */
    private CompletableFuture<List<Entry>> readEntries(final ManagedCursor managedCursor, final TopicPartition topicPartition, final AtomicLong atomicLong) {
        final OpStatsLogger messageReadStats = this.statsLogger.getMessageReadStats();
        final long startReadNanos = MathUtils.nowInNano();
        final CompletableFuture<List<Entry>> readFuture = new CompletableFuture<>();
        final long originalOffset = atomicLong.get();

        managedCursor.asyncReadEntries(this.maxReadEntriesNum, new AsyncCallbacks.ReadEntriesCallback() {
            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                if (!entries.isEmpty()) {
                    final Entry lastEntry = entries.get(entries.size() - 1);
                    final PositionImpl lastPosition = PositionImpl.get(lastEntry.getLedgerId(), lastEntry.getEntryId());
                    try {
                        final long lastOffset = MessageIdUtils.peekOffsetFromEntry(lastEntry);
                        commitOffset((NonDurableCursorImpl) managedCursor, lastPosition);
                        atomicLong.set(lastOffset + 1);
                        if (log.isDebugEnabled()) {
                            log.debug("Topic {} success read entry: ledgerId: {}, entryId: {}, size: {}, ConsumerManager original offset: {}, lastEntryPosition: {}, nextOffset: {}", topicPartition, lastEntry.getLedgerId(), lastEntry.getEntryId(), lastEntry.getLength(), originalOffset, lastPosition, atomicLong.get());
                        }
                    } catch (Exception e) {
                        log.error("[{}] Failed to peekOffsetFromEntry from position {}", topicPartition, lastPosition);
                        messageReadStats.registerFailedEvent(MathUtils.elapsedNanos(startReadNanos), TimeUnit.NANOSECONDS);
                        readFuture.completeExceptionally(e);
                        return;
                    }
                }
                messageReadStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startReadNanos), TimeUnit.NANOSECONDS);
                readFuture.complete(entries);
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                log.error("Error read entry for topic: {}", KopTopic.toString(topicPartition));
                // A failed read must be counted as a failure, not a success.
                messageReadStats.registerFailedEvent(MathUtils.elapsedNanos(startReadNanos), TimeUnit.NANOSECONDS);
                readFuture.completeExceptionally(exception);
            }
        }, null, PositionImpl.latest);
        return readFuture;
    }

    /**
     * Asynchronously mark-deletes the given position on the cursor; the outcome is only logged.
     *
     * @param nonDurableCursorImpl cursor to advance
     * @param positionImpl position to mark as deleted
     */
    public static void commitOffset(NonDurableCursorImpl nonDurableCursorImpl, final PositionImpl positionImpl) {
        nonDurableCursorImpl.asyncMarkDelete(positionImpl, new AsyncCallbacks.MarkDeleteCallback() {
            @Override
            public void markDeleteComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("Mark delete success for position: {}", positionImpl);
                }
            }

            @Override
            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                log.warn("Mark delete failed for position: {} with error:", positionImpl, exception);
            }
        }, null);
    }

    /**
     * Adds the bytes, messages, and entries sent for a partition to the per-topic, per-partition,
     * per-group outbound counters.
     *
     * @param topicPartition partition the records were fetched from
     * @param memoryRecords records being returned to the client
     * @param i number of entries read
     * @param str consumer group label
     */
    private void updateConsumerStats(TopicPartition topicPartition, MemoryRecords memoryRecords, int i, String str) {
        final int numMessages = EntryFormatter.parseNumMessages(memoryRecords);
        final String partitionLabel = String.valueOf(topicPartition.partition());
        this.statsLogger.getStatsLogger().scopeLabel("topic", topicPartition.topic()).scopeLabel(KopServerStats.PARTITION_SCOPE, partitionLabel).scopeLabel(KopServerStats.GROUP_SCOPE, str).getCounter(KopServerStats.BYTES_OUT).add(memoryRecords.sizeInBytes());
        this.statsLogger.getStatsLogger().scopeLabel("topic", topicPartition.topic()).scopeLabel(KopServerStats.PARTITION_SCOPE, partitionLabel).scopeLabel(KopServerStats.GROUP_SCOPE, str).getCounter(KopServerStats.MESSAGE_OUT).add(numMessages);
        this.statsLogger.getStatsLogger().scopeLabel("topic", topicPartition.topic()).scopeLabel(KopServerStats.PARTITION_SCOPE, partitionLabel).scopeLabel(KopServerStats.GROUP_SCOPE, str).getCounter(KopServerStats.ENTRIES_OUT).add(i);
    }
}
