package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.AbortedIndexEntry;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
import io.streamnative.pulsar.handlers.kop.security.Session;
import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer;
import io.streamnative.pulsar.handlers.kop.security.auth.Resource;
import io.streamnative.pulsar.handlers.kop.security.auth.ResourceType;
import io.streamnative.pulsar.handlers.kop.security.auth.SimpleAclAuthorizer;
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.OffsetFinder;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.CharEncoding;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.class */
public class KafkaRequestHandler extends KafkaCommandDecoder {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaRequestHandler.class);
    public static final long DEFAULT_TIMESTAMP = 0;
    private final PulsarService pulsarService;
    // Created in the constructor with this handler as its argument; closed in close().
    private final KafkaTopicManager topicManager;
    private final GroupCoordinator groupCoordinator;
    private final TransactionCoordinator transactionCoordinator;
    private final String clusterName;
    private final ScheduledExecutorService executor;
    private final PulsarAdmin admin;
    // Null unless broker authentication is enabled and at least one SASL mechanism is configured
    // (see the constructor); callers null-check before use.
    private final SaslAuthenticator authenticator;
    // Null unless broker authorization is enabled and an authenticator was created.
    private final Authorizer authorizer;
    private final AdminManager adminManager;
    private final MetadataCache<LocalBrokerData> localBrokerDataCache;
    private final Boolean tlsEnabled;
    private final EndPoint advertisedEndPoint;
    private final String advertisedListeners;
    // Partition count used when a missing topic is auto-created by a metadata request.
    private final int defaultNumPartitions;
    public final int maxReadEntriesNum;
    // When > 0, closing the connection after an authentication failure is delayed by this many milliseconds.
    private final int failedAuthenticationDelayMs;
    // Full names of the group-metadata and transaction-state topics; isInternalTopic() compares against these.
    private final String offsetsTopicName;
    private final String txnTopicName;
    // Namespaces whose persistent topics are listed by getAllTopicsAsync().
    private final Set<String> allowedNamespaces;
    // close() removes the entry keyed by the channel's remote address string.
    private final ConcurrentHashMap<String, String> currentConnectedGroup;
    private final String groupIdStoredPath;
    private final EntryFormatter entryFormatter;
    private final Set<String> groupIds;
    private final Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap;
    // Both purgatories are built in the constructor and shut down in close().
    private final DelayedOperationPurgatory<DelayedOperation> producePurgatory;
    private final DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
    // Publish-buffer limit in bytes (configured in MB); a value <= 0 disables the pause logic in
    // startSendOperationForThrottling().
    private final long maxPendingBytes;
    // Half of maxPendingBytes (see the constructor).
    private final long resumeThresholdPendingBytes;
    // Running byte count adjusted by start/completeSendOperationForThrottling().
    private final AtomicLong pendingBytes;
    // Set to true when auto-read was switched off because pendingBytes reached maxPendingBytes;
    // while true, enableCnxAutoRead() is a no-op.
    private volatile boolean autoReadDisabledPublishBufferLimiting;

    /**
     * Builds the request handler for a single Kafka client connection.
     *
     * <p>Besides storing the collaborators passed in, the constructor:
     * <ul>
     *   <li>creates the "produce" and "fetch" delayed-operation purgatories;</li>
     *   <li>creates a {@link SaslAuthenticator} only when broker authentication is enabled and at
     *       least one SASL mechanism is configured, and a {@link SimpleAclAuthorizer} only when
     *       authorization is enabled as well; both are {@code null} otherwise;</li>
     *   <li>derives the full names of the offsets and transaction-state topics from the configured
     *       metadata tenant and namespace;</li>
     *   <li>computes the publish-buffer limit in bytes and a resume threshold of half that limit;</li>
     *   <li>increments {@code RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE}.</li>
     * </ul>
     *
     * @param pulsarService the broker service used for executors, admin client and namespace lookups
     * @param kafkaServiceConfiguration KoP configuration that most settings are read from
     * @param groupCoordinator group coordinator shared with this handler
     * @param transactionCoordinator transaction coordinator shared with this handler
     * @param adminManager admin manager shared with this handler
     * @param metadataCache cache of {@link LocalBrokerData}
     * @param bool whether TLS is enabled for this endpoint (stored as {@code tlsEnabled})
     * @param endPoint the advertised endpoint of this handler
     * @param statsLogger stats logger forwarded to the superclass
     * @throws Exception if any collaborator fails to initialize
     */
    public KafkaRequestHandler(PulsarService pulsarService, KafkaServiceConfiguration kafkaServiceConfiguration, GroupCoordinator groupCoordinator, TransactionCoordinator transactionCoordinator, AdminManager adminManager, MetadataCache<LocalBrokerData> metadataCache, Boolean bool, EndPoint endPoint, StatsLogger statsLogger) throws Exception {
        super(statsLogger, kafkaServiceConfiguration);
        this.groupIds = new HashSet<>();
        this.pendingTopicFuturesMap = new ConcurrentHashMap<>();
        this.producePurgatory = DelayedOperationPurgatory.builder().purgatoryName("produce").timeoutTimer(SystemTimer.builder().executorName("produce").build()).build();
        this.fetchPurgatory = DelayedOperationPurgatory.builder().purgatoryName("fetch").timeoutTimer(SystemTimer.builder().executorName("fetch").build()).build();
        this.pendingBytes = new AtomicLong(0L);
        this.autoReadDisabledPublishBufferLimiting = false;
        this.pulsarService = pulsarService;
        this.groupCoordinator = groupCoordinator;
        this.transactionCoordinator = transactionCoordinator;
        this.clusterName = kafkaServiceConfiguration.getClusterName();
        this.executor = pulsarService.getExecutor();
        this.admin = pulsarService.getAdminClient();
        // SASL is only active when the broker requires authentication AND a mechanism is configured.
        boolean authenticationEnabled = pulsarService.getBrokerService().isAuthenticationEnabled() && !kafkaServiceConfiguration.getSaslAllowedMechanisms().isEmpty();
        this.authenticator = authenticationEnabled ? new SaslAuthenticator(pulsarService, kafkaServiceConfiguration.getSaslAllowedMechanisms(), kafkaServiceConfiguration) : null;
        this.authorizer = (pulsarService.getBrokerService().isAuthorizationEnabled() && authenticationEnabled) ? new SimpleAclAuthorizer(pulsarService) : null;
        this.adminManager = adminManager;
        this.localBrokerDataCache = metadataCache;
        this.tlsEnabled = bool;
        this.advertisedEndPoint = endPoint;
        this.advertisedListeners = kafkaServiceConfiguration.getKafkaAdvertisedListeners();
        this.topicManager = new KafkaTopicManager(this);
        this.defaultNumPartitions = kafkaServiceConfiguration.getDefaultNumPartitions();
        this.maxReadEntriesNum = kafkaServiceConfiguration.getMaxReadEntriesNum();
        this.offsetsTopicName = new KopTopic(String.join("/", kafkaServiceConfiguration.getKafkaMetadataTenant(), kafkaServiceConfiguration.getKafkaMetadataNamespace(), Topic.GROUP_METADATA_TOPIC_NAME)).getFullName();
        this.txnTopicName = new KopTopic(String.join("/", kafkaServiceConfiguration.getKafkaMetadataTenant(), kafkaServiceConfiguration.getKafkaMetadataNamespace(), Topic.TRANSACTION_STATE_TOPIC_NAME)).getFullName();
        this.allowedNamespaces = kafkaServiceConfiguration.getKopAllowedNamespaces();
        this.entryFormatter = EntryFormatterFactory.create(kafkaServiceConfiguration.getEntryFormat());
        this.currentConnectedGroup = new ConcurrentHashMap<>();
        this.groupIdStoredPath = kafkaServiceConfiguration.getGroupIdZooKeeperPath();
        // Multiply in long arithmetic: with int operands a buffer size of 2048 MB or more would
        // overflow before being widened to long, producing a negative/zero limit.
        this.maxPendingBytes = kafkaServiceConfiguration.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
        this.resumeThresholdPendingBytes = this.maxPendingBytes / 2;
        this.failedAuthenticationDelayMs = kafkaServiceConfiguration.getFailedAuthenticationDelayMs();
        RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.incrementAndGet();
    }

    /**
     * Called when the channel becomes active: delegates to the superclass, hands the remote address
     * to the topic manager, resets the authenticator (when one is configured) and increments the
     * active-channel counter.
     *
     * @param channelHandlerContext context of the channel that became active
     * @throws Exception propagated from the superclass implementation
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        this.topicManager.setRemoteAddress(channelHandlerContext.channel().remoteAddress());
        // authenticator is null when SASL authentication is not enabled (see constructor).
        if (this.authenticator != null) {
            this.authenticator.reset();
        }
        // Matching decrement happens in close().
        RequestStats.ACTIVE_CHANNEL_COUNT_INSTANCE.incrementAndGet();
        log.info("channel active: {}", channelHandlerContext.channel());
    }

    /**
     * Called when the channel goes inactive: delegates to the superclass, updates the alive-channel
     * counter and closes this handler.
     *
     * <p>The alive counter is incremented once in the constructor, so it is decremented here. The
     * active counter is decremented inside {@link #close()} (guarded by {@code isActive}), so it
     * must not be decremented here as well or each disconnect would be counted twice.
     *
     * @param channelHandlerContext context of the channel that became inactive
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.decrementAndGet();
        close();
    }

    /**
     * Releases the resources held for this connection. Only the first call has any effect: the
     * {@code isActive} flag is flipped atomically and later calls return immediately.
     *
     * <p>On the first call this closes the superclass and the topic manager, removes this client's
     * entry from {@code currentConnectedGroup}, shuts down both purgatories and decrements the
     * active-channel counter.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    public void close() {
        // Already closed (or closing on another path): nothing to do.
        if (!this.isActive.getAndSet(false)) {
            return;
        }
        super.close();
        this.topicManager.close();

        final String clientHost = this.ctx.channel().remoteAddress().toString();
        if (this.currentConnectedGroup.containsKey(clientHost)) {
            log.info("currentConnectedGroup remove {}", clientHost);
            this.currentConnectedGroup.remove(clientHost);
        }

        this.producePurgatory.shutdown();
        this.fetchPurgatory.shutdown();
        RequestStats.ACTIVE_CHANNEL_COUNT_INSTANCE.decrementAndGet();
    }

    /**
     * Tells whether this connection may proceed past authentication.
     *
     * @return {@code true} when no authenticator is configured, otherwise whatever the
     *         authenticator reports via {@code complete()}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected boolean hasAuthenticated() {
        if (this.authenticator == null) {
            return true;
        }
        return this.authenticator.complete();
    }

    /**
     * Forwards the incoming buffer to the authenticator, if one is configured; does nothing
     * otherwise.
     *
     * @param channelHandlerContext context of the channel the buffer arrived on
     * @param byteBuf the request bytes to authenticate with
     * @param biConsumer callback passed through unchanged to the authenticator
     * @param biConsumer2 callback passed through unchanged to the authenticator
     * @throws AuthenticationException if the authenticator rejects the request
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void channelPrepare(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, BiConsumer<Long, Throwable> biConsumer, BiConsumer<String, Long> biConsumer2) throws AuthenticationException {
        if (this.authenticator == null) {
            return;
        }
        this.authenticator.authenticate(channelHandlerContext, byteBuf, biConsumer, biConsumer2);
    }

    /**
     * Closes the connection after an authentication failure, either immediately or, when
     * {@code failedAuthenticationDelayMs} is positive, after that delay on the channel's executor.
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void maybeDelayCloseOnAuthenticationFailure() {
        if (this.failedAuthenticationDelayMs <= 0) {
            // No delay configured: close right away on the calling thread.
            handleCloseOnAuthenticationFailure();
            return;
        }
        this.ctx.executor().schedule(this::handleCloseOnAuthenticationFailure, this.failedAuthenticationDelayMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Runs {@link #completeCloseOnAuthenticationFailure()} and then closes this handler.
     */
    private void handleCloseOnAuthenticationFailure() {
        try {
            completeCloseOnAuthenticationFailure();
        } finally {
            // Close even if completing the failure handling throws.
            close();
        }
    }

    /**
     * Asks the authenticator to send its authentication-failure response, provided the handler is
     * still active and an authenticator is configured; otherwise does nothing.
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void completeCloseOnAuthenticationFailure() {
        if (this.isActive.get() && this.authenticator != null) {
            this.authenticator.sendAuthenticationFailureResponse();
        }
    }

    /**
     * Answers an ApiVersions request. When the request's own API version is not supported the
     * response carries {@code UNSUPPORTED_VERSION}; otherwise it lists the supported API versions.
     *
     * @param kafkaHeaderAndRequest the decoded request and its header
     * @param completableFuture future completed with the response
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleApiVersionsRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        final boolean unsupportedVersion = !ApiKeys.API_VERSIONS.isVersionSupported(kafkaHeaderAndRequest.getHeader().apiVersion());
        completableFuture.complete(overloadDefaultApiVersionsResponse(unsupportedVersion));
    }

    /**
     * Builds the ApiVersions response advertised by this handler.
     *
     * @param z {@code true} to produce an {@code UNSUPPORTED_VERSION} response with an empty
     *          version list; {@code false} to list every API key whose
     *          {@code minRequiredInterBrokerMagic} is at most 2
     * @return the response; for {@code LIST_OFFSETS} the advertised minimum version is 0, all
     *         other keys use the default range of the key
     */
    protected ApiVersionsResponse overloadDefaultApiVersionsResponse(boolean z) {
        List<ApiVersionsResponse.ApiVersion> supportedVersions = new ArrayList<>();
        if (z) {
            return new ApiVersionsResponse(0, Errors.UNSUPPORTED_VERSION, supportedVersions);
        }
        for (ApiKeys key : ApiKeys.values()) {
            // Skip keys that need a newer record-batch magic than 2.
            if (key.minRequiredInterBrokerMagic > 2) {
                continue;
            }
            if (key == ApiKeys.LIST_OFFSETS) {
                supportedVersions.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 0, key.latestVersion()));
            } else {
                supportedVersions.add(new ApiVersionsResponse.ApiVersion(key));
            }
        }
        return new ApiVersionsResponse(0, Errors.NONE, supportedVersions);
    }

    /**
     * Rejects a request whose API key this handler does not support: logs the problem and completes
     * the future with the request's own error response for an {@link UnsupportedOperationException}.
     *
     * @param kafkaHeaderAndRequest the decoded request and its header
     * @param completableFuture future completed with the error response
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleError(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        final String message = String.format("Kafka API (%s) Not supported by kop server.", kafkaHeaderAndRequest.getHeader().apiKey());
        log.error(message);
        final UnsupportedOperationException cause = new UnsupportedOperationException(message);
        completableFuture.complete(kafkaHeaderAndRequest.getRequest().getErrorResponse(cause));
    }

    /**
     * Answers a request that arrived on a channel that is closing: the future is completed with the
     * request's error response for a {@link LeaderNotAvailableException}.
     *
     * @param kafkaHeaderAndRequest the decoded request and its header
     * @param completableFuture future completed with the error response
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleInactive(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        final LeaderNotAvailableException cause = new LeaderNotAvailableException("Channel is closing!");
        final AbstractResponse closingResponse = kafkaHeaderAndRequest.getRequest().getErrorResponse(cause);
        log.error("Kafka API {} is send to a closing channel", kafkaHeaderAndRequest.getHeader().apiKey());
        completableFuture.complete(closingResponse);
    }

    /**
     * Looks up the partitioned-topic metadata of a topic through the Pulsar admin client.
     *
     * @param str the topic name to look up (callers in this class pass the full topic name)
     * @return the future returned by the admin client
     */
    private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String str) {
        return this.admin.topics().getPartitionedTopicMetadataAsync(str);
    }

    /**
     * Checks whether a topic name is one of KoP's two internal topics.
     *
     * @param str the topic name to test; compared by exact string equality
     * @return {@code true} if it equals the offsets topic name or the transaction topic name
     */
    private boolean isInternalTopic(String str) {
        if (str.equals(this.offsetsTopicName)) {
            return true;
        }
        return str.equals(this.txnTopicName);
    }

    /**
     * Lists the persistent topics of every allowed namespace and groups them by Kafka topic name.
     *
     * <p>The map key is the partitioned topic name with the default namespace prefix removed; the
     * value holds every {@link TopicName} (one per partition) found under that key.
     *
     * @return a future completed with the grouped topics once every namespace has answered, or
     *         completed exceptionally as soon as any namespace lookup fails. When no namespace is
     *         allowed the future completes immediately with an empty map.
     */
    private CompletableFuture<Map<String, List<TopicName>>> getAllTopicsAsync() {
        CompletableFuture<Map<String, List<TopicName>>> topicMapFuture = new CompletableFuture<>();
        Map<String, List<TopicName>> topicMap = new ConcurrentHashMap<>();
        // Without this guard the loop below never runs and the future would never complete,
        // leaving the caller's request hanging.
        if (this.allowedNamespaces.isEmpty()) {
            topicMapFuture.complete(topicMap);
            return topicMapFuture;
        }
        AtomicInteger pendingNamespaces = new AtomicInteger(this.allowedNamespaces.size());
        for (String namespace : this.allowedNamespaces) {
            this.pulsarService.getNamespaceService().getListOfPersistentTopics(NamespaceName.get(namespace)).whenComplete((topics, error) -> {
                if (error != null) {
                    log.error("Failed to getListOfPersistentTopic of {}", namespace, error);
                    topicMapFuture.completeExceptionally(error);
                    return;
                }
                // Another namespace already failed: the result is discarded anyway.
                if (topicMapFuture.isCompletedExceptionally()) {
                    return;
                }
                for (String topic : topics) {
                    TopicName topicName = TopicName.get(topic);
                    String key = KopTopic.removeDefaultNamespacePrefix(topicName.getPartitionedTopicName());
                    topicMap.computeIfAbsent(key, ignored -> Collections.synchronizedList(new ArrayList<>())).add(topicName);
                }
                // The last namespace to finish publishes the result.
                if (pendingNamespaces.decrementAndGet() == 0) {
                    topicMapFuture.complete(topicMap);
                }
            });
        }
        return topicMapFuture;
    }

    /**
     * Handles a Kafka Metadata request in two asynchronous stages.
     *
     * <p>Stage 1 builds a map from Kafka topic name to the list of Pulsar partition
     * {@link TopicName}s. When the request names no topics, all topics of the allowed namespaces
     * are listed and filtered by DESCRIBE authorization. Otherwise each requested topic is
     * authorized, its partition count is looked up, and a missing topic is auto-created with
     * {@code defaultNumPartitions} partitions when both the KoP configuration and the request
     * allow it.
     *
     * <p>Stage 2 calls {@code findBroker} for every partition, collects the partition metadata and
     * the distinct leader nodes, and completes {@code completableFuture} with a
     * {@link MetadataResponse} once every topic has been resolved. Topics that failed
     * authorization or lookup are reported through error {@code TopicMetadata} entries.
     *
     * @param kafkaHeaderAndRequest the decoded request; its body must be a {@link MetadataRequest}
     * @param completableFuture future completed with the {@link MetadataResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleTopicMetadataRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        CompletableFuture thenApply;
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof MetadataRequest);
        MetadataRequest metadataRequest = (MetadataRequest) kafkaHeaderAndRequest.getRequest();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Request {}: for topic {} ", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), metadataRequest.topics());
        }
        // synchronizedList: TopicMetadata entries for the response (successes and errors).
        List synchronizedList = Collections.synchronizedList(Lists.newArrayList());
        // synchronizedList2: broker nodes for the response.
        List synchronizedList2 = Collections.synchronizedList(Lists.newArrayList());
        List<String> list = metadataRequest.topics();
        // For non-partitioned topics: maps the "partition 0" name to the plain topic name so that
        // the broker lookup in stage 2 uses the real topic.
        ConcurrentMap newConcurrentMap = Maps.newConcurrentMap();
        if (list == null || list.isEmpty()) {
            // No topics named in the request: report every topic the client may DESCRIBE.
            KafkaTopicManager.clearTopicManagerCache();
            thenApply = getAllTopicsAsync().thenApply(map -> {
                ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                // NOTE(review): the authorize(...) callbacks below are registered but not awaited;
                // the map is returned right after the loops. If authorize completes asynchronously,
                // stage 2 may observe an incomplete map — confirm.
                map.forEach((str, list2) -> {
                    list2.forEach(topicName -> {
                        authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, topicName.toString())).whenComplete((bool, th) -> {
                            if (th == null && bool.booleanValue()) {
                                ((List) concurrentHashMap.computeIfAbsent(str, str -> {
                                    return Collections.synchronizedList(new ArrayList());
                                })).add(topicName);
                            } else {
                                synchronizedList.add(new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, str, isInternalTopic(topicName.toString()), Collections.emptyList()));
                            }
                        });
                    });
                });
                return concurrentHashMap;
            });
        } else {
            // Explicit topic list: resolve each topic independently and complete the stage-1 future
            // once all of them have been accounted for.
            thenApply = new CompletableFuture();
            ConcurrentMap newConcurrentMap2 = Maps.newConcurrentMap();
            List<String> list2 = metadataRequest.topics();
            int size = list2.size();
            AtomicInteger atomicInteger = new AtomicInteger(0);
            // Marks one requested topic as finished; the last one completes the stage-1 future.
            Runnable runnable = () -> {
                if (atomicInteger.incrementAndGet() == size) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Request {}: Completed get {} topic's partitions", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), Integer.valueOf(size));
                    }
                    thenApply.complete(newConcurrentMap2);
                }
            };
            // Records the partition TopicNames (0..num-1) of a successfully resolved topic.
            BiConsumer biConsumer = (str, num) -> {
                KopTopic kopTopic = new KopTopic(str);
                newConcurrentMap2.putIfAbsent(str, IntStream.range(0, num.intValue()).mapToObj(i -> {
                    return TopicName.get(kopTopic.getPartitionName(i));
                }).collect(Collectors.toList()));
                runnable.run();
            };
            // Records an authorization failure for a topic.
            BiConsumer biConsumer2 = (str2, str3) -> {
                synchronizedList.add(new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, str2, isInternalTopic(str3), Collections.emptyList()));
                runnable.run();
            };
            list2.forEach(str4 -> {
                String fullName = new KopTopic(str4).getFullName();
                authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullName)).whenComplete((bool, th) -> {
                    if (th != null) {
                        log.error("Describe topic authorize failed, topic - {}. {}", fullName, th.getMessage());
                        biConsumer2.accept(str4, fullName);
                    } else if (bool.booleanValue()) {
                        getPartitionedTopicMetadataAsync(fullName).whenComplete((partitionedTopicMetadata, th) -> {
                            if (th == null) {
                                if (partitionedTopicMetadata.partitions <= 0) {
                                    // Non-partitioned topic: expose it to Kafka as a single partition.
                                    newConcurrentMap.put(TopicName.get(fullName).getPartition(0).toString(), TopicName.get(fullName));
                                    biConsumer.accept(str4, 1);
                                    return;
                                } else {
                                    if (log.isDebugEnabled()) {
                                        log.debug("Topic {} has {} partitions", str4, Integer.valueOf(partitionedTopicMetadata.partitions));
                                    }
                                    biConsumer.accept(str4, Integer.valueOf(partitionedTopicMetadata.partitions));
                                    return;
                                }
                            }
                            if (!(th instanceof PulsarAdminException.NotFoundException)) {
                                // Lookup failed for a reason other than "not found".
                                synchronizedList.add(new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, str4, isInternalTopic(fullName), Collections.emptyList()));
                                log.warn("[{}] Request {}: Failed to get partitioned pulsar topic {} metadata: {}", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), fullName, th.getMessage());
                                runnable.run();
                            } else if (this.kafkaConfig.isAllowAutoTopicCreation() && metadataRequest.allowAutoTopicCreation()) {
                                log.info("[{}] Request {}: Topic {} doesn't exist, auto create it with {} partitions", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), str4, Integer.valueOf(this.defaultNumPartitions));
                                this.admin.topics().createPartitionedTopicAsync(fullName, this.defaultNumPartitions).whenComplete((r11, th) -> {
                                    if (th == null) {
                                        biConsumer.accept(str4, Integer.valueOf(this.defaultNumPartitions));
                                    } else {
                                        // NOTE(review): the topic is counted as finished but no error
                                        // TopicMetadata is added, so it is simply absent from the
                                        // response — confirm this is intended.
                                        log.error("[{}] Failed to create partitioned topic {}", this.ctx.channel(), str4, th);
                                        runnable.run();
                                    }
                                });
                            } else {
                                log.error("[{}] Request {}: Topic {} doesn't exist and it's not allowed to auto create partitioned topic", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), str4);
                                synchronizedList.add(new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, str4, isInternalTopic(fullName), Collections.emptyList()));
                                runnable.run();
                            }
                        });
                    } else {
                        biConsumer2.accept(str4, fullName);
                    }
                });
            });
        }
        // Stage 2: resolve the owning broker of every partition and build the response.
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        int id = newSelfNode().id();
        thenApply.whenComplete((map2, th) -> {
            if (th != null) {
                // Stage 1 failed: answer with this broker only and no topic metadata.
                log.warn("[{}] Request {}: Exception fetching metadata, will return null Response", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), th);
                synchronizedList2.add(newSelfNode());
                completableFuture.complete(new MetadataResponse(synchronizedList2, this.clusterName, id, Collections.emptyList()));
            } else {
                int size2 = map2.size();
                if (size2 != 0) {
                    map2.forEach((str5, list3) -> {
                        int size3 = list3.size();
                        AtomicInteger atomicInteger3 = new AtomicInteger(0);
                        List synchronizedList3 = Collections.synchronizedList(Lists.newArrayListWithExpectedSize(size3));
                        list3.forEach(topicName -> {
                            TopicName topicName = (TopicName) newConcurrentMap.getOrDefault(topicName.toString(), topicName);
                            findBroker(topicName).whenComplete((partitionMetadata, th) -> {
                                if (th != null || partitionMetadata == null) {
                                    log.warn("[{}] Request {}: Exception while find Broker metadata", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), th);
                                    synchronizedList3.add(newFailedPartitionMetadata(topicName));
                                } else {
                                    Node leader = partitionMetadata.leader();
                                    // Check-then-add must be atomic so each leader appears only once.
                                    synchronized (synchronizedList2) {
                                        if (!synchronizedList2.stream().anyMatch(node -> {
                                            return node.equals(leader);
                                        })) {
                                            synchronizedList2.add(leader);
                                        }
                                    }
                                    synchronizedList3.add(partitionMetadata);
                                }
                                int incrementAndGet = atomicInteger3.incrementAndGet();
                                if (log.isDebugEnabled()) {
                                    log.debug("[{}] Request {}: FindBroker for topic {}, partitions found/all: {}/{}.", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), str5, Integer.valueOf(incrementAndGet), Integer.valueOf(size3));
                                }
                                // Last partition of this topic: publish the topic's metadata.
                                if (incrementAndGet == size3) {
                                    synchronizedList.add(new MetadataResponse.TopicMetadata(Errors.NONE, str5, isInternalTopic(new KopTopic(str5).getFullName()), synchronizedList3));
                                    int incrementAndGet2 = atomicInteger2.incrementAndGet();
                                    if (log.isDebugEnabled()) {
                                        log.debug("[{}] Request {}: Completed findBroker for topic {}, partitions found/all: {}/{}. \n dump All Metadata:", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), str5, Integer.valueOf(incrementAndGet2), Integer.valueOf(size2));
                                        synchronizedList.stream().forEach(topicMetadata -> {
                                            log.debug("TopicMetadata response: {}", topicMetadata.toString());
                                        });
                                    }
                                    // Last topic overall: send the response.
                                    if (incrementAndGet2 == size2) {
                                        completableFuture.complete(new MetadataResponse(synchronizedList2, this.clusterName, id, synchronizedList));
                                    }
                                }
                            });
                        });
                    });
                } else {
                    // Nothing resolved: answer with this broker plus any error entries collected so far.
                    synchronizedList2.add(newSelfNode());
                    completableFuture.complete(new MetadataResponse(synchronizedList2, this.clusterName, id, synchronizedList));
                }
            }
        });
    }

    /**
     * Switches off auto-read on this connection's channel.
     *
     * <p>Nothing happens when there is no channel context yet or when auto-read
     * is already off.
     */
    private void disableCnxAutoRead() {
        final boolean autoReadIsOn = this.ctx != null && this.ctx.channel().config().isAutoRead();
        if (autoReadIsOn) {
            this.ctx.channel().config().setAutoRead(false);
            if (log.isDebugEnabled()) {
                log.debug("[{}] disable auto read", this.ctx.channel());
            }
        }
    }

    /**
     * Switches auto-read back on for this connection's channel and triggers a read.
     *
     * <p>Nothing happens when there is no channel context, when auto-read is
     * already on, or while the publish-buffer throttling flag is still set.
     */
    private void enableCnxAutoRead() {
        if (this.ctx != null
                && !this.ctx.channel().config().isAutoRead()
                && !this.autoReadDisabledPublishBufferLimiting) {
            this.ctx.channel().config().setAutoRead(true);
            // Explicitly request a read after re-enabling auto-read.
            this.ctx.read();
            if (log.isDebugEnabled()) {
                log.debug("[{}] enable auto read", this.ctx.channel());
            }
        }
    }

    /**
     * Accounts for {@code j} additional in-flight bytes and pauses reading from the
     * connection once the pending total reaches {@code maxPendingBytes}.
     *
     * <p>The pause is applied only when a positive limit is configured and the
     * connection is not already paused by this mechanism.
     *
     * @param j number of bytes about to be published
     */
    private void startSendOperationForThrottling(long j) {
        final long currentPendingBytes = this.pendingBytes.addAndGet(j);
        final boolean mustPause = currentPendingBytes >= this.maxPendingBytes
                && !this.autoReadDisabledPublishBufferLimiting
                && this.maxPendingBytes > 0;
        if (mustPause) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] disable auto read because currentPendingBytes({}) > maxPendingBytes({})", this.ctx.channel(), Long.valueOf(currentPendingBytes), Long.valueOf(this.maxPendingBytes));
            }
            disableCnxAutoRead();
            this.autoReadDisabledPublishBufferLimiting = true;
            this.pulsarService.getBrokerService().pausedConnections(1);
        }
    }

    /**
     * Removes {@code j} bytes from the in-flight total and resumes reading from the
     * connection once the total drops below {@code resumeThresholdPendingBytes}.
     *
     * <p>Reading is resumed only if it was previously paused by
     * {@code startSendOperationForThrottling}.
     *
     * @param j number of bytes whose publish has finished
     */
    private void completeSendOperationForThrottling(long j) {
        final long currentPendingBytes = this.pendingBytes.addAndGet(-j);
        final boolean mustResume = currentPendingBytes < this.resumeThresholdPendingBytes
                && this.autoReadDisabledPublishBufferLimiting;
        if (mustResume) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] enable auto read because currentPendingBytes({}) < resumeThreshold({})", this.ctx.channel(), Long.valueOf(currentPendingBytes), Long.valueOf(this.resumeThresholdPendingBytes));
            }
            // The flag must be cleared first: enableCnxAutoRead() is a no-op while it is set.
            this.autoReadDisabledPublishBufferLimiting = false;
            enableCnxAutoRead();
            this.pulsarService.getBrokerService().resumedConnections(1);
        }
    }

    /**
     * Publishes one encoded record batch to a persistent topic and reports the outcome.
     *
     * <p>This method takes ownership of {@code byteBuf}: on every path it undoes the
     * pending-bytes accounting via {@code completeSendOperationForThrottling} and releases
     * the buffer, either in the publish completion callback or, when nothing is
     * published, before returning. The visible caller charges the same buffer with
     * {@code startSendOperationForThrottling} before invoking this method.
     * NOTE(review): confirm that no other caller invokes this method without that charge.
     *
     * @param optional       the topic to publish to; empty means this broker cannot serve the partition
     * @param byteBuf        the encoded entry payload
     * @param i              number of messages contained in {@code byteBuf}
     * @param memoryRecords  the Kafka records; the first batch is inspected for the transactional flag
     * @param topicPartition the Kafka partition being produced to
     * @param consumer       receives the offset reported by the publish on success
     * @param consumer2      receives the Kafka error on failure
     */
    private void publishMessages(Optional<PersistentTopic> optional, ByteBuf byteBuf, int i, MemoryRecords memoryRecords, TopicPartition topicPartition, Consumer<Long> consumer, Consumer<Errors> consumer2) {
        if (!optional.isPresent()) {
            // Nothing is published, so the completion callback below will never run:
            // undo the throttling charge and free the buffer here to avoid leaking both.
            completeSendOperationForThrottling(byteBuf.readableBytes());
            byteBuf.release();
            consumer2.accept(Errors.NOT_LEADER_FOR_PARTITION);
            return;
        }
        PersistentTopic persistentTopic = optional.get();
        if (persistentTopic.isSystemTopic()) {
            log.error("Not support producing message to system topic: {}", persistentTopic);
            // Same as above: this path never reaches the completion callback.
            completeSendOperationForThrottling(byteBuf.readableBytes());
            byteBuf.release();
            consumer2.accept(Errors.INVALID_TOPIC_EXCEPTION);
            return;
        }
        String kopTopic = KopTopic.toString(topicPartition);
        this.topicManager.registerProducerInPersistentTopic(kopTopic, persistentTopic);
        Producer referenceProducer = KafkaTopicManager.getReferenceProducer(kopTopic);
        // Update producer, topic and handler level publish statistics before writing.
        referenceProducer.updateRates(i, byteBuf.readableBytes());
        referenceProducer.getTopic().incrementPublishCount(i, byteBuf.readableBytes());
        updateProducerStats(topicPartition, i, byteBuf.readableBytes());
        CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
        long startNanos = MathUtils.nowInNano();
        persistentTopic.publishMessage(byteBuf, MessagePublishContext.get(offsetFuture, persistentTopic, i, System.nanoTime()));
        MutableRecordBatch firstBatch = memoryRecords.batchIterator().next();
        offsetFuture.whenComplete((offset, error) -> {
            // The write has finished one way or the other: release throttling budget and the buffer.
            completeSendOperationForThrottling(byteBuf.readableBytes());
            byteBuf.release();
            if (error != null) {
                log.error("publishMessages for topic partition: {} failed when write.", kopTopic, error);
                this.requestStats.getMessagePublishStats().registerFailedEvent(MathUtils.elapsedNanos(startNanos), TimeUnit.NANOSECONDS);
                consumer2.accept(Errors.KAFKA_STORAGE_ERROR);
            } else {
                if (firstBatch.isTransactional()) {
                    this.transactionCoordinator.addActivePidOffset(TopicName.get(kopTopic), firstBatch.producerId(), offset.longValue());
                }
                this.requestStats.getMessagePublishStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startNanos), TimeUnit.NANOSECONDS);
                consumer.accept(offset);
            }
        });
    }

    /**
     * Handles a Kafka PRODUCE request.
     *
     * <p>Each partition in the request is authorized for WRITE and then handed to
     * {@code handlePartitionRecords}. Per-partition results are gathered in a shared
     * map; the response is completed once every partition has reported, or by the
     * delayed operation registered in {@code producePurgatory}. Partitions that have
     * not reported when the response is built are answered with
     * {@code REQUEST_TIMED_OUT}.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link ProduceRequest}
     * @param completableFuture     completed with the {@link ProduceResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleProduceRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof ProduceRequest);
        final ProduceRequest request = (ProduceRequest) kafkaHeaderAndRequest.getRequest();
        final int numPartitions = request.partitionRecordsOrFail().size();
        final Map<TopicPartition, ProduceResponse.PartitionResponse> responses = new ConcurrentHashMap<>();
        // Counts partitions that have not reported yet; also handed to the delayed operation.
        final AtomicInteger remaining = new AtomicInteger(request.partitionRecordsOrFail().size());
        final int timeoutMs = request.timeout();

        // Builds and sends the response; may be invoked more than once, only the first call has an effect.
        final Runnable complete = () -> {
            remaining.set(0);
            if (!completableFuture.isDone()) {
                for (TopicPartition partition : request.partitionRecordsOrFail().keySet()) {
                    if (!responses.containsKey(partition)) {
                        responses.put(partition, new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT));
                    }
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Request {}: Complete handle produce.", this.ctx.channel(), kafkaHeaderAndRequest.toString());
                }
                completableFuture.complete(new ProduceResponse(responses));
            }
        };

        // Records one partition result and completes the request when it was the last one outstanding.
        final BiConsumer<TopicPartition, ProduceResponse.PartitionResponse> addResponse = (partition, response) -> {
            responses.put(partition, response);
            if (remaining.decrementAndGet() == 0) {
                complete.run();
            }
        };

        for (Map.Entry<TopicPartition, MemoryRecords> entry : request.partitionRecordsOrFail().entrySet()) {
            final TopicPartition partition = entry.getKey();
            final MemoryRecords records = entry.getValue();
            final Consumer<Long> onOffset = offset ->
                    addResponse.accept(partition, new ProduceResponse.PartitionResponse(Errors.NONE, offset.longValue(), -1L, -1L));
            final Consumer<Errors> onError = errors ->
                    addResponse.accept(partition, new ProduceResponse.PartitionResponse(errors));
            final Consumer<Throwable> onException = cause ->
                    addResponse.accept(partition, new ProduceResponse.PartitionResponse(Errors.forException(cause)));
            final String fullPartitionName = KopTopic.toString(partition);
            authorize(AclOperation.WRITE, Resource.of(ResourceType.TOPIC, fullPartitionName)).whenComplete((authorized, authError) -> {
                if (authError != null) {
                    log.error("Write topic authorize failed, topic - {}. {}", fullPartitionName, authError.getMessage());
                    onError.accept(Errors.TOPIC_AUTHORIZATION_FAILED);
                    return;
                }
                if (!authorized.booleanValue()) {
                    onError.accept(Errors.TOPIC_AUTHORIZATION_FAILED);
                    return;
                }
                handlePartitionRecords(kafkaHeaderAndRequest, partition, records, numPartitions, fullPartitionName, onOffset, onError, onException);
            });
        }

        if (timeoutMs > 0) {
            final DelayedProduceAndFetch delayedProduce = new DelayedProduceAndFetch(timeoutMs, remaining, complete);
            final List watchKeys = (List) request.partitionRecordsOrFail().keySet().stream().map(DelayedOperationKey.TopicPartitionOperationKey::new).collect(Collectors.toList());
            this.producePurgatory.tryCompleteElseWatch(delayedProduce, watchKeys);
        } else {
            // No timeout budget: answer immediately with whatever has been collected so far.
            complete.run();
        }
    }

    /**
     * Validates, encodes and publishes the records of a single partition of a produce request.
     *
     * <p>Producing to the offset or transaction topics is rejected. Otherwise the records
     * are validated and encoded, the encoded size is charged to the connection's pending-bytes
     * budget, and the owning topic is looked up. If the lookup is already complete the publish
     * happens inline; otherwise it is queued on the partition's {@code PendingTopicFutures}.
     *
     * <p>When the lookup has already failed or reports that the topic is absent, nothing is
     * published, so the encoded buffer is released and the pending-bytes charge is undone here.
     *
     * @param kafkaHeaderAndRequest the enclosing produce request (used for API version and logging)
     * @param topicPartition        the partition being produced to
     * @param memoryRecords         the records sent for that partition
     * @param i                     value logged as "request size" (the caller passes the request's partition count)
     * @param str                   full name of the partition's topic
     * @param consumer              receives the offset on success
     * @param consumer2             receives a Kafka error code on failure
     * @param consumer3             receives an exception on failure
     */
    private void handlePartitionRecords(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, TopicPartition topicPartition, MemoryRecords memoryRecords, int i, String str, Consumer<Long> consumer, Consumer<Errors> consumer2, Consumer<Throwable> consumer3) {
        if (isOffsetTopic(str) || isTransactionTopic(str)) {
            log.error("[{}] Request {}: not support produce message to inner topic. topic: {}", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), topicPartition);
            consumer2.accept(Errors.INVALID_TOPIC_EXCEPTION);
            return;
        }
        try {
            long nowInNano = MathUtils.nowInNano();
            MemoryRecords validateRecords = validateRecords(kafkaHeaderAndRequest.getHeader().apiVersion(), topicPartition, memoryRecords);
            int parseNumMessages = EntryFormatter.parseNumMessages(validateRecords);
            ByteBuf encode = this.entryFormatter.encode(validateRecords, parseNumMessages);
            this.requestStats.getProduceEncodeStats().registerSuccessfulEvent(MathUtils.elapsedNanos(nowInNano), TimeUnit.NANOSECONDS);
            startSendOperationForThrottling(encode.readableBytes());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Request {}: Produce messages for topic {} partition {}, request size: {} ", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), topicPartition.topic(), Integer.valueOf(topicPartition.partition()), Integer.valueOf(i));
            }
            CompletableFuture<Optional<PersistentTopic>> topic = this.topicManager.getTopic(str);
            if (topic.isCompletedExceptionally()) {
                // The buffer will never be handed to publishMessages: undo the throttling
                // charge and release it so neither the bytes nor the memory are leaked.
                completeSendOperationForThrottling(encode.readableBytes());
                encode.release();
                // exceptionally() is used only to get hold of the failure cause.
                topic.exceptionally(th -> {
                    consumer3.accept(th);
                    return Optional.empty();
                });
                return;
            }
            if (topic.isDone() && !topic.getNow(Optional.empty()).isPresent()) {
                // Same as above: nothing is published on this path.
                completeSendOperationForThrottling(encode.readableBytes());
                encode.release();
                consumer2.accept(Errors.NOT_LEADER_FOR_PARTITION);
                return;
            }
            Consumer<Optional<PersistentTopic>> consumer4 = optional -> {
                publishMessages(optional, encode, parseNumMessages, validateRecords, topicPartition, consumer, consumer2);
            };
            if (topic.isDone()) {
                consumer4.accept(topic.getNow(Optional.empty()));
            } else {
                // Lookup still in flight: queue the publish behind earlier pending publishes for this partition.
                // NOTE(review): if the lookup later fails, consumer3 is invoked but the encoded buffer and
                // its throttling charge are not visibly released on that path; verify PendingTopicFutures.
                this.pendingTopicFuturesMap.computeIfAbsent(topicPartition, topicPartition2 -> {
                    return new PendingTopicFutures(this.requestStats);
                }).addListener(topic, consumer4, consumer3);
            }
        } catch (Exception e) {
            log.error("[{}] Failed to handle produce request for {}", this.ctx.channel(), topicPartition, e);
            consumer3.accept(e);
        }
    }

    /**
     * Handles a Kafka FIND_COORDINATOR request for a group or transaction coordinator.
     *
     * <p>The coordinator key is mapped to a partition of the matching coordinator's topic,
     * the key is stored in ZooKeeper under a path derived from the client host and client id,
     * and the broker owning that partition is looked up and returned as the coordinator.
     * A failed or empty lookup is answered with {@code LEADER_NOT_AVAILABLE}.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link FindCoordinatorRequest}
     * @param completableFuture     completed with the {@link FindCoordinatorResponse}
     * @throws NotImplementedException if the coordinator type is neither GROUP nor TRANSACTION
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleFindCoordinatorRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof FindCoordinatorRequest);
        FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) kafkaHeaderAndRequest.getRequest();
        final int partitionFor;
        final String topicPartitionName;
        if (findCoordinatorRequest.coordinatorType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION) {
            partitionFor = this.transactionCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey());
            topicPartitionName = this.transactionCoordinator.getTopicPartitionName(partitionFor);
        } else if (findCoordinatorRequest.coordinatorType() == FindCoordinatorRequest.CoordinatorType.GROUP) {
            partitionFor = this.groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey());
            topicPartitionName = this.groupCoordinator.getTopicPartitionName(partitionFor);
        } else {
            // TRANSACTION and GROUP are both handled above; anything else is unsupported.
            throw new NotImplementedException("FindCoordinatorRequest does not support coordinator type " + findCoordinatorRequest.coordinatorType());
        }
        // Remember the coordinator key for this client (host + client id) in ZooKeeper.
        String coordinatorKey = findCoordinatorRequest.coordinatorKey();
        ZooKeeperUtils.tryCreatePath(this.pulsarService.getZkClient(), this.groupIdStoredPath + ZooKeeperUtils.groupIdPathFormat(kafkaHeaderAndRequest.getClientHost(), kafkaHeaderAndRequest.getHeader().clientId()), coordinatorKey.getBytes(Charset.forName(CharEncoding.UTF_8)));
        findBroker(TopicName.get(topicPartitionName)).whenComplete((partitionMetadata, th) -> {
            if (th != null || partitionMetadata == null) {
                log.error("[{}] Request {}: Error while find coordinator, .", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), th);
                completableFuture.complete(new FindCoordinatorResponse(Errors.LEADER_NOT_AVAILABLE, Node.noNode()));
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Found node {} as coordinator for key {} partition {}.", this.ctx.channel(), partitionMetadata.leader(), findCoordinatorRequest.coordinatorKey(), Integer.valueOf(partitionFor));
                }
                completableFuture.complete(new FindCoordinatorResponse(Errors.NONE, partitionMetadata.leader()));
            }
        });
    }

    /**
     * Re-keys entries of {@code map} in place.
     *
     * <p>Every entry whose key appears as a key in {@code map2} is removed and re-inserted
     * under the partition that {@code map2} associates with it. Entries whose key is not in
     * {@code map2} are left untouched.
     *
     * @param map  the map to modify
     * @param map2 mapping from the current key to the replacement key
     * @param <T>  value type of {@code map}
     */
    private <T> void replaceTopicPartition(Map<TopicPartition, T> map, Map<TopicPartition, TopicPartition> map2) {
        // Replacements are staged separately so the map is not grown while it is being filtered.
        final Map<TopicPartition, T> rekeyed = new HashMap<>();
        map.entrySet().removeIf(entry -> {
            final TopicPartition currentKey = entry.getKey();
            final boolean hasReplacement = map2.containsKey(currentKey);
            if (hasReplacement) {
                rekeyed.put(map2.get(currentKey), entry.getValue());
            }
            return hasReplacement;
        });
        map.putAll(rekeyed);
    }

    /**
     * Handles a Kafka OFFSET_FETCH request.
     *
     * <p>Each requested partition is converted to its full topic name and authorized for
     * DESCRIBE. Authorized partitions are passed to the group coordinator; unauthorized
     * partitions and partitions with an invalid topic name are answered with
     * {@code UNAUTHORIZED_PARTITION} and {@code UNKNOWN_PARTITION} respectively. Keys in the
     * coordinator's result are mapped back to the partitions as named in the request before
     * the response is sent. A request without partitions is forwarded to the coordinator with
     * an empty partition list option.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap an {@link OffsetFetchRequest}
     * @param completableFuture     completed with the {@link OffsetFetchResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleOffsetFetchRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof OffsetFetchRequest);
        OffsetFetchRequest offsetFetchRequest = (OffsetFetchRequest) kafkaHeaderAndRequest.getRequest();
        Preconditions.checkState(this.groupCoordinator != null, "Group Coordinator not started");
        CompletableFuture<List<TopicPartition>> authorizeFuture = new CompletableFuture<>();
        // full-name partition -> partition as named in the request
        // NOTE(review): this map and the list below are plain collections written from the
        // authorize callbacks; confirm those callbacks never run concurrently.
        Map<TopicPartition, TopicPartition> replacingIndex = new HashMap<>();
        List<TopicPartition> authorizedPartitions = new ArrayList<>();
        Map<TopicPartition, OffsetFetchResponse.PartitionData> unauthorizedPartitionData = Maps.newConcurrentMap();
        Map<TopicPartition, OffsetFetchResponse.PartitionData> unknownPartitionData = Maps.newConcurrentMap();
        if (offsetFetchRequest.partitions() == null || offsetFetchRequest.partitions().isEmpty()) {
            authorizeFuture.complete(null);
        } else {
            AtomicInteger pending = new AtomicInteger(offsetFetchRequest.partitions().size());
            // Must run exactly once per requested partition, whatever its outcome.
            Runnable completeOne = () -> {
                if (pending.decrementAndGet() == 0) {
                    authorizeFuture.complete(authorizedPartitions);
                }
            };
            offsetFetchRequest.partitions().forEach(requestedPartition -> {
                try {
                    String fullName = new KopTopic(requestedPartition.topic()).getFullName();
                    authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullName)).whenComplete((authorized, authError) -> {
                        if (authError != null) {
                            log.error("Describe topic authorize failed, topic - {}. {}", fullName, authError.getMessage());
                            unauthorizedPartitionData.put(requestedPartition, OffsetFetchResponse.UNAUTHORIZED_PARTITION);
                            completeOne.run();
                        } else if (!authorized.booleanValue()) {
                            unauthorizedPartitionData.put(requestedPartition, OffsetFetchResponse.UNAUTHORIZED_PARTITION);
                            completeOne.run();
                        } else {
                            TopicPartition fullNamePartition = new TopicPartition(fullName, requestedPartition.partition());
                            replacingIndex.put(fullNamePartition, requestedPartition);
                            authorizedPartitions.add(fullNamePartition);
                            completeOne.run();
                        }
                    });
                } catch (KoPTopicException e) {
                    log.warn("Invalid topic name: {}", requestedPartition.topic(), e);
                    unknownPartitionData.put(requestedPartition, OffsetFetchResponse.UNKNOWN_PARTITION);
                    // Count this partition as processed; otherwise the pending counter never
                    // reaches zero and the request is never answered.
                    completeOne.run();
                }
            });
        }
        authorizeFuture.whenComplete((partitions, ignored) -> {
            KeyValue<Errors, Map<TopicPartition, OffsetFetchResponse.PartitionData>> fetched = this.groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId(), Optional.ofNullable(partitions));
            if (log.isDebugEnabled()) {
                log.debug("OFFSET_FETCH Unknown partitions: {}, Unauthorized partitions: {}.", unknownPartitionData, unauthorizedPartitionData);
            }
            if (log.isTraceEnabled()) {
                StringBuffer stringBuffer = new StringBuffer();
                replacingIndex.forEach((inner, outer) -> {
                    stringBuffer.append(String.format("\tinnerName:%s, outerName:%s%n", inner, outer));
                });
                log.trace("OFFSET_FETCH TopicPartition relations: \n{}", stringBuffer);
            }
            // Translate full-name keys back to the names the client used, then add the error entries.
            replaceTopicPartition(fetched.getValue(), replacingIndex);
            fetched.getValue().putAll(unauthorizedPartitionData);
            fetched.getValue().putAll(unknownPartitionData);
            completableFuture.complete(new OffsetFetchResponse(fetched.getKey(), fetched.getValue()));
        });
    }

    /**
     * Resolves the offset of a topic for a given timestamp.
     *
     * <p>A timestamp of {@code -1} resolves to the log end offset (latest) and {@code -2}
     * to the first valid position (earliest); any other value is searched with
     * {@link OffsetFinder}. The returned future is always completed normally: failures are
     * reported as an error code inside the {@code PartitionData}.
     *
     * @param str topic name to look up
     * @param l   timestamp to search for, or {@code -1} / {@code -2} as described above
     * @param z   {@code true} to build the list-based (v0) response shape, {@code false} for the
     *            timestamp/offset shape
     * @return a future holding the partition's list-offset result
     */
    private CompletableFuture<ListOffsetResponse.PartitionData> fetchOffsetForTimestamp(String str, Long l, boolean z) {
        CompletableFuture<ListOffsetResponse.PartitionData> completableFuture = new CompletableFuture<>();
        this.topicManager.getTopic(str).whenComplete((optional, th) -> {
            if (th != null) {
                // When the lookup fails `optional` is null, so it must not be dereferenced here;
                // log the requested topic name instead.
                log.error("Failed while get persistentTopic topic: {} ts: {}. ", str, l, th);
                completableFuture.complete(new ListOffsetResponse.PartitionData(Errors.forException(th), -1L, -1L));
                return;
            }
            if (!optional.isPresent()) {
                completableFuture.complete(new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, -1L, -1L));
                return;
            }
            final PersistentTopic persistentTopic = (PersistentTopic) optional.get();
            final ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
            final PositionImpl lastConfirmedEntry = managedLedger.getLastConfirmedEntry();
            if (l.longValue() == -1) {
                // Latest offset requested.
                PositionImpl latestPosition = managedLedger.getLastConfirmedEntry();
                if (log.isDebugEnabled()) {
                    log.debug("Get latest position for topic {} time {}. result: {}", persistentTopic.getName(), l, latestPosition);
                }
                fetchOffsetForTimestampSuccess(completableFuture, z, MessageIdUtils.getLogEndOffset(managedLedger));
                return;
            }
            if (l.longValue() != -2) {
                // A concrete timestamp: search the ledger for the matching position.
                new OffsetFinder(managedLedger).findMessages(l.longValue(), new AsyncCallbacks.FindEntryCallback() {
                    @Override
                    public void findEntryComplete(Position position, Object obj) {
                        final PositionImpl foundPosition;
                        if (position == null) {
                            // Nothing matched: fall back to the first valid position, if any.
                            foundPosition = OffsetFinder.getFirstValidPosition(managedLedger);
                            if (foundPosition == null) {
                                log.warn("Unable to find position for topic {} time {}. get NULL position", persistentTopic.getName(), l);
                                fetchOffsetForTimestampFailed(completableFuture, z);
                                return;
                            }
                        } else {
                            foundPosition = (PositionImpl) position;
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Find position for topic {} time {}. position: {}", persistentTopic.getName(), l, foundPosition);
                        }
                        if (foundPosition.compareTo(lastConfirmedEntry) > 0 || MessageIdUtils.getCurrentOffset(managedLedger) < 0) {
                            fetchOffsetForTimestampSuccess(completableFuture, z, Math.max(0L, MessageIdUtils.getCurrentOffset(managedLedger)));
                            return;
                        }
                        MessageIdUtils.getOffsetOfPosition(managedLedger, foundPosition, true, l.longValue()).whenComplete((offset, offsetError) -> {
                            if (offsetError == null) {
                                fetchOffsetForTimestampSuccess(completableFuture, z, offset.longValue());
                            } else {
                                log.error("[{}] Failed to get offset for position {}", persistentTopic, foundPosition, offsetError);
                                fetchOffsetForTimestampFailed(completableFuture, z);
                            }
                        });
                    }

                    @Override
                    public void findEntryFailed(ManagedLedgerException managedLedgerException, Optional<Position> failedPosition, Object obj) {
                        log.warn("Unable to find position for topic {} time {}. Exception:", persistentTopic.getName(), l, managedLedgerException);
                        fetchOffsetForTimestampFailed(completableFuture, z);
                    }
                });
                return;
            }
            // Earliest offset requested.
            PositionImpl firstValidPosition = OffsetFinder.getFirstValidPosition(managedLedger);
            if (log.isDebugEnabled()) {
                log.debug("Get earliest position for topic {} time {}. result: {}", persistentTopic.getName(), l, firstValidPosition);
            }
            if (firstValidPosition == null) {
                // Mirrors the timestamp-search branch: without a position, report a failure
                // instead of throwing inside the callback and leaving the future incomplete.
                log.warn("Unable to find position for topic {} time {}. get NULL position", persistentTopic.getName(), l);
                fetchOffsetForTimestampFailed(completableFuture, z);
                return;
            }
            if (firstValidPosition.compareTo(lastConfirmedEntry) > 0 || MessageIdUtils.getCurrentOffset(managedLedger) < 0) {
                fetchOffsetForTimestampSuccess(completableFuture, z, Math.max(0L, MessageIdUtils.getCurrentOffset(managedLedger)));
            } else {
                MessageIdUtils.getOffsetOfPosition(managedLedger, firstValidPosition, false, l.longValue()).whenComplete((offset, offsetError) -> {
                    if (offsetError == null) {
                        fetchOffsetForTimestampSuccess(completableFuture, z, offset.longValue());
                    } else {
                        log.error("[{}] Failed to get offset for position {}", persistentTopic, firstValidPosition, offsetError);
                        fetchOffsetForTimestampFailed(completableFuture, z);
                    }
                });
            }
        });
        return completableFuture;
    }

    /**
     * Completes a list-offset future with {@code UNKNOWN_SERVER_ERROR}.
     *
     * @param completableFuture the future to complete
     * @param z                 {@code true} for the list-based response shape (empty offset list),
     *                          {@code false} for the timestamp/offset shape (both {@code -1})
     */
    public void fetchOffsetForTimestampFailed(CompletableFuture<ListOffsetResponse.PartitionData> completableFuture, boolean z) {
        final ListOffsetResponse.PartitionData failure = z
                ? new ListOffsetResponse.PartitionData(Errors.UNKNOWN_SERVER_ERROR, Collections.emptyList())
                : new ListOffsetResponse.PartitionData(Errors.UNKNOWN_SERVER_ERROR, -1L, -1L);
        completableFuture.complete(failure);
    }

    /**
     * Completes a list-offset future with a successfully resolved offset.
     *
     * @param completableFuture the future to complete
     * @param z                 {@code true} for the list-based response shape (single-element
     *                          offset list), {@code false} for the timestamp/offset shape
     * @param j                 the resolved offset
     */
    public void fetchOffsetForTimestampSuccess(CompletableFuture<ListOffsetResponse.PartitionData> completableFuture, boolean z, long j) {
        final ListOffsetResponse.PartitionData result = z
                ? new ListOffsetResponse.PartitionData(Errors.NONE, Collections.singletonList(Long.valueOf(j)))
                : new ListOffsetResponse.PartitionData(Errors.NONE, 0L, j);
        completableFuture.complete(result);
    }

    /**
     * Handles LIST_OFFSETS requests of API version 1 and above.
     *
     * <p>Each requested partition is authorized for DESCRIBE and, if allowed, resolved with
     * {@code fetchOffsetForTimestamp}; otherwise it is answered with
     * {@code TOPIC_AUTHORIZATION_FAILED}. The response is sent once every partition has a result.
     *
     * <p>A result future is registered for every partition <em>before</em> the asynchronous
     * authorization starts, so the final {@code allOf} waits for all partitions even when
     * authorization completes later on another thread.
     *
     * @param kafkaHeaderAndRequest the decoded request wrapping a {@link ListOffsetRequest}
     * @param completableFuture     completed with the {@link ListOffsetResponse}
     */
    private void handleListOffsetRequestV1AndAbove(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        ListOffsetRequest listOffsetRequest = (ListOffsetRequest) kafkaHeaderAndRequest.getRequest();
        Map<TopicPartition, CompletableFuture<ListOffsetResponse.PartitionData>> responseData = Maps.newConcurrentMap();
        listOffsetRequest.partitionTimestamps().forEach((topicPartition, l) -> {
            String kopTopic = KopTopic.toString(topicPartition);
            // Register the placeholder synchronously; it is completed from the callbacks below.
            CompletableFuture<ListOffsetResponse.PartitionData> partitionFuture = new CompletableFuture<>();
            responseData.put(topicPartition, partitionFuture);
            authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, kopTopic)).whenComplete((bool, th) -> {
                if (th != null) {
                    log.error("Describe topic authorize failed, topic - {}. {}", kopTopic, th.getMessage());
                    partitionFuture.complete(new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Collections.emptyList()));
                } else if (!bool.booleanValue()) {
                    partitionFuture.complete(new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Collections.emptyList()));
                } else {
                    try {
                        fetchOffsetForTimestamp(kopTopic, l, false).whenComplete((data, fetchError) -> {
                            if (fetchError != null) {
                                fetchOffsetForTimestampFailed(partitionFuture, false);
                            } else {
                                partitionFuture.complete(data);
                            }
                        });
                    } catch (Exception e) {
                        // Never leave the placeholder incomplete, or the whole response would hang.
                        log.error("Failed to fetch offset for topic {}", kopTopic, e);
                        fetchOffsetForTimestampFailed(partitionFuture, false);
                    }
                }
            });
        });
        CompletableFuture.allOf(responseData.values().toArray(new CompletableFuture[0])).whenComplete((ignored, th) -> {
            completableFuture.complete(new ListOffsetResponse((Map<TopicPartition, ListOffsetResponse.PartitionData>) CoreUtils.mapValue(responseData, future -> {
                return future.join();
            })));
        });
    }

    /**
     * Handles LIST_OFFSETS requests of API version 0 (list-based response shape).
     *
     * <p>Each requested partition is authorized for DESCRIBE and, if allowed, resolved with
     * {@code fetchOffsetForTimestamp} in legacy mode; otherwise it is answered with
     * {@code TOPIC_AUTHORIZATION_FAILED}. Requests asking for more than one offset are only
     * warned about and still answered with a single offset.
     *
     * <p>A result future is registered for every partition <em>before</em> the asynchronous
     * authorization starts, so the final {@code allOf} waits for all partitions even when
     * authorization completes later on another thread.
     *
     * @param kafkaHeaderAndRequest the decoded request wrapping a {@link ListOffsetRequest}
     * @param completableFuture     completed with the {@link ListOffsetResponse}
     */
    private void handleListOffsetRequestV0(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        ListOffsetRequest listOffsetRequest = (ListOffsetRequest) kafkaHeaderAndRequest.getRequest();
        Map<TopicPartition, CompletableFuture<ListOffsetResponse.PartitionData>> responseData = Maps.newConcurrentMap();
        if (log.isDebugEnabled()) {
            log.debug("received a v0 listOffset: {}", listOffsetRequest.toString(true));
        }
        listOffsetRequest.offsetData().forEach((topicPartition, partitionData) -> {
            String kopTopic = KopTopic.toString(topicPartition);
            // Register the placeholder synchronously; it is completed from the callbacks below.
            CompletableFuture<ListOffsetResponse.PartitionData> partitionFuture = new CompletableFuture<>();
            responseData.put(topicPartition, partitionFuture);
            authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, kopTopic)).whenComplete((bool, th) -> {
                if (th != null) {
                    log.error("Describe topic authorize failed, topic - {}. {}", kopTopic, th.getMessage());
                    partitionFuture.complete(new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Collections.emptyList()));
                    return;
                }
                if (!bool.booleanValue()) {
                    partitionFuture.complete(new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Collections.emptyList()));
                    return;
                }
                Long valueOf = Long.valueOf(partitionData.timestamp);
                if (partitionData.maxNumOffsets > 1) {
                    // NOTE(review): the original built an UNKNOWN_SERVER_ERROR result here and then
                    // discarded it, so the request has always been served with a single offset.
                    // That behavior is kept; decide whether an error should be returned instead.
                    log.warn("request is asking for multiples offsets for {}, not supported for now", kopTopic);
                }
                try {
                    fetchOffsetForTimestamp(kopTopic, valueOf, true).whenComplete((data, fetchError) -> {
                        if (fetchError != null) {
                            fetchOffsetForTimestampFailed(partitionFuture, true);
                        } else {
                            partitionFuture.complete(data);
                        }
                    });
                } catch (Exception e) {
                    // Never leave the placeholder incomplete, or the whole response would hang.
                    log.error("Failed to fetch offset for topic {}", kopTopic, e);
                    fetchOffsetForTimestampFailed(partitionFuture, true);
                }
            });
        });
        CompletableFuture.allOf(responseData.values().toArray(new CompletableFuture[0])).whenComplete((ignored, th) -> {
            completableFuture.complete(new ListOffsetResponse((Map<TopicPartition, ListOffsetResponse.PartitionData>) CoreUtils.mapValue(responseData, future -> {
                return future.join();
            })));
        });
    }

    /**
     * Handles a Kafka LIST_OFFSETS request by dispatching on the request's API version:
     * version 0 uses the legacy handler, every later version uses the v1+ handler.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link ListOffsetRequest}
     * @param completableFuture     completed with the response by the selected handler
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleListOffsetRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof ListOffsetRequest);
        final boolean isVersionZero = kafkaHeaderAndRequest.getHeader().apiVersion() == 0;
        if (!isVersionZero) {
            handleListOffsetRequestV1AndAbove(kafkaHeaderAndRequest, completableFuture);
            return;
        }
        handleListOffsetRequestV0(kafkaHeaderAndRequest, completableFuture);
    }

    /**
     * Creates the error map for partitions of an offset-commit request that cannot be resolved.
     *
     * @param offsetCommitRequest the request being handled (not inspected)
     * @return a new, empty, mutable map
     */
    private Map<TopicPartition, Errors> nonExistingTopicErrors(OffsetCommitRequest offsetCommitRequest) {
        final Map<TopicPartition, Errors> errors = new HashMap<>();
        return errors;
    }

    /**
     * Creates an error map for partitions that cannot be resolved.
     *
     * @return a new, empty, mutable map
     */
    private Map<TopicPartition, Errors> nonExistingTopicErrors() {
        final Map<TopicPartition, Errors> errors = new HashMap<>();
        return errors;
    }

    /**
     * Converts the partition data of an offset-commit request into {@link OffsetAndMetadata},
     * computing each entry's expiration timestamp.
     *
     * <p>The retention is the request's own retention time only when the API version is above 1
     * and the request does not carry {@code -1}; otherwise {@code j2} is used. An entry with
     * timestamp {@code -1} expires at {@code j + retention}; any other entry expires at its own
     * timestamp plus the retention. Null metadata is replaced by an empty string.
     *
     * @param offsetCommitRequest the request whose offset data is converted
     * @param s                   API version of the request
     * @param j                   current time, used as the commit timestamp
     * @param j2                  fallback retention
     * @return the converted offsets keyed by partition
     */
    @VisibleForTesting
    Map<TopicPartition, OffsetAndMetadata> convertOffsetCommitRequestRetentionMs(OffsetCommitRequest offsetCommitRequest, short s, long j, long j2) {
        final long offsetRetention;
        if (s > 1 && offsetCommitRequest.retentionTime() != -1) {
            offsetRetention = offsetCommitRequest.retentionTime();
        } else {
            offsetRetention = j2;
        }
        final long defaultExpireTimestamp = offsetRetention + j;
        return CoreUtils.mapValue(offsetCommitRequest.offsetData(), partitionData -> {
            final String metadata;
            if (partitionData.metadata == null) {
                metadata = "";
            } else {
                metadata = partitionData.metadata;
            }
            final long expireTimestamp;
            if (partitionData.timestamp == -1) {
                expireTimestamp = defaultExpireTimestamp;
            } else {
                expireTimestamp = offsetRetention + partitionData.timestamp;
            }
            return OffsetAndMetadata.apply(partitionData.offset, metadata, j, expireTimestamp);
        });
    }

    /**
     * Handles a Kafka OFFSET_COMMIT request.
     *
     * <p>The request's offset data is re-keyed in place from the client's topic names to full
     * topic names; partitions whose topic name is invalid are dropped and reported as
     * {@code UNKNOWN_TOPIC_OR_PARTITION}. The remaining offsets are committed through the group
     * coordinator, and the result keys are mapped back to the client's names before the
     * response is sent. When nothing is left to commit, only the collected errors are returned.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap an {@link OffsetCommitRequest}
     * @param completableFuture     completed with the {@link OffsetCommitResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleOffsetCommitRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof OffsetCommitRequest);
        Preconditions.checkState(this.groupCoordinator != null, "Group Coordinator not started");
        final OffsetCommitRequest request = (OffsetCommitRequest) kafkaHeaderAndRequest.getRequest();
        final Map<TopicPartition, Errors> invalidTopicErrors = nonExistingTopicErrors(request);
        final Map<TopicPartition, OffsetCommitRequest.PartitionData> convertedOffsetData = new HashMap<>();
        // full-name partition -> partition as named in the request
        final Map<TopicPartition, TopicPartition> replacingIndex = new HashMap<>();

        // Every entry is removed; valid ones are re-added below under their full-name key.
        request.offsetData().entrySet().removeIf(entry -> {
            final TopicPartition requested = (TopicPartition) entry.getKey();
            try {
                final String fullName = new KopTopic(requested.topic()).getFullName();
                final TopicPartition fullNamePartition = new TopicPartition(fullName, requested.partition());
                convertedOffsetData.put(fullNamePartition, entry.getValue());
                replacingIndex.put(fullNamePartition, requested);
            } catch (KoPTopicException e) {
                log.warn("Invalid topic name: {}", requested.topic(), e);
                invalidTopicErrors.put(requested, Errors.UNKNOWN_TOPIC_OR_PARTITION);
            }
            return true;
        });

        if (log.isTraceEnabled()) {
            final StringBuffer relations = new StringBuffer();
            replacingIndex.forEach((inner, outer) -> {
                relations.append(String.format("\tinnerName:%s, outerName:%s%n", inner, outer));
            });
            log.trace("OFFSET_COMMIT TopicPartition relations: \n{}", relations.toString());
        }

        request.offsetData().putAll(convertedOffsetData);

        if (request.offsetData().isEmpty()) {
            // Nothing to commit: answer with the invalid-topic errors only.
            final Map<TopicPartition, Errors> errorsOnly = new HashMap<>();
            errorsOnly.putAll(invalidTopicErrors);
            completableFuture.complete(new OffsetCommitResponse(errorsOnly));
            return;
        }

        this.groupCoordinator.handleCommitOffsets(request.groupId(), request.memberId(), request.generationId(), convertOffsetCommitRequestRetentionMs(request, kafkaHeaderAndRequest.getHeader().apiVersion(), Time.SYSTEM.milliseconds(), this.groupCoordinator.offsetConfig().offsetsRetentionMs())).thenAccept(commitResult -> {
            // Translate full-name keys back to the names the client used, then add the error entries.
            replaceTopicPartition(commitResult, replacingIndex);
            commitResult.putAll(invalidTopicErrors);
            completableFuture.complete(new OffsetCommitResponse((Map<TopicPartition, Errors>) commitResult));
        });
    }

    /**
     * Handles a FETCH request by delegating to a {@code MessageFetchContext}.
     *
     * <p>When debug logging is enabled, the request header and every requested
     * topic partition are logged first.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link FetchRequest}
     * @param completableFuture     passed to the fetch context, which is responsible for completing it
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleFetchRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof FetchRequest);
        final FetchRequest request = (FetchRequest) kafkaHeaderAndRequest.getRequest();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Request {} Fetch request. Size: {}. Each item: ", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), request.fetchData().size());
            for (Map.Entry<TopicPartition, FetchRequest.PartitionData> item : request.fetchData().entrySet()) {
                log.debug("  Fetch request topic:{} data:{}.", item.getKey(), item.getValue().toString());
            }
        }
        MessageFetchContext.get(this, kafkaHeaderAndRequest, completableFuture, this.fetchPurgatory).handleFetch();
    }

    /**
     * Handles a JOIN_GROUP request.
     *
     * <p>The group protocols carried by the request are converted to a
     * {@code protocol name -> metadata bytes} map and passed to the group coordinator. The
     * coordinator's result is turned into a {@link JoinGroupResponse} whose member map wraps each
     * member's byte array in a {@link ByteBuffer}.
     *
     * <p>Previously both maps were created but never populated (the population lambdas were
     * empty), so the coordinator always received an empty protocol list and the client always
     * received an empty member list.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link JoinGroupRequest}
     * @param completableFuture     completed with the {@link JoinGroupResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleJoinGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof JoinGroupRequest);
        Preconditions.checkState(this.groupCoordinator != null, "Group Coordinator not started");
        JoinGroupRequest joinGroupRequest = (JoinGroupRequest) kafkaHeaderAndRequest.getRequest();

        // Supported protocols of the joining member: protocol name -> serialized protocol metadata.
        Map<String, byte[]> protocols = new HashMap<>();
        joinGroupRequest.groupProtocols().forEach(protocolMetadata ->
                protocols.put(protocolMetadata.name(), Utils.toArray(protocolMetadata.metadata())));

        this.groupCoordinator.handleJoinGroup(
                joinGroupRequest.groupId(),
                joinGroupRequest.memberId(),
                kafkaHeaderAndRequest.getHeader().clientId(),
                kafkaHeaderAndRequest.getClientHost(),
                joinGroupRequest.rebalanceTimeout(),
                joinGroupRequest.sessionTimeout(),
                joinGroupRequest.protocolType(),
                protocols
        ).thenAccept(joinGroupResult -> {
            // Member id -> member metadata, in the ByteBuffer form the response expects.
            Map<String, ByteBuffer> members = new HashMap<>();
            joinGroupResult.getMembers().forEach((memberId, metadata) ->
                    members.put(memberId, ByteBuffer.wrap(metadata)));

            JoinGroupResponse joinGroupResponse = new JoinGroupResponse(
                    joinGroupResult.getError(),
                    joinGroupResult.getGenerationId(),
                    joinGroupResult.getSubProtocol(),
                    joinGroupResult.getMemberId(),
                    joinGroupResult.getLeaderId(),
                    members);
            if (log.isTraceEnabled()) {
                log.trace("Sending join group response {} for correlation id {} to client {}.", joinGroupResponse, Integer.valueOf(kafkaHeaderAndRequest.getHeader().correlationId()), kafkaHeaderAndRequest.getHeader().clientId());
            }
            completableFuture.complete(joinGroupResponse);
        });
    }

    /**
     * Handles a SYNC_GROUP request.
     *
     * <p>The group id is recorded in {@code groupIds}, the group assignment buffers are converted
     * to byte arrays, and the group coordinator's result (error plus assignment bytes) is wrapped
     * into a {@link SyncGroupResponse}.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link SyncGroupRequest}
     * @param completableFuture     completed with the {@link SyncGroupResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleSyncGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof SyncGroupRequest);
        final SyncGroupRequest request = (SyncGroupRequest) kafkaHeaderAndRequest.getRequest();
        this.groupIds.add(request.groupId());
        this.groupCoordinator.handleSyncGroup(
                request.groupId(),
                request.generationId(),
                request.memberId(),
                CoreUtils.mapValue(request.groupAssignment(), Utils::toArray)
        ).thenAccept(syncResult -> {
            final Errors error = (Errors) syncResult.getKey();
            final byte[] assignment = (byte[]) syncResult.getValue();
            completableFuture.complete(new SyncGroupResponse(error, ByteBuffer.wrap(assignment)));
        });
    }

    /**
     * Handles a HEARTBEAT request by forwarding it to the group coordinator and
     * answering with the error code the coordinator reports.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link HeartbeatRequest}
     * @param completableFuture     completed with the {@link HeartbeatResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleHeartbeatRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof HeartbeatRequest);
        final HeartbeatRequest request = (HeartbeatRequest) kafkaHeaderAndRequest.getRequest();
        this.groupCoordinator
            .handleHeartbeat(request.groupId(), request.memberId(), request.groupGenerationId())
            .thenAccept(error -> {
                final HeartbeatResponse response = new HeartbeatResponse(error);
                if (log.isTraceEnabled()) {
                    log.trace("Sending heartbeat response {} for correlation id {} to client {}.", response, kafkaHeaderAndRequest.getHeader().correlationId(), kafkaHeaderAndRequest.getHeader().clientId());
                }
                completableFuture.complete(response);
            });
    }

    /**
     * Handles a LEAVE_GROUP request by forwarding it to the group coordinator and
     * answering with the error code the coordinator reports.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link LeaveGroupRequest}
     * @param completableFuture     completed with the {@link LeaveGroupResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleLeaveGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof LeaveGroupRequest);
        final LeaveGroupRequest request = (LeaveGroupRequest) kafkaHeaderAndRequest.getRequest();
        this.groupCoordinator
            .handleLeaveGroup(request.groupId(), request.memberId())
            .thenAccept(error -> {
                final LeaveGroupResponse response = new LeaveGroupResponse(error);
                completableFuture.complete(response);
            });
    }

    /**
     * Handles a DESCRIBE_GROUPS request.
     *
     * <p>Each requested group id is described through the group coordinator, and the
     * resulting summary (state, protocol type, protocol and members) is converted into the
     * Kafka response representation. The response future is completed synchronously.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link DescribeGroupsRequest}
     * @param completableFuture     completed with the {@link DescribeGroupsResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleDescribeGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof DescribeGroupsRequest);
        final DescribeGroupsRequest request = (DescribeGroupsRequest) kafkaHeaderAndRequest.getRequest();

        final Map<String, DescribeGroupsResponse.GroupMetadata> describedGroups = request.groupIds().stream()
            .map(groupId -> {
                final KeyValue<Errors, GroupMetadata.GroupSummary> described = this.groupCoordinator.handleDescribeGroup(groupId);
                final GroupMetadata.GroupSummary summary = described.getValue();
                final List<DescribeGroupsResponse.GroupMember> members = summary.members().stream()
                    .map(member -> new DescribeGroupsResponse.GroupMember(
                            member.memberId(),
                            member.clientId(),
                            member.clientHost(),
                            ByteBuffer.wrap(member.metadata()),
                            ByteBuffer.wrap(member.assignment())))
                    .collect(Collectors.toList());
                final DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(
                        described.getKey(), summary.state(), summary.protocolType(), summary.protocol(), members);
                return new KeyValue<String, DescribeGroupsResponse.GroupMetadata>(groupId, metadata);
            })
            .collect(Collectors.toMap(pair -> pair.getKey(), pair -> pair.getValue()));

        completableFuture.complete(new DescribeGroupsResponse(describedGroups));
    }

    /**
     * Handles a LIST_GROUPS request.
     *
     * <p>The group coordinator returns an error code together with the group overviews; each
     * overview is converted to a {@code ListGroupsResponse.Group} carrying its group id and
     * protocol type. The response future is completed synchronously.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link ListGroupsRequest}
     * @param completableFuture     completed with the {@link ListGroupsResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleListGroupsRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof ListGroupsRequest);
        final KeyValue<Errors, List<GroupMetadata.GroupOverview>> listed = this.groupCoordinator.handleListGroups();
        final Errors error = listed.getKey();
        final List<ListGroupsResponse.Group> groups = listed.getValue().stream()
            .map(overview -> new ListGroupsResponse.Group(overview.groupId(), overview.protocolType()))
            .collect(Collectors.toList());
        completableFuture.complete(new ListGroupsResponse(error, groups));
    }

    /**
     * Handles a DELETE_GROUPS request by asking the group coordinator to delete the
     * requested groups and returning its result. The response future is completed synchronously.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link DeleteGroupsRequest}
     * @param completableFuture     completed with the {@link DeleteGroupsResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleDeleteGroupsRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof DeleteGroupsRequest);
        final DeleteGroupsRequest request = (DeleteGroupsRequest) kafkaHeaderAndRequest.getRequest();
        final DeleteGroupsResponse response =
                new DeleteGroupsResponse(this.groupCoordinator.handleDeleteGroups(request.groups()));
        completableFuture.complete(response);
    }

    /**
     * Rejects a SASL_AUTHENTICATE request that reaches this handler.
     *
     * <p>The response always carries {@code ILLEGAL_SASL_STATE}; per the error message,
     * such a request is only reported here when it arrives after authentication succeeded.
     *
     * @param kafkaHeaderAndRequest the decoded request (not inspected)
     * @param completableFuture     completed with the rejecting {@link SaslAuthenticateResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleSaslAuthenticate(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        final SaslAuthenticateResponse rejection = new SaslAuthenticateResponse(
                Errors.ILLEGAL_SASL_STATE,
                "SaslAuthenticate request received after successful authentication");
        completableFuture.complete(rejection);
    }

    /**
     * Rejects a SASL_HANDSHAKE request that reaches this handler.
     *
     * <p>The response always carries {@code ILLEGAL_SASL_STATE} and an empty set of mechanisms.
     *
     * @param kafkaHeaderAndRequest the decoded request (not inspected)
     * @param completableFuture     completed with the rejecting {@link SaslHandshakeResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleSaslHandshake(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        final SaslHandshakeResponse rejection =
                new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet());
        completableFuture.complete(rejection);
    }

    /**
     * Handles a CREATE_TOPICS request.
     *
     * <p>Topics the request lists more than once are answered with {@code INVALID_REQUEST};
     * the remaining topics are created through the admin manager, and both result sets are
     * merged into one {@link CreateTopicsResponse}.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link CreateTopicsRequest}
     * @param completableFuture     completed with the {@link CreateTopicsResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleCreateTopics(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof CreateTopicsRequest);
        final CreateTopicsRequest request = (CreateTopicsRequest) kafkaHeaderAndRequest.getRequest();
        final Map<String, ApiError> results = new HashMap<>();
        final Map<String, CreateTopicsRequest.TopicDetails> topicsToCreate = new HashMap<>();
        final Set<String> duplicates = request.duplicateTopics();

        for (Map.Entry<String, CreateTopicsRequest.TopicDetails> topic : request.topics().entrySet()) {
            final String topicName = topic.getKey();
            if (duplicates.contains(topicName)) {
                results.put(topicName, new ApiError(Errors.INVALID_REQUEST, "Create topics request from client `" + kafkaHeaderAndRequest.getHeader().clientId() + "` contains multiple entries for the following topics: " + duplicates));
            } else {
                topicsToCreate.put(topicName, topic.getValue());
            }
        }

        if (topicsToCreate.isEmpty()) {
            completableFuture.complete(new CreateTopicsResponse(results));
            return;
        }
        this.adminManager.createTopicsAsync(topicsToCreate, request.timeout()).thenApply(created -> {
            results.putAll(created);
            completableFuture.complete(new CreateTopicsResponse(results));
            return null;
        });
    }

    /**
     * Handles a DESCRIBE_CONFIGS request.
     *
     * <p>Each requested resource is mapped to an {@link Optional} holding a copy of the config
     * names requested for it (empty when the request supplies none); the map is passed to the
     * admin manager, and its result is returned with a throttle time of {@code 0}.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link DescribeConfigsRequest}
     * @param completableFuture     completed with the {@link DescribeConfigsResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleDescribeConfigs(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof DescribeConfigsRequest);
        final DescribeConfigsRequest request = (DescribeConfigsRequest) kafkaHeaderAndRequest.getRequest();
        this.adminManager.describeConfigsAsync(
                new ArrayList<>(request.resources()).stream().collect(Collectors.toMap(
                        resource -> resource,
                        resource -> Optional.ofNullable(request.configNames(resource)).map(HashSet::new)))
        ).thenApply(describedConfigs -> {
            completableFuture.complete(new DescribeConfigsResponse(0, describedConfigs));
            return null;
        });
    }

    /**
     * Handles an INIT_PRODUCER_ID request by delegating to the transaction coordinator,
     * which receives the response future.
     *
     * @param kafkaHeaderAndRequest the decoded request wrapping an {@link InitProducerIdRequest}
     * @param completableFuture     handed to the transaction coordinator
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleInitProducerId(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        final InitProducerIdRequest request = (InitProducerIdRequest) kafkaHeaderAndRequest.getRequest();
        this.transactionCoordinator.handleInitProducerId(
                request.transactionalId(),
                request.transactionTimeoutMs(),
                Optional.empty(),
                this,
                completableFuture);
    }

    /**
     * Handles an ADD_PARTITIONS_TO_TXN request by delegating to the transaction coordinator,
     * which receives the response future.
     *
     * @param kafkaHeaderAndRequest the decoded request wrapping an {@link AddPartitionsToTxnRequest}
     * @param completableFuture     handed to the transaction coordinator
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleAddPartitionsToTxn(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        final AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) kafkaHeaderAndRequest.getRequest();
        this.transactionCoordinator.handleAddPartitionsToTransaction(
                request.transactionalId(),
                request.producerId(),
                request.producerEpoch(),
                request.partitions(),
                completableFuture);
    }

    /**
     * Handles an ADD_OFFSETS_TO_TXN request.
     *
     * <p>The partition of the offsets topic that the group coordinator assigns to the consumer
     * group is added to the transaction through the transaction coordinator.
     *
     * @param kafkaHeaderAndRequest the decoded request wrapping an {@link AddOffsetsToTxnRequest}
     * @param completableFuture     handed to the transaction coordinator
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleAddOffsetsToTxn(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        final AddOffsetsToTxnRequest request = (AddOffsetsToTxnRequest) kafkaHeaderAndRequest.getRequest();
        final int offsetsPartition = this.groupCoordinator.partitionFor(request.consumerGroupId());
        this.transactionCoordinator.handleAddPartitionsToTransaction(
                request.transactionalId(),
                request.producerId(),
                request.producerEpoch(),
                Collections.singletonList(new TopicPartition(
                        this.groupCoordinator.getGroupManager().getOffsetConfig().offsetsTopicName(),
                        offsetsPartition)),
                completableFuture);
    }

    /**
     * Handles a TXN_OFFSET_COMMIT request.
     *
     * <p>Every topic partition in the request is re-keyed to the full topic name produced by
     * {@link KopTopic}; partitions whose topic name is rejected with a {@link KoPTopicException}
     * are reported as {@code UNKNOWN_TOPIC_OR_PARTITION}. The remaining offsets are committed
     * through the group coordinator, and its result is mapped back to the client's original
     * topic partitions.
     *
     * <p>If the coordinator future completes exceptionally, every valid partition is answered
     * with {@code Errors.forException(...)} of that failure. Previously the failure was ignored,
     * the {@code null} result was dereferenced, and the response future was never completed.
     *
     * @param kafkaHeaderAndRequest the decoded request wrapping a {@link TxnOffsetCommitRequest}
     * @param completableFuture     completed with the {@link TxnOffsetCommitResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleTxnOffsetCommit(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) kafkaHeaderAndRequest.getRequest();
        Map<TopicPartition, Errors> nonExistingTopicErrors = nonExistingTopicErrors();
        // Offsets re-keyed by full topic name, and the reverse index full name -> client-supplied name.
        Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> convertedOffsets = new HashMap<>();
        Map<TopicPartition, TopicPartition> replacingIndex = new HashMap<>();

        // Every entry is removed from the request; valid ones are re-inserted below under their full name.
        txnOffsetCommitRequest.offsets().entrySet().removeIf(entry -> {
            TopicPartition topicPartition = entry.getKey();
            try {
                TopicPartition fullNamePartition = new TopicPartition(new KopTopic(topicPartition.topic()).getFullName(), topicPartition.partition());
                convertedOffsets.put(fullNamePartition, entry.getValue());
                replacingIndex.put(fullNamePartition, topicPartition);
                return true;
            } catch (KoPTopicException e) {
                log.warn("Invalid topic name: {}", topicPartition.topic(), e);
                nonExistingTopicErrors.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION);
                return true;
            }
        });
        if (log.isTraceEnabled()) {
            StringBuffer stringBuffer = new StringBuffer();
            replacingIndex.forEach((inner, outer) -> {
                stringBuffer.append(String.format("\tinnerName:%s, outerName:%s%n", inner, outer));
            });
            log.trace("TXN_OFFSET_COMMIT TopicPartition relations: \n{}", stringBuffer.toString());
        }
        txnOffsetCommitRequest.offsets().putAll(convertedOffsets);
        this.groupCoordinator.handleTxnCommitOffsets(
                txnOffsetCommitRequest.consumerGroupId(),
                txnOffsetCommitRequest.producerId(),
                txnOffsetCommitRequest.producerEpoch(),
                convertTxnOffsets(txnOffsetCommitRequest.offsets())
        ).whenComplete((commitResult, failure) -> {
            Map<TopicPartition, Errors> responseErrors;
            if (failure != null) {
                // On exceptional completion the result is null: report the failure per partition
                // (under the client's original names) instead of dereferencing it.
                log.error("Failed to commit txn offsets for group {}", txnOffsetCommitRequest.consumerGroupId(), failure);
                Errors error = Errors.forException(failure);
                responseErrors = new HashMap<>();
                for (TopicPartition original : replacingIndex.values()) {
                    responseErrors.put(original, error);
                }
            } else {
                // Translate the coordinator's keys back to the names the client sent.
                replaceTopicPartition(commitResult, replacingIndex);
                responseErrors = commitResult;
            }
            if (!nonExistingTopicErrors.isEmpty()) {
                responseErrors.putAll(nonExistingTopicErrors);
            }
            completableFuture.complete(new TxnOffsetCommitResponse(0, responseErrors));
        });
    }

    /**
     * Converts the committed offsets of a TXN_OFFSET_COMMIT request into
     * {@link OffsetAndMetadata} values.
     *
     * <p>A {@code null} metadata string is replaced by the empty string. All entries share one
     * timestamp taken from {@code SystemTime.SYSTEM} at the start of the call, and {@code -1L}
     * is passed as the last argument of {@code OffsetAndMetadata.apply}.
     *
     * @param map committed offsets keyed by topic partition
     * @return a new map with the same keys and converted values
     */
    private Map<TopicPartition, OffsetAndMetadata> convertTxnOffsets(Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> map) {
        final long now = SystemTime.SYSTEM.milliseconds();
        final Map<TopicPartition, OffsetAndMetadata> converted = new HashMap<>();
        map.forEach((partition, committed) -> {
            final String metadata = committed.metadata() == null ? "" : committed.metadata();
            converted.put(partition, OffsetAndMetadata.apply(committed.offset(), metadata, now, -1L));
        });
        return converted;
    }

    /**
     * Handles an END_TXN request by delegating to the transaction coordinator,
     * which receives the response future.
     *
     * @param kafkaHeaderAndRequest the decoded request wrapping an {@link EndTxnRequest}
     * @param completableFuture     handed to the transaction coordinator
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleEndTxn(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        final EndTxnRequest request = (EndTxnRequest) kafkaHeaderAndRequest.getRequest();
        this.transactionCoordinator.handleEndTransaction(
                request.transactionalId(),
                request.producerId(),
                request.producerEpoch(),
                request.command(),
                this,
                completableFuture);
    }

    /**
     * Handles a WRITE_TXN_MARKERS request.
     *
     * <p>For every marker entry and every partition in it, a transaction marker is written to
     * the partition. On success the group coordinator is notified when the partition belongs
     * to the group metadata topic, the producer's active offset is removed, and, for aborted
     * transactions, an aborted-index entry is recorded. Per-partition outcomes are collected
     * per producer id and returned once all partitions have been processed.
     *
     * <p>The per-partition future is now also completed when the marker write fails or yields
     * no offset. Previously those two branches recorded the error but never completed the
     * future, so the aggregate wait never finished and no response was sent.
     *
     * <p>NOTE(review): the per-producer error maps are plain {@link HashMap}s written from
     * completion callbacks; confirm the callbacks cannot run concurrently.
     *
     * @param kafkaHeaderAndRequest the decoded request wrapping a {@link WriteTxnMarkersRequest}
     * @param completableFuture     completed with the {@link WriteTxnMarkersResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleWriteTxnMarkers(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        WriteTxnMarkersRequest writeTxnMarkersRequest = (WriteTxnMarkersRequest) kafkaHeaderAndRequest.getRequest();
        Map<Long, Map<TopicPartition, Errors>> errorsByProducer = new HashMap<>();
        List<CompletableFuture<Void>> partitionFutures = new ArrayList<>();
        for (WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry : writeTxnMarkersRequest.markers()) {
            Map<TopicPartition, Errors> partitionErrors =
                    errorsByProducer.computeIfAbsent(txnMarkerEntry.producerId(), producerId -> new HashMap<>());
            for (TopicPartition topicPartition : txnMarkerEntry.partitions()) {
                CompletableFuture<Void> partitionDone = new CompletableFuture<>();
                writeTxnMarker(topicPartition, txnMarkerEntry.transactionResult(), txnMarkerEntry.producerId(), txnMarkerEntry.producerEpoch()).whenComplete((offset, writeFailure) -> {
                    if (writeFailure != null) {
                        log.error("Failed to write txn marker for partition {}", topicPartition, writeFailure);
                        partitionErrors.put(topicPartition, Errors.forException(writeFailure));
                        // The error is already recorded; complete normally so the other partitions are still awaited.
                        partitionDone.complete(null);
                        return;
                    }
                    if (offset == null) {
                        partitionErrors.put(topicPartition, Errors.LEADER_NOT_AVAILABLE);
                        partitionDone.complete(null);
                        return;
                    }
                    // Only partitions of the group metadata topic need the group coordinator's txn completion.
                    CompletableFuture<?> groupCompletion;
                    if (TopicName.get(topicPartition.topic()).getLocalName().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
                        groupCompletion = this.groupCoordinator.scheduleHandleTxnCompletion(txnMarkerEntry.producerId(), Lists.newArrayList(topicPartition).stream(), txnMarkerEntry.transactionResult());
                    } else {
                        groupCompletion = CompletableFuture.completedFuture(null);
                    }
                    groupCompletion.whenComplete((ignored, groupFailure) -> {
                        if (groupFailure != null) {
                            log.error("Failed to handle group end txn for partition {}", topicPartition, groupFailure);
                            partitionErrors.put(topicPartition, Errors.forException(groupFailure));
                            partitionDone.completeExceptionally(groupFailure);
                            return;
                        }
                        TopicName topicName = TopicName.get(KopTopic.toString(topicPartition));
                        long firstOffset = this.transactionCoordinator.removeActivePidOffset(topicName, txnMarkerEntry.producerId());
                        long lastStableOffset = this.transactionCoordinator.getLastStableOffset(topicName, offset.longValue());
                        if (txnMarkerEntry.transactionResult().equals(TransactionResult.ABORT)) {
                            this.transactionCoordinator.addAbortedIndex(AbortedIndexEntry.builder().version(writeTxnMarkersRequest.version()).pid(txnMarkerEntry.producerId()).firstOffset(firstOffset).lastOffset(offset.longValue()).lastStableOffset(lastStableOffset).build());
                        }
                        partitionErrors.put(topicPartition, Errors.NONE);
                        partitionDone.complete(null);
                    });
                });
                partitionFutures.add(partitionDone);
            }
        }
        FutureUtil.waitForAll(partitionFutures).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.error("Write txn mark fail!", failure);
            }
            // The response carries the per-partition errors whether or not the aggregate failed.
            completableFuture.complete(new WriteTxnMarkersResponse(errorsByProducer));
        });
    }

    /**
     * Handles a DELETE_TOPICS request by deleting the requested topics through the admin
     * manager and returning its result. The response future is completed synchronously.
     *
     * @param kafkaHeaderAndRequest the decoded request; must wrap a {@link DeleteTopicsRequest}
     * @param completableFuture     completed with the {@link DeleteTopicsResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleDeleteTopics(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof DeleteTopicsRequest);
        final DeleteTopicsRequest request = (DeleteTopicsRequest) kafkaHeaderAndRequest.getRequest();
        final DeleteTopicsResponse response =
                new DeleteTopicsResponse(this.adminManager.deleteTopics(request.topics()));
        completableFuture.complete(response);
    }

    /**
     * Publishes a transaction marker to the topic backing the given partition.
     *
     * <p>The returned future fails when the topic lookup fails and completes with {@code null}
     * when the topic is not present. Otherwise it is handed to the publish context created for
     * the marker message, which is expected to complete it (presumably with the marker's
     * offset; confirm against {@code MessagePublishContext}).
     *
     * @param topicPartition    the partition to write the marker to
     * @param transactionResult whether the transaction was committed or aborted
     * @param j                 producer id (callers pass {@code producerId()})
     * @param s                 producer epoch (callers pass {@code producerEpoch()})
     * @return a future for the outcome of the marker write
     */
    private CompletableFuture<Long> writeTxnMarker(TopicPartition topicPartition, TransactionResult transactionResult, long j, short s) {
        final CompletableFuture<Long> markerOffset = new CompletableFuture<>();
        final String fullTopicName = TopicName.get(KopTopic.toString(topicPartition)).toString();
        this.topicManager.getTopic(fullTopicName).whenComplete((maybeTopic, failure) -> {
            if (failure != null) {
                markerOffset.completeExceptionally(failure);
                return;
            }
            if (!maybeTopic.isPresent()) {
                markerOffset.complete(null);
                return;
            }
            final PersistentTopic topic = (PersistentTopic) maybeTopic.get();
            final ByteBuf marker = generateTxnMarker(transactionResult, j, s);
            topic.publishMessage(marker, MessagePublishContext.get(markerOffset, topic, 1L, SystemTime.SYSTEM.milliseconds()));
        });
        return markerOffset;
    }

    /**
     * Builds the serialized marker message for the end of a transaction.
     *
     * <p>A {@code COMMIT} result yields a {@code TXN_COMMIT} marker with a {@code COMMIT}
     * control record; any other result yields a {@code TXN_ABORT} marker with an {@code ABORT}
     * control record. The payload is the Kafka end-transaction-marker record batch.
     *
     * @param transactionResult whether the transaction was committed or aborted
     * @param j                 producer id; also stored as the most significant txn-id bits
     * @param s                 producer epoch; also stored as the least significant txn-id bits
     * @return the serialized metadata and payload
     */
    private ByteBuf generateTxnMarker(TransactionResult transactionResult, long j, short s) {
        final boolean committed = transactionResult.equals(TransactionResult.COMMIT);
        final MarkerType markerType = committed ? MarkerType.TXN_COMMIT : MarkerType.TXN_ABORT;
        final ControlRecordType controlRecordType = committed ? ControlRecordType.COMMIT : ControlRecordType.ABORT;

        final MessageMetadata metadata = new MessageMetadata()
                .setTxnidMostBits(j)
                .setTxnidLeastBits(s)
                .setMarkerType(markerType.getValue())
                .setPublishTime(SystemTime.SYSTEM.milliseconds())
                .setProducerName("")
                .setSequenceId(0L);
        final ByteBuf payload = Unpooled.wrappedBuffer(
                MemoryRecords.withEndTransactionMarker(j, s, new EndTransactionMarker(controlRecordType, 0)).buffer());
        return Commands.serializeMetadataAndPayload(Commands.ChecksumType.None, metadata, payload);
    }

    /**
     * Builds the SASL handshake response for a requested mechanism.
     *
     * @param str the mechanism requested by the client
     * @return a {@code NONE} response listing the allowed mechanisms when {@code str} is one of
     *         them; otherwise an {@code UNSUPPORTED_SASL_MECHANISM} response with an empty set
     */
    private SaslHandshakeResponse checkSaslMechanism(String str) {
        if (getKafkaConfig().getSaslAllowedMechanisms().contains(str)) {
            return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms());
        }
        return new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, new HashSet<>());
    }

    /**
     * Logs an error raised in the channel pipeline and closes the channel.
     *
     * @param channelHandlerContext the context of the channel that raised the error
     * @param th                    the error that was caught
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        // The connection is not reused after an unhandled error.
        log.error("Caught error in handler, closing channel", th);
        channelHandlerContext.close();
    }

    /**
     * Resolves the Kafka protocol data (advertised listeners) of the broker that owns a topic.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>a {@code null} broker address yields an empty result and evicts the topic from the
     *       topic-manager cache;</li>
     *   <li>a future already cached in {@code KOP_ADDRESS_CACHE} for the topic is returned as is;</li>
     *   <li>when no Kafka listener name is configured, the children of
     *       {@code /loadbalance/brokers} whose name starts with the broker's host name are
     *       loaded, and the first entry containing the broker address supplies the protocol
     *       data (the future is cached on success);</li>
     *   <li>otherwise a {@code PLAINTEXT://host:port} listener is built from the broker address
     *       and cached.</li>
     * </ol>
     * All failures complete the future with {@link Optional#empty()} rather than exceptionally.
     *
     * <p>Compared with the previous version, the callback parameters and locals no longer
     * redeclare names from their enclosing scope ({@code list}, {@code th}), and the host-name
     * prefix is computed once instead of once per child node.
     *
     * @param inetSocketAddress address of the broker owning the topic; may be {@code null}
     * @param topicName         the topic being looked up
     * @return a future with the protocol data, or an empty optional when it cannot be resolved
     */
    private CompletableFuture<Optional<String>> getProtocolDataToAdvertise(InetSocketAddress inetSocketAddress, TopicName topicName) {
        CompletableFuture<Optional<String>> completableFuture = new CompletableFuture<>();
        if (inetSocketAddress == null) {
            log.error("[{}] failed get pulsar address, returned null.", topicName.toString());
            KafkaTopicManager.removeTopicManagerCache(topicName.toString());
            completableFuture.complete(Optional.empty());
            return completableFuture;
        }
        if (log.isDebugEnabled()) {
            log.debug("Found broker for topic {} puslarAddress: {}", topicName, inetSocketAddress);
        }
        CompletableFuture<Optional<String>> cachedFuture = KafkaTopicManager.KOP_ADDRESS_CACHE.get(topicName.toString());
        if (cachedFuture != null) {
            return cachedFuture;
        }
        if (StringUtil.isBlank(this.kafkaConfig.getKafkaListenerName())) {
            this.pulsarService.getPulsarResources().getDynamicConfigResources().getChildrenAsync("/loadbalance/brokers").whenComplete((brokerNodes, listFailure) -> {
                if (listFailure != null) {
                    log.error("Error in getChildrenAsync(zk://loadbalance) for {}", inetSocketAddress, listFailure);
                    completableFuture.complete(Optional.empty());
                    return;
                }
                // Broker nodes are matched by "<host><separator>" prefix; the full "<host><separator><port>"
                // string is what the lookup data is checked against.
                String hostPrefix = inetSocketAddress.getHostName() + ParameterizedMessage.ERROR_MSG_SEPARATOR;
                String hostAndPort = hostPrefix + inetSocketAddress.getPort();
                List<String> matchingNodes = Lists.newArrayList();
                for (String brokerNode : brokerNodes) {
                    if (brokerNode.startsWith(hostPrefix)) {
                        matchingNodes.add(brokerNode);
                    }
                }
                if (matchingNodes.isEmpty()) {
                    log.error("No node for broker {} under zk://loadbalance", inetSocketAddress);
                    completableFuture.complete(Optional.empty());
                    KafkaTopicManager.removeTopicManagerCache(topicName.toString());
                    return;
                }
                // Element type of the broker data cache is declared outside this method, hence the raw list.
                List lookupFutures = (List) matchingNodes.stream().map(matchingNode -> {
                    return this.localBrokerDataCache.get(String.format("%s/%s", "/loadbalance/brokers", matchingNode));
                }).collect(Collectors.toList());
                FutureUtil.waitForAll(lookupFutures).whenComplete((ignored, lookupFailure) -> {
                    if (lookupFailure != null) {
                        log.error("Error in getDataAsync() for {}", inetSocketAddress, lookupFailure);
                        completableFuture.complete(Optional.empty());
                        KafkaTopicManager.removeTopicManagerCache(topicName.toString());
                        return;
                    }
                    try {
                        Iterator lookupIterator = lookupFutures.iterator();
                        while (lookupIterator.hasNext()) {
                            // All futures are complete here, so get() does not block; an absent entry
                            // throws and is handled by the catch below.
                            ServiceLookupData serviceLookupData = (ServiceLookupData) ((Optional) ((CompletableFuture) lookupIterator.next()).get()).get();
                            if (log.isDebugEnabled()) {
                                log.debug("Handle getProtocolDataToAdvertise for {}, pulsarUrl: {}, pulsarUrlTls: {}, webUrl: {}, webUrlTls: {} kafka: {}", topicName, serviceLookupData.getPulsarServiceUrl(), serviceLookupData.getPulsarServiceUrlTls(), serviceLookupData.getWebServiceUrl(), serviceLookupData.getWebServiceUrlTls(), serviceLookupData.getProtocol(KafkaProtocolHandler.PROTOCOL_NAME));
                            }
                            if (lookupDataContainsAddress(serviceLookupData, hostAndPort)) {
                                KafkaTopicManager.KOP_ADDRESS_CACHE.put(topicName.toString(), completableFuture);
                                completableFuture.complete(serviceLookupData.getProtocol(KafkaProtocolHandler.PROTOCOL_NAME));
                                return;
                            }
                        }
                        log.error("Not able to search {} in all child of zk://loadbalance", inetSocketAddress);
                        completableFuture.complete(Optional.empty());
                    } catch (Exception e) {
                        log.error("Error in {} lookupFuture get: ", inetSocketAddress, e);
                        completableFuture.complete(Optional.empty());
                        KafkaTopicManager.removeTopicManagerCache(topicName.toString());
                    }
                });
            });
            return completableFuture;
        }
        String advertisedListener = String.format("%s://%s:%s", SecurityProtocol.PLAINTEXT.name(), inetSocketAddress.getHostName(), Integer.valueOf(inetSocketAddress.getPort()));
        KafkaTopicManager.KOP_ADDRESS_CACHE.put(topicName.toString(), completableFuture);
        completableFuture.complete(Optional.ofNullable(advertisedListener));
        if (log.isDebugEnabled()) {
            log.debug("{} get kafka Advertised Address through kafkaListenerName: {}", topicName, inetSocketAddress);
        }
        return completableFuture;
    }

    /**
     * Tells whether a topic name refers to the group metadata (offsets) topic.
     *
     * @param str the topic name to test; may be {@code null}
     * @return {@code true} when {@code str} is non-null and contains
     *         {@code <metadata tenant>/<metadata namespace>/<group metadata topic name>}
     */
    protected boolean isOffsetTopic(String str) {
        if (str == null) {
            return false;
        }
        final String offsetsTopicPath = this.kafkaConfig.getKafkaMetadataTenant()
                + "/" + this.kafkaConfig.getKafkaMetadataNamespace()
                + "/" + Topic.GROUP_METADATA_TOPIC_NAME;
        return str.contains(offsetsTopicPath);
    }

    /**
     * Tells whether a topic name refers to the transaction state topic.
     *
     * @param str the topic name to test; may be {@code null}
     * @return {@code true} when {@code str} is non-null and contains
     *         {@code <metadata tenant>/<metadata namespace>/<transaction state topic name>}
     */
    protected boolean isTransactionTopic(String str) {
        if (str == null) {
            return false;
        }
        final String transactionTopicPath = this.kafkaConfig.getKafkaMetadataTenant()
                + "/" + this.kafkaConfig.getKafkaMetadataNamespace()
                + "/" + Topic.TRANSACTION_STATE_TOPIC_NAME;
        return str.contains(transactionTopicPath);
    }

    /**
     * Looks up the broker owning a topic and builds the partition metadata advertised to
     * Kafka clients.
     *
     * <p>The returned future never completes exceptionally: when the lookup fails or yields no
     * advertised listener data, the topic is evicted from the topic-manager cache and the
     * future completes with {@code null}.
     *
     * <p>The failure check now tests the throwable first and tolerates a {@code null} result.
     * Previously {@code optional.isPresent()} was evaluated before the throwable check (the
     * result is {@code null} on exceptional completion), and {@code th.getMessage()} was called
     * even when only the optional was empty; both cases threw a {@link NullPointerException}
     * inside the callback and left the returned future incomplete forever.
     *
     * @param topicName the topic (partition) to look up
     * @return a future with the partition metadata, or {@code null} when the broker cannot be found
     */
    public CompletableFuture<MetadataResponse.PartitionMetadata> findBroker(TopicName topicName) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Handle Lookup for {}", this.ctx.channel(), topicName);
        }
        CompletableFuture<MetadataResponse.PartitionMetadata> completableFuture = new CompletableFuture<>();
        this.topicManager.getTopicBroker(topicName.toString()).thenCompose(inetSocketAddress -> {
            return getProtocolDataToAdvertise(inetSocketAddress, topicName);
        }).whenComplete((advertisedData, failure) -> {
            if (failure != null || advertisedData == null || !advertisedData.isPresent()) {
                log.error("Not get advertise data for Kafka topic:{}. throwable: [{}]", topicName, failure == null ? null : failure.getMessage());
                KafkaTopicManager.removeTopicManagerCache(topicName.toString());
                completableFuture.complete(null);
                return;
            }
            String listeners = (String) advertisedData.get();
            EndPoint endPoint = this.tlsEnabled.booleanValue() ? EndPoint.getSslEndPoint(listeners) : EndPoint.getPlainTextEndPoint(listeners);
            Node newNode = newNode(endPoint.getInetAddress());
            if (log.isDebugEnabled()) {
                log.debug("Found broker localListeners: {} for topicName: {}, localListeners: {}, found Listeners: {}", listeners, topicName, this.advertisedListeners, listeners);
            }
            // The topic is owned by another broker's listener: drop the locally cached entry.
            if (!this.advertisedListeners.contains(endPoint.getOriginalListener())) {
                KafkaTopicManager.removeTopicManagerCache(topicName.toString());
            }
            completableFuture.complete(newPartitionMetadata(topicName, newNode));
        });
        return completableFuture;
    }

    /**
     * Builds a Kafka {@code Node} for the given socket address.
     *
     * <p>The node id is the Murmur3 32-bit hash of the UTF-8 bytes of the host string immediately
     * followed by the port number (no separator).
     *
     * @param inetSocketAddress address whose host string and port identify the broker
     * @return a node carrying the derived id, the host string and the port
     */
    static Node newNode(InetSocketAddress inetSocketAddress) {
        final String host = inetSocketAddress.getHostString();
        final int port = inetSocketAddress.getPort();
        if (log.isDebugEnabled()) {
            log.debug("Return Broker Node of {}. {}:{}", inetSocketAddress, host, port);
        }
        final byte[] idSeed = (host + port).getBytes(StandardCharsets.UTF_8);
        final int nodeId = Murmur3_32Hash.getInstance().makeHash(idSeed);
        return new Node(nodeId, host, port);
    }

    /**
     * Builds the Kafka {@code Node} for the address of {@code advertisedEndPoint}.
     *
     * @return the node produced by {@link #newNode(InetSocketAddress)} for that address
     */
    Node newSelfNode() {
        final InetSocketAddress selfAddress = advertisedEndPoint.getInetAddress();
        return newNode(selfAddress);
    }

    /**
     * Creates successful ({@code Errors.NONE}) partition metadata for a topic.
     *
     * <p>{@code node} is used as the leader and as the only element of the two node lists passed
     * to the constructor; the last list argument is empty.
     *
     * @param topicName topic whose partition index is reported
     * @param node      the broker node to advertise
     * @return the partition metadata
     */
    static MetadataResponse.PartitionMetadata newPartitionMetadata(TopicName topicName, Node node) {
        int partition = topicName.getPartitionIndex();
        if (partition == -1) {
            // An index of -1 is reported as partition 0
            // (presumably the non-partitioned-topic case - confirm against TopicName).
            partition = 0;
        }
        if (log.isDebugEnabled()) {
            log.debug("Return PartitionMetadata node: {}, topicName: {}", node, topicName);
        }
        return new MetadataResponse.PartitionMetadata(
                Errors.NONE,
                partition,
                node,
                Lists.newArrayList(node),
                Lists.newArrayList(node),
                Collections.emptyList());
    }

    /**
     * Creates partition metadata flagged with {@code Errors.NOT_LEADER_FOR_PARTITION}.
     *
     * <p>{@code Node.noNode()} is used as the leader and as the only element of the two node
     * lists passed to the constructor; the last list argument is empty.
     *
     * @param topicName topic whose partition index is reported
     * @return the failed partition metadata
     */
    static MetadataResponse.PartitionMetadata newFailedPartitionMetadata(TopicName topicName) {
        int partition = topicName.getPartitionIndex();
        if (partition == -1) {
            // An index of -1 is reported as partition 0.
            partition = 0;
        }
        log.warn("Failed find Broker metadata, create PartitionMetadata with NOT_LEADER_FOR_PARTITION");
        return new MetadataResponse.PartitionMetadata(
                Errors.NOT_LEADER_FOR_PARTITION,
                partition,
                Node.noNode(),
                Lists.newArrayList(Node.noNode()),
                Lists.newArrayList(Node.noNode()),
                Collections.emptyList());
    }

    /**
     * Builds the error response for a request that failed with {@code th}.
     *
     * <p>The response is produced by the request's own {@code getErrorResponse}, using the default
     * value of {@code CommonFields.THROTTLE_TIME_MS} as throttle time.
     *
     * @param kafkaHeaderAndRequest the request that failed
     * @param th                    the failure cause
     * @return the error response for that request
     */
    static AbstractResponse failedResponse(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, Throwable th) {
        if (log.isDebugEnabled()) {
            log.debug("Request {} get failed response ", kafkaHeaderAndRequest.getHeader().apiKey(), th);
        }
        final int throttleTimeMs = (Integer) CommonFields.THROTTLE_TIME_MS.defaultValue;
        return kafkaHeaderAndRequest.getRequest().getErrorResponse(throttleTimeMs, th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Checks whether any service URL of the lookup data contains the given text.
     *
     * <p>The Pulsar service URL, its TLS variant, the web service URL and its TLS variant are
     * examined in that order; {@code null} URLs are skipped.
     *
     * @param serviceLookupData lookup data whose URLs are inspected
     * @param str               the text to search for
     * @return {@code true} as soon as one non-null URL contains {@code str}, otherwise {@code false}
     */
    public static boolean lookupDataContainsAddress(ServiceLookupData serviceLookupData, String str) {
        String url = serviceLookupData.getPulsarServiceUrl();
        if (url != null && url.contains(str)) {
            return true;
        }
        url = serviceLookupData.getPulsarServiceUrlTls();
        if (url != null && url.contains(str)) {
            return true;
        }
        url = serviceLookupData.getWebServiceUrl();
        if (url != null && url.contains(str)) {
            return true;
        }
        url = serviceLookupData.getWebServiceUrlTls();
        return url != null && url.contains(str);
    }

    /**
     * Validates the record batches of a produce request and returns the valid portion.
     *
     * <p>Checks performed:
     * <ul>
     *   <li>for request version 3 and above, there must be exactly one batch and its magic must
     *       be 2;</li>
     *   <li>every batch with magic 2 or above must have a base offset of 0;</li>
     *   <li>every batch must pass {@code ensureValid()};</li>
     *   <li>the summed batch sizes must not be negative.</li>
     * </ul>
     *
     * @param s              produce request version
     * @param topicPartition partition the records are destined for (used in error messages)
     * @param memoryRecords  the records to validate
     * @return {@code memoryRecords} itself when the summed batch sizes equal its size, otherwise
     *         records backed by a duplicate of its buffer limited to the summed size
     * @throws InvalidRecordException if a version or base-offset constraint is violated
     * @throws CorruptRecordException if the summed batch size is negative
     */
    private static MemoryRecords validateRecords(short s, TopicPartition topicPartition, MemoryRecords memoryRecords) {
        final int version = s;
        if (version >= 3) {
            final Iterator<MutableRecordBatch> batches = memoryRecords.batches().iterator();
            if (!batches.hasNext()) {
                throw new InvalidRecordException("Produce requests with version " + version
                        + " must have at least one record batch");
            }
            final MutableRecordBatch firstBatch = batches.next();
            if (firstBatch.magic() != 2) {
                throw new InvalidRecordException("Produce requests with version " + version
                        + " are only allowed to contain record batches with magic version 2");
            }
            if (batches.hasNext()) {
                throw new InvalidRecordException("Produce requests with version " + version
                        + " are only allowed to contain exactly one record batch");
            }
        }

        int validBytes = 0;
        for (MutableRecordBatch batch : memoryRecords.batches()) {
            final boolean hasNonZeroBaseOffset = batch.magic() >= 2 && batch.baseOffset() != 0;
            if (hasNonZeroBaseOffset) {
                throw new InvalidRecordException("The baseOffset of the record batch in the append to "
                        + topicPartition + " should be 0, but it is " + batch.baseOffset());
            }
            batch.ensureValid();
            validBytes += batch.sizeInBytes();
        }

        if (validBytes < 0) {
            throw new CorruptRecordException("Cannot append record batch with illegal length " + validBytes
                    + " to log for " + topicPartition + ". A possible cause is corrupted produce request.");
        }

        if (validBytes == memoryRecords.sizeInBytes()) {
            return memoryRecords;
        }
        // Only the leading validBytes bytes were walked as batches; expose just that prefix.
        final ByteBuffer validPrefix = memoryRecords.buffer().duplicate();
        validPrefix.limit(validBytes);
        return MemoryRecords.readableRecords(validPrefix);
    }

    /**
     * Records produce statistics for one topic partition.
     *
     * <p>Adds {@code i2} to the {@code BYTES_IN} counter and {@code i} to the {@code MESSAGE_IN}
     * counter, both scoped by topic and partition labels, then stores {@code i} in
     * {@code RequestStats.BATCH_COUNT_PER_MEMORY_RECORDS_INSTANCE}.
     *
     * @param topicPartition partition the statistics are attributed to
     * @param i              value added to the message counter
     * @param i2             value added to the bytes counter
     */
    private void updateProducerStats(TopicPartition topicPartition, int i, int i2) {
        final String topicLabel = topicPartition.topic();
        final String partitionLabel = String.valueOf(topicPartition.partition());

        requestStats.getStatsLogger()
                .scopeLabel("topic", topicLabel)
                .scopeLabel(KopServerStats.PARTITION_SCOPE, partitionLabel)
                .getCounter(KopServerStats.BYTES_IN)
                .add(i2);

        requestStats.getStatsLogger()
                .scopeLabel("topic", topicLabel)
                .scopeLabel(KopServerStats.PARTITION_SCOPE, partitionLabel)
                .getCounter(KopServerStats.MESSAGE_IN)
                .add(i);

        RequestStats.BATCH_COUNT_PER_MEMORY_RECORDS_INSTANCE.set(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Checks whether the current session may perform {@code aclOperation} on {@code resource}.
     *
     * <p>Without an authorizer every operation is allowed; with an authorizer but without a
     * session every operation is denied. Otherwise the decision is delegated to the authorizer:
     * {@code READ} to {@code canConsumeAsync}, {@code WRITE} and {@code IDEMPOTENT_WRITE} to
     * {@code canProduceAsync}, and {@code DESCRIBE} to {@code canLookupAsync}.
     *
     * @param aclOperation the operation being attempted
     * @param resource     the resource the operation targets
     * @return a future with the authorization decision; for any other operation, the future
     *         returned by {@code FutureUtil.failedFuture} carrying an {@link IllegalStateException}
     */
    @VisibleForTesting
    public CompletableFuture<Boolean> authorize(AclOperation aclOperation, Resource resource) {
        if (this.authorizer == null) {
            return CompletableFuture.completedFuture(true);
        }
        if (this.authenticator.session() == null) {
            return CompletableFuture.completedFuture(false);
        }
        final Session currentSession = this.authenticator.session();
        switch (aclOperation) {
            case READ:
                return this.authorizer.canConsumeAsync(currentSession.getPrincipal(), resource);
            case IDEMPOTENT_WRITE:
            case WRITE:
                return this.authorizer.canProduceAsync(currentSession.getPrincipal(), resource);
            case DESCRIBE:
                return this.authorizer.canLookupAsync(currentSession.getPrincipal(), resource);
            default:
                // CREATE, DELETE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, ALTER, UNKNOWN,
                // ALL and ANY all end up here.
                return FutureUtil.failedFuture(new IllegalStateException(
                        "AclOperation [" + aclOperation.name() + "] is not supported."));
        }
    }

    /**
     * Accessor for the {@code pulsarService} field.
     *
     * @return the {@code PulsarService} held by this instance
     */
    public PulsarService getPulsarService() {
        return pulsarService;
    }

    /**
     * Accessor for the {@code topicManager} field.
     *
     * @return the {@code KafkaTopicManager} held by this instance
     */
    public KafkaTopicManager getTopicManager() {
        return topicManager;
    }

    /**
     * Accessor for the {@code groupCoordinator} field.
     *
     * @return the {@code GroupCoordinator} held by this instance
     */
    public GroupCoordinator getGroupCoordinator() {
        return groupCoordinator;
    }

    /**
     * Accessor for the {@code transactionCoordinator} field.
     *
     * @return the {@code TransactionCoordinator} held by this instance
     */
    public TransactionCoordinator getTransactionCoordinator() {
        return transactionCoordinator;
    }

    /**
     * Accessor for the {@code clusterName} field.
     *
     * @return the cluster name held by this instance
     */
    public String getClusterName() {
        return clusterName;
    }

    /**
     * Accessor for the {@code executor} field.
     *
     * @return the {@link ScheduledExecutorService} held by this instance
     */
    public ScheduledExecutorService getExecutor() {
        return executor;
    }

    /**
     * Accessor for the {@code admin} field.
     *
     * @return the {@code PulsarAdmin} held by this instance
     */
    public PulsarAdmin getAdmin() {
        return admin;
    }

    /**
     * Accessor for the {@code authenticator} field.
     *
     * @return the {@code SaslAuthenticator} held by this instance
     */
    public SaslAuthenticator getAuthenticator() {
        return authenticator;
    }

    /**
     * Accessor for the {@code authorizer} field.
     *
     * @return the {@code Authorizer} held by this instance; {@code authorize} treats a
     *         {@code null} value as "everything allowed"
     */
    public Authorizer getAuthorizer() {
        return authorizer;
    }

    /**
     * Accessor for the {@code adminManager} field.
     *
     * @return the {@code AdminManager} held by this instance
     */
    public AdminManager getAdminManager() {
        return adminManager;
    }

    /**
     * Accessor for the {@code localBrokerDataCache} field.
     *
     * @return the metadata cache of {@code LocalBrokerData} held by this instance
     */
    public MetadataCache<LocalBrokerData> getLocalBrokerDataCache() {
        return localBrokerDataCache;
    }

    /**
     * Accessor for the {@code tlsEnabled} field.
     *
     * @return the boxed TLS flag held by this instance
     */
    public Boolean getTlsEnabled() {
        return tlsEnabled;
    }

    /**
     * Accessor for the {@code advertisedEndPoint} field.
     *
     * @return the {@code EndPoint} held by this instance
     */
    public EndPoint getAdvertisedEndPoint() {
        return advertisedEndPoint;
    }

    /**
     * Accessor for the {@code advertisedListeners} field.
     *
     * @return the advertised listeners string held by this instance
     */
    public String getAdvertisedListeners() {
        return advertisedListeners;
    }

    /**
     * Accessor for the {@code defaultNumPartitions} field.
     *
     * @return the default number of partitions held by this instance
     */
    public int getDefaultNumPartitions() {
        return defaultNumPartitions;
    }

    /**
     * Accessor for the {@code maxReadEntriesNum} field.
     *
     * @return the value of {@code maxReadEntriesNum}
     */
    public int getMaxReadEntriesNum() {
        return maxReadEntriesNum;
    }

    /**
     * Accessor for the {@code failedAuthenticationDelayMs} field.
     *
     * @return the value of {@code failedAuthenticationDelayMs}
     */
    public int getFailedAuthenticationDelayMs() {
        return failedAuthenticationDelayMs;
    }

    /**
     * Accessor for the {@code offsetsTopicName} field.
     *
     * @return the offsets topic name held by this instance
     */
    public String getOffsetsTopicName() {
        return offsetsTopicName;
    }

    /**
     * Accessor for the {@code txnTopicName} field.
     *
     * @return the transaction topic name held by this instance
     */
    public String getTxnTopicName() {
        return txnTopicName;
    }

    /**
     * Accessor for the {@code allowedNamespaces} field.
     *
     * @return the set instance held by this object (returned as-is, not copied)
     */
    public Set<String> getAllowedNamespaces() {
        return allowedNamespaces;
    }

    /**
     * Accessor for the {@code currentConnectedGroup} field.
     *
     * @return the map instance held by this object (returned as-is, not copied)
     */
    public ConcurrentHashMap<String, String> getCurrentConnectedGroup() {
        return currentConnectedGroup;
    }

    /**
     * Accessor for the {@code groupIdStoredPath} field.
     *
     * @return the value of {@code groupIdStoredPath}
     */
    public String getGroupIdStoredPath() {
        return groupIdStoredPath;
    }

    /**
     * Accessor for the {@code groupIds} field.
     *
     * @return the set instance held by this object (returned as-is, not copied)
     */
    public Set<String> getGroupIds() {
        return groupIds;
    }

    /**
     * Accessor for the {@code pendingTopicFuturesMap} field.
     *
     * @return the map instance held by this object (returned as-is, not copied)
     */
    public Map<TopicPartition, PendingTopicFutures> getPendingTopicFuturesMap() {
        return pendingTopicFuturesMap;
    }

    /**
     * Accessor for the {@code producePurgatory} field.
     *
     * @return the {@code DelayedOperationPurgatory} stored in {@code producePurgatory}
     */
    public DelayedOperationPurgatory<DelayedOperation> getProducePurgatory() {
        return producePurgatory;
    }

    /**
     * Accessor for the {@code fetchPurgatory} field.
     *
     * @return the {@code DelayedOperationPurgatory} stored in {@code fetchPurgatory}
     */
    public DelayedOperationPurgatory<DelayedOperation> getFetchPurgatory() {
        return fetchPurgatory;
    }

    /**
     * Accessor for the {@code maxPendingBytes} field.
     *
     * @return the value of {@code maxPendingBytes}
     */
    public long getMaxPendingBytes() {
        return maxPendingBytes;
    }

    /**
     * Accessor for the {@code resumeThresholdPendingBytes} field.
     *
     * @return the value of {@code resumeThresholdPendingBytes}
     */
    public long getResumeThresholdPendingBytes() {
        return resumeThresholdPendingBytes;
    }

    /**
     * Accessor for the {@code pendingBytes} field.
     *
     * @return the {@link AtomicLong} instance held by this object (the counter itself, not a
     *         snapshot of its value)
     */
    public AtomicLong getPendingBytes() {
        return pendingBytes;
    }

    /**
     * Accessor for the {@code autoReadDisabledPublishBufferLimiting} flag.
     *
     * @return the current value of {@code autoReadDisabledPublishBufferLimiting}
     */
    public boolean isAutoReadDisabledPublishBufferLimiting() {
        return autoReadDisabledPublishBufferLimiting;
    }

    /**
     * Accessor for the {@code entryFormatter} field.
     *
     * @return the {@code EntryFormatter} held by this instance
     */
    public EntryFormatter getEntryFormatter() {
        return entryFormatter;
    }
}
