package io.streamnative.pulsar.handlers.kop;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.channel.ChannelHandlerContext;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.OffsetFinder;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.naming.AuthenticationException;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.Utils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.class */
public class KafkaRequestHandler extends KafkaCommandDecoder {
    // Shared SLF4J logger for this handler.
    private static final Logger log = LoggerFactory.getLogger(KafkaRequestHandler.class);
    public static final long DEFAULT_TIMESTAMP = 0;
    // Collaborators and configuration values; all are assigned exactly once in the constructor.
    private final PulsarService pulsarService;
    private final KafkaServiceConfiguration kafkaConfig;
    private final KafkaTopicManager topicManager;
    private final GroupCoordinator groupCoordinator;
    private final String clusterName;
    private final ScheduledExecutorService executor;
    private final PulsarAdmin admin;
    // Null when broker authentication is disabled or no SASL mechanisms are configured (see constructor).
    private final SaslAuthenticator authenticator;
    // Built from the same PulsarAdmin client held in 'admin'.
    private final AdminManager adminManager;
    private final Boolean tlsEnabled;
    private final EndPoint advertisedEndPoint;
    private final String advertisedListeners;
    // Partition count used when a partitioned topic is auto-created during a metadata request.
    private final int defaultNumPartitions;
    public final int maxReadEntriesNum;
    private final EntryFormatter entryFormatter;
    // One pending-produce queue per Kafka topic partition; entries are created lazily in handleProduceRequest.
    private final Map<TopicPartition, PendingProduceQueue> pendingProduceQueueMap = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.streamnative.pulsar.handlers.kop.KafkaRequestHandler$2, reason: invalid class name */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaRequestHandler$2.class */
    /**
     * Decompiler-emitted lookup table backing the {@code switch} over {@link ApiKeys} in
     * {@code overloadDefaultApiVersionsResponse}.
     *
     * <p>The array is indexed by {@code ApiKeys.ordinal()}. {@code FETCH} maps to case label 1 and
     * {@code LIST_OFFSETS} to case label 2; every other slot keeps the array default of 0 and
     * therefore falls into the {@code default} branch of that switch.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$ApiKeys = new int[ApiKeys.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.FETCH.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot simply stays 0 if the constant is missing at run time.
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.LIST_OFFSETS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot simply stays 0 if the constant is missing at run time.
            }
        }
    }

    /**
     * Creates a handler bound to the given broker service and KoP configuration.
     *
     * <p>A {@link SaslAuthenticator} is only created when broker authentication is enabled and at
     * least one SASL mechanism is configured; otherwise the authenticator stays {@code null}.
     *
     * @param pulsarService broker service supplying the executor, admin client and broker state
     * @param kafkaServiceConfiguration KoP configuration the handler's settings are read from
     * @param groupCoordinator coordinator used for consumer-group requests
     * @param bool stored as the handler's TLS-enabled flag
     * @param endPoint stored as the handler's advertised end point
     * @throws Exception if any collaborator fails during construction
     */
    public KafkaRequestHandler(PulsarService pulsarService, KafkaServiceConfiguration kafkaServiceConfiguration, GroupCoordinator groupCoordinator, Boolean bool, EndPoint endPoint) throws Exception {
        this.pulsarService = pulsarService;
        this.kafkaConfig = kafkaServiceConfiguration;
        this.groupCoordinator = groupCoordinator;
        this.clusterName = kafkaServiceConfiguration.getClusterName();
        this.executor = pulsarService.getExecutor();
        this.admin = pulsarService.getAdminClient();

        if (pulsarService.getBrokerService().isAuthenticationEnabled()
                && !kafkaServiceConfiguration.getSaslAllowedMechanisms().isEmpty()) {
            this.authenticator = new SaslAuthenticator(pulsarService, kafkaServiceConfiguration.getSaslAllowedMechanisms());
        } else {
            this.authenticator = null;
        }

        this.adminManager = new AdminManager(this.admin);
        this.tlsEnabled = bool;
        this.advertisedEndPoint = endPoint;
        this.advertisedListeners = kafkaServiceConfiguration.getKafkaAdvertisedListeners();
        // The topic manager receives this handler, so it is created only after the fields above are set.
        this.topicManager = new KafkaTopicManager(this);
        this.defaultNumPartitions = kafkaServiceConfiguration.getDefaultNumPartitions();
        this.maxReadEntriesNum = kafkaServiceConfiguration.getMaxReadEntriesNum();
        this.entryFormatter = EntryFormatterFactory.create(kafkaServiceConfiguration.getEntryFormat());
    }

    /**
     * Runs the parent activation logic, refreshes the topic manager's channel context and resets
     * any SASL authentication state before logging the newly active channel.
     *
     * @param channelHandlerContext context of the channel that became active
     * @throws Exception if the parent handler fails
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        getTopicManager().updateCtx();
        final SaslAuthenticator sasl = this.authenticator;
        if (sasl != null) {
            sasl.reset();
        }
        log.info("channel active: {}", channelHandlerContext.channel());
    }

    /**
     * Runs the parent deactivation logic, logs the event and closes this handler.
     *
     * @param channelHandlerContext context of the channel that became inactive
     * @throws Exception if the parent handler fails
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        log.info("channel inactive {}", channelHandlerContext.channel());
        close();
        // close() already clears the flag when it was set; it is cleared again here unconditionally.
        isActive.set(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Closes the channel and the topic manager. Only the first call that finds the handler active
     * does any work; later calls return immediately.
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    public void close() {
        // Atomically flip the flag so the shutdown sequence runs at most once.
        if (!this.isActive.getAndSet(false)) {
            return;
        }
        log.info("close channel {}", this.ctx.channel());
        writeAndFlushWhenInactiveChannel(this.ctx.channel());
        this.ctx.close();
        this.topicManager.close();
    }

    /**
     * Reports whether requests on this connection may proceed.
     *
     * @param kafkaHeaderAndRequest the incoming request (not inspected)
     * @return {@code true} when no authenticator is configured or the authenticator reports completion
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected boolean hasAuthenticated(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest) {
        if (this.authenticator == null) {
            return true;
        }
        return this.authenticator.complete();
    }

    /**
     * Hands the request to the SASL authenticator, if one is configured; otherwise does nothing.
     *
     * @param kafkaHeaderAndRequest request whose header and body are passed to the authenticator
     * @param completableFuture future passed through to the authenticator for the response
     * @throws AuthenticationException declared for the authenticator call
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void authenticate(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) throws AuthenticationException {
        if (this.authenticator == null) {
            return;
        }
        this.authenticator.authenticate(kafkaHeaderAndRequest.getHeader(), kafkaHeaderAndRequest.getRequest(), completableFuture);
    }

    /**
     * Answers an ApiVersions request, reporting an unsupported-version response when the request's
     * own API version is not supported.
     *
     * @param kafkaHeaderAndRequest the ApiVersions request
     * @param completableFuture future completed with the response
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleApiVersionsRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        boolean unsupportedVersion = !ApiKeys.API_VERSIONS.isVersionSupported(kafkaHeaderAndRequest.getHeader().apiVersion());
        completableFuture.complete(overloadDefaultApiVersionsResponse(unsupportedVersion));
    }

    /**
     * Builds the ApiVersions response advertised by this handler.
     *
     * <p>Every API whose {@code minRequiredInterBrokerMagic} is at most 2 is advertised with its
     * default version range, except FETCH (minimum version 4) and LIST_OFFSETS (minimum version 0),
     * which are advertised with explicit ranges up to their latest version.
     *
     * @param z when {@code true}, return an UNSUPPORTED_VERSION response with no API entries
     * @return the response to send to the client
     */
    protected ApiVersionsResponse overloadDefaultApiVersionsResponse(boolean z) {
        List<ApiVersionsResponse.ApiVersion> advertised = new ArrayList<>();
        if (z) {
            return new ApiVersionsResponse(0, Errors.UNSUPPORTED_VERSION, advertised);
        }
        for (ApiKeys apiKeys : ApiKeys.values()) {
            if (apiKeys.minRequiredInterBrokerMagic > 2) {
                continue;
            }
            switch (apiKeys) {
                case FETCH:
                    advertised.add(new ApiVersionsResponse.ApiVersion((short) 1, (short) 4, apiKeys.latestVersion()));
                    break;
                case LIST_OFFSETS:
                    advertised.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 0, apiKeys.latestVersion()));
                    break;
                default:
                    advertised.add(new ApiVersionsResponse.ApiVersion(apiKeys));
                    break;
            }
        }
        return new ApiVersionsResponse(0, Errors.NONE, advertised);
    }

    /**
     * Completes the response with an error stating that the requested Kafka API is not supported.
     *
     * @param kafkaHeaderAndRequest the unsupported request
     * @param completableFuture future completed with the request's own error response
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleError(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        String message = String.format("Kafka API (%s) Not supported by kop server.", kafkaHeaderAndRequest.getHeader().apiKey());
        log.error(message);
        UnsupportedOperationException cause = new UnsupportedOperationException(message);
        completableFuture.complete(kafkaHeaderAndRequest.getRequest().getErrorResponse(cause));
    }

    /**
     * Completes the response with a leader-not-available error for a request that arrived on a
     * channel that is closing.
     *
     * @param kafkaHeaderAndRequest the request received on the closing channel
     * @param completableFuture future completed with the request's own error response
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleInactive(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        LeaderNotAvailableException closing = new LeaderNotAvailableException("Channel is closing!");
        AbstractResponse errorResponse = kafkaHeaderAndRequest.getRequest().getErrorResponse(closing);
        log.error("Kafka API {} is send to a closing channel", kafkaHeaderAndRequest.getHeader().apiKey());
        completableFuture.complete(errorResponse);
    }

    /**
     * Looks up partitioned-topic metadata through the Pulsar admin client.
     *
     * @param str full Pulsar topic name
     * @return future yielding the topic's partitioned metadata
     */
    private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String str) {
        final String topic = str;
        return admin.topics().getPartitionedTopicMetadataAsync(topic);
    }

    /**
     * Asynchronously collects every persistent topic of every namespace of every tenant and
     * completes {@code completableFuture} with them, grouped by partitioned-topic name (with the
     * default namespace prefix removed). The consumer-offsets topic in the KoP metadata namespace
     * is left out of the result.
     *
     * <p>Completion is tracked with two countdowns: one over tenants and, per tenant, one over its
     * namespaces. The future is completed by whichever callback brings both to zero.
     *
     * <p>A failure at any level (tenant listing, namespace listing, topic listing, or an exception
     * thrown while processing a listing) completes the future exceptionally. Previously only a
     * failure of the tenant listing was propagated, so a failed inner lookup left the future
     * incomplete forever.
     *
     * @param completableFuture future to complete with the topic map, or exceptionally on failure
     */
    private void getAllTopicsAsync(CompletableFuture<Map<String, List<TopicName>>> completableFuture) {
        String offsetsTopicName = new KopTopic(String.join("/", this.kafkaConfig.getKafkaMetadataTenant(), this.kafkaConfig.getKafkaMetadataNamespace(), "__consumer_offsets")).getFullName();
        this.admin.tenants().getTenantsAsync().thenApply(tenants -> {
            if (tenants.isEmpty()) {
                completableFuture.complete(Maps.newHashMap());
                return null;
            }
            ConcurrentMap<String, List<TopicName>> topicMap = Maps.newConcurrentMap();
            AtomicInteger pendingTenants = new AtomicInteger(tenants.size());
            for (String tenant : tenants) {
                this.admin.namespaces().getNamespacesAsync(tenant).thenApply(namespaces -> {
                    // A tenant without namespaces is finished immediately; if it was the last one, publish.
                    if (namespaces.isEmpty() && pendingTenants.decrementAndGet() == 0) {
                        completableFuture.complete(topicMap);
                        return null;
                    }
                    AtomicInteger pendingNamespaces = new AtomicInteger(namespaces.size());
                    for (String namespace : namespaces) {
                        this.pulsarService.getNamespaceService().getListOfPersistentTopics(NamespaceName.get(namespace)).thenApply(topics -> {
                            for (String topic : topics) {
                                TopicName topicName = TopicName.get(topic);
                                String partitionedTopicName = topicName.getPartitionedTopicName();
                                if (!partitionedTopicName.equals(offsetsTopicName)) {
                                    topicMap.computeIfAbsent(KopTopic.removeDefaultNamespacePrefix(partitionedTopicName),
                                            ignored -> Collections.synchronizedList(new ArrayList<>())).add(topicName);
                                }
                            }
                            // The tenant counter is only touched by the callback that finishes the tenant's last namespace.
                            if (pendingNamespaces.decrementAndGet() == 0 && pendingTenants.decrementAndGet() == 0) {
                                completableFuture.complete(topicMap);
                            }
                            return null;
                        }).exceptionally(topicListingFailure -> {
                            // No-op if the future has already been completed.
                            completableFuture.completeExceptionally(topicListingFailure);
                            return null;
                        });
                    }
                    return null;
                }).exceptionally(namespaceListingFailure -> {
                    // No-op if the future has already been completed.
                    completableFuture.completeExceptionally(namespaceListingFailure);
                    return null;
                });
            }
            return null;
        }).exceptionally(completableFuture::completeExceptionally);
    }

    /**
     * Handles a Kafka Metadata request in two asynchronous phases.
     *
     * <p>Phase 1 resolves the set of Pulsar topic partitions to describe: all topics when the
     * request names none (after clearing the topic manager cache), otherwise the partitions of each
     * requested topic. Requested topics that report zero partitions are created on the fly.
     *
     * <p>Phase 2 runs a broker lookup ({@code findBroker}) for every partition, accumulates the
     * per-topic metadata and the distinct leader nodes, and completes {@code completableFuture}
     * once every topic has had all of its partitions resolved.
     *
     * @param kafkaHeaderAndRequest the request; must carry a {@link MetadataRequest}
     * @param completableFuture future completed with the {@link MetadataResponse}
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleTopicMetadataRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof MetadataRequest);
        MetadataRequest request = kafkaHeaderAndRequest.getRequest();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Request {}: for topic {} ", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), request.topics()});
        }
        // synchronizedList collects the TopicMetadata entries of the response;
        // synchronizedList2 collects the broker nodes of the response.
        List synchronizedList = Collections.synchronizedList(Lists.newArrayList());
        List synchronizedList2 = Collections.synchronizedList(Lists.newArrayList());
        List list = request.topics();
        // Phase-1 result: Kafka topic name -> Pulsar partition names to look up.
        CompletableFuture<Map<String, List<TopicName>>> completableFuture2 = new CompletableFuture<>();
        if (list == null || list.isEmpty()) {
            // No topics named in the request: describe everything.
            KafkaTopicManager kafkaTopicManager = this.topicManager;
            KafkaTopicManager.clearTopicManagerCache();
            getAllTopicsAsync(completableFuture2);
        } else {
            // NOTE(review): this plain HashMap is written from the whenComplete callbacks below;
            // if those can run on different threads the puts are unsynchronized — confirm.
            HashMap newHashMap = Maps.newHashMap();
            List list2 = request.topics();
            int size = list2.size();
            // Counts finished per-topic lookups (successful or not).
            AtomicInteger atomicInteger = new AtomicInteger(0);
            list2.stream().forEach(str -> {
                KopTopic kopTopic = new KopTopic(str);
                getPartitionedTopicMetadataAsync(kopTopic.getFullName()).whenComplete((partitionedTopicMetadata, th) -> {
                    if (th != null) {
                        // Lookup failed: report the topic as unknown directly in the response list.
                        synchronizedList.add(new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, str, false, Collections.emptyList()));
                        log.warn("[{}] Request {}: Failed to get partitioned pulsar topic {} metadata: {}", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), kopTopic.getFullName(), th.getMessage()});
                    } else if (partitionedTopicMetadata.partitions > 0) {
                        if (log.isDebugEnabled()) {
                            log.debug("Topic {} has {} partitions", str, Integer.valueOf(partitionedTopicMetadata.partitions));
                        }
                        newHashMap.put(str, (List) IntStream.range(0, partitionedTopicMetadata.partitions).mapToObj(i -> {
                            return TopicName.get(kopTopic.getPartitionName(i));
                        }).collect(Collectors.toList()));
                    } else if (this.kafkaConfig.isAllowAutoTopicCreation() && request.allowAutoTopicCreation() && this.kafkaConfig.isDefaultTopicTypePartitioned()) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Request {}: Topic {} has single partition, auto create partitioned topic", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), str});
                        }
                        // The creation future is not awaited; the partitions are listed as if creation succeeded.
                        this.admin.topics().createPartitionedTopicAsync(kopTopic.getFullName(), this.defaultNumPartitions);
                        newHashMap.put(str, (List) IntStream.range(0, this.defaultNumPartitions).mapToObj(i2 -> {
                            return TopicName.get(kopTopic.getPartitionName(i2));
                        }).collect(Collectors.toList()));
                    } else {
                        // NOTE(review): this branch is also reached when auto topic creation is disabled
                        // (by config or by the request), yet it still creates a topic — confirm intended.
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Request {}: Topic {} is non-partitioned, auto create non-partitioned topic", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), str});
                        }
                        this.admin.topics().createNonPartitionedTopicAsync(kopTopic.getFullName());
                        newHashMap.put(str, Arrays.asList(TopicName.get(kopTopic.getFullName())));
                    }
                    // The callback that finishes the last topic publishes the phase-1 result.
                    if (atomicInteger.incrementAndGet() == size) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Request {}: Completed get {} topic's partitions", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), Integer.valueOf(size)});
                        }
                        completableFuture2.complete(newHashMap);
                    }
                });
            });
        }
        // Counts topics whose partitions have all been resolved in phase 2.
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        // Id of this broker's own node, passed to every MetadataResponse built below.
        int id = newSelfNode().id();
        completableFuture2.whenComplete((map, th) -> {
            if (th != null) {
                // Phase 1 failed: answer with this broker as the only node and no topic metadata.
                log.warn("[{}] Request {}: Exception fetching metadata, will return null Response", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), th});
                synchronizedList2.add(newSelfNode());
                completableFuture.complete(new MetadataResponse(synchronizedList2, this.clusterName, id, Collections.emptyList()));
            } else {
                int size2 = map.size();
                if (size2 != 0) {
                    map.forEach((str2, list3) -> {
                        int size3 = list3.size();
                        // Counts resolved partitions of this topic.
                        AtomicInteger atomicInteger3 = new AtomicInteger(0);
                        List synchronizedList3 = Collections.synchronizedList(Lists.newArrayListWithExpectedSize(size3));
                        list3.forEach(topicName -> {
                            findBroker(topicName).whenComplete((partitionMetadata, th) -> {
                                if (th != null || partitionMetadata == null) {
                                    log.warn("[{}] Request {}: Exception while find Broker metadata", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), th});
                                    synchronizedList3.add(newFailedPartitionMetadata(topicName));
                                } else {
                                    Node leader = partitionMetadata.leader();
                                    // Check-then-add must be atomic so each leader node is listed only once.
                                    synchronized (synchronizedList2) {
                                        if (!synchronizedList2.stream().anyMatch(node -> {
                                            return node.equals(leader);
                                        })) {
                                            synchronizedList2.add(leader);
                                        }
                                    }
                                    synchronizedList3.add(partitionMetadata);
                                }
                                int incrementAndGet = atomicInteger3.incrementAndGet();
                                if (log.isDebugEnabled()) {
                                    log.debug("[{}] Request {}: FindBroker for topic {}, partitions found/all: {}/{}.", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), str2, Integer.valueOf(incrementAndGet), Integer.valueOf(size3)});
                                }
                                // Last partition of this topic: emit the topic's metadata.
                                if (incrementAndGet == size3) {
                                    synchronizedList.add(new MetadataResponse.TopicMetadata(Errors.NONE, str2, false, synchronizedList3));
                                    int incrementAndGet2 = atomicInteger2.incrementAndGet();
                                    if (log.isDebugEnabled()) {
                                        log.debug("[{}] Request {}: Completed findBroker for topic {}, partitions found/all: {}/{}. \n dump All Metadata:", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), str2, Integer.valueOf(incrementAndGet2), Integer.valueOf(size2)});
                                        synchronizedList.stream().forEach(topicMetadata -> {
                                            log.debug("TopicMetadata response: {}", topicMetadata.toString());
                                        });
                                    }
                                    // Last topic overall: send the response.
                                    if (incrementAndGet2 == size2) {
                                        completableFuture.complete(new MetadataResponse(synchronizedList2, this.clusterName, id, synchronizedList));
                                    }
                                }
                            });
                        });
                    });
                } else {
                    // Nothing to look up; synchronizedList may still hold UNKNOWN_TOPIC_OR_PARTITION entries from phase 1.
                    synchronizedList2.add(newSelfNode());
                    completableFuture.complete(new MetadataResponse(synchronizedList2, this.clusterName, id, synchronizedList));
                }
            }
        });
    }

    /**
     * Handles a Kafka Produce request.
     *
     * <p>Transactional requests are rejected. For every topic partition in the request a
     * {@code PendingProduce} is created and appended to that partition's pending queue. Once every
     * partition's future has finished, the per-partition results are gathered into a
     * {@link ProduceResponse}.
     *
     * <p>The request buffer's readable byte count is added to the internal server connection's
     * publish buffer before the records are queued and subtracted again when all partition futures
     * have finished.
     *
     * @param kafkaHeaderAndRequest the request; must carry a {@link ProduceRequest}
     * @param completableFuture future completed with the produce response
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleProduceRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof ProduceRequest);
        ProduceRequest request = kafkaHeaderAndRequest.getRequest();
        if (request.transactionalId() != null) {
            log.warn("[{}] Transactions not supported", this.ctx.channel());
            completableFuture.complete(failedResponse(kafkaHeaderAndRequest, new UnsupportedOperationException("No transaction support")));
            return;
        }
        // Per-partition result futures, keyed by TopicPartition.
        HashMap hashMap = new HashMap();
        int size = request.partitionRecordsOrFail().size();
        long readableBytes = kafkaHeaderAndRequest.getBuffer().readableBytes();
        this.topicManager.getInternalServerCnx().increasePublishBuffer(readableBytes);
        for (Map.Entry entry : request.partitionRecordsOrFail().entrySet()) {
            TopicPartition topicPartition = (TopicPartition) entry.getKey();
            CompletableFuture completableFuture2 = new CompletableFuture();
            hashMap.put(topicPartition, completableFuture2);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Request {}: Produce messages for topic {} partition {}, request size: {} ", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), topicPartition.topic(), Integer.valueOf(topicPartition.partition()), Integer.valueOf(size)});
            }
            PendingProduce pendingProduce = new PendingProduce(completableFuture2, this.topicManager, KopTopic.toString(topicPartition), this.entryFormatter, (MemoryRecords) entry.getValue(), this.executor);
            // One queue per partition, created on first use.
            PendingProduceQueue computeIfAbsent = this.pendingProduceQueueMap.computeIfAbsent(topicPartition, topicPartition2 -> {
                return new PendingProduceQueue();
            });
            // The produce is enqueued before its completion hook is registered.
            computeIfAbsent.add(pendingProduce);
            // Decompiler-emitted null check for the method reference on the next line.
            computeIfAbsent.getClass();
            pendingProduce.whenComplete(computeIfAbsent::sendCompletedProduces);
        }
        CompletableFuture.allOf((CompletableFuture[]) hashMap.values().toArray(new CompletableFuture[size])).whenComplete((r11, th) -> {
            this.topicManager.getInternalServerCnx().decreasePublishBuffer(readableBytes);
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            for (Map.Entry entry2 : hashMap.entrySet()) {
                // NOTE(review): 'th' is not inspected; if any partition future completed exceptionally,
                // join() throws here and completableFuture is never completed — confirm the partition
                // futures can only complete normally.
                concurrentHashMap.put(entry2.getKey(), ((CompletableFuture) entry2.getValue()).join());
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Request {}: Complete handle produce.", this.ctx.channel(), kafkaHeaderAndRequest.toString());
            }
            completableFuture.complete(new ProduceResponse(concurrentHashMap));
        });
    }

    /**
     * Handles a FindCoordinator request for a consumer group by looking up the broker that owns
     * the offsets-topic partition the group key maps to.
     *
     * @param kafkaHeaderAndRequest the request; must carry a {@link FindCoordinatorRequest}
     * @param completableFuture future completed with the coordinator node, or with
     *     LEADER_NOT_AVAILABLE when the broker lookup fails
     * @throws NotImplementedException if the coordinator type is not GROUP
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleFindCoordinatorRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof FindCoordinatorRequest);
        FindCoordinatorRequest request = kafkaHeaderAndRequest.getRequest();
        if (request.coordinatorType() != FindCoordinatorRequest.CoordinatorType.GROUP) {
            throw new NotImplementedException("FindCoordinatorRequest not support TRANSACTION type " + request.coordinatorType());
        }
        int partitionFor = this.groupCoordinator.partitionFor(request.coordinatorKey());
        TopicName offsetsPartition = TopicName.get(this.groupCoordinator.getTopicPartitionName(partitionFor));
        findBroker(offsetsPartition).whenComplete((coordinator, failure) -> {
            boolean lookupSucceeded = failure == null && coordinator != null;
            if (lookupSucceeded) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Found node {} as coordinator for key {} partition {}.", this.ctx.channel(), coordinator.leader(), request.coordinatorKey(), Integer.valueOf(partitionFor));
                }
                completableFuture.complete(new FindCoordinatorResponse(Errors.NONE, coordinator.leader()));
                return;
            }
            log.error("[{}] Request {}: Error while find coordinator, .", this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), failure);
            completableFuture.complete(new FindCoordinatorResponse(Errors.LEADER_NOT_AVAILABLE, Node.noNode()));
        });
    }

    /**
     * Handles an OffsetFetch request by asking the group coordinator for the group's committed
     * offsets and completing the response synchronously.
     *
     * @param kafkaHeaderAndRequest the request; must carry an {@link OffsetFetchRequest}
     * @param completableFuture future completed with the {@link OffsetFetchResponse}
     * @throws IllegalStateException if no group coordinator is available
     */
    @Override // io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder
    protected void handleOffsetFetchRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof OffsetFetchRequest);
        OffsetFetchRequest request = kafkaHeaderAndRequest.getRequest();
        Preconditions.checkState(this.groupCoordinator != null, "Group Coordinator not started");
        String groupId = request.groupId();
        // A null partition list becomes an empty Optional.
        Optional<List<TopicPartition>> requestedPartitions = Optional.ofNullable(request.partitions());
        KeyValue<Errors, Map<TopicPartition, OffsetFetchResponse.PartitionData>> fetched = this.groupCoordinator.handleFetchOffsets(groupId, requestedPartitions);
        Errors error = fetched.getKey();
        Map<TopicPartition, OffsetFetchResponse.PartitionData> offsets = fetched.getValue();
        completableFuture.complete(new OffsetFetchResponse(error, offsets));
    }

    /**
     * Resolves the offset that answers a ListOffset query for one topic.
     *
     * <p>A timestamp of {@code -1} is answered from the managed ledger's last confirmed entry, {@code -2}
     * from {@code OffsetFinder.getFirstValidPosition}, and any other value through an asynchronous
     * {@code OffsetFinder} search that falls back to the first valid position when no entry is found.
     *
     * @param completableFuture future that yields the topic to query
     * @param l the requested timestamp
     * @param z {@code true} to answer with the offsets-list form of the partition data (the v0 handler
     *          passes {@code true}), {@code false} to answer with the timestamp/offset form
     * @return a future completed with the partition data; lookup failures are reported through the error
     *         code carried by that data
     */
    private CompletableFuture<ListOffsetResponse.PartitionData> fetchOffsetForTimestamp(CompletableFuture<PersistentTopic> completableFuture, Long l, boolean z) {
        CompletableFuture<ListOffsetResponse.PartitionData> completableFuture2 = new CompletableFuture<>();
        completableFuture.whenComplete((persistentTopic, th) -> {
            if (th != null || persistentTopic == null) {
                log.error("Failed while get persistentTopic topic: {} ts: {}. ", new Object[]{persistentTopic == null ? "null" : persistentTopic.getName(), l, th});
                // This branch is entered with a null topic, so the cache entry can only be evicted when
                // a topic (and therefore its name) is actually available. Dereferencing it
                // unconditionally threw inside the callback and left the result future uncompleted.
                if (persistentTopic != null) {
                    KafkaTopicManager.removeTopicManagerCache(persistentTopic.getName());
                }
                completableFuture2.complete(new ListOffsetResponse.PartitionData(Errors.LEADER_NOT_AVAILABLE, -1L, -1L));
                return;
            }
            final ManagedLedgerImpl managedLedger = persistentTopic.getManagedLedger();
            final long timestamp = l.longValue();

            if (timestamp == -1) {
                PositionImpl lastConfirmedEntry = managedLedger.getLastConfirmedEntry();
                if (log.isDebugEnabled()) {
                    log.debug("Get latest position for topic {} time {}. result: {}", new Object[]{persistentTopic.getName(), l, lastConfirmedEntry});
                }
                long entryId = lastConfirmedEntry.getEntryId();
                // An entry id of -1 is mapped to entry 0 of the same ledger.
                long offset = MessageIdUtils.getOffset(lastConfirmedEntry.getLedgerId(), entryId == -1 ? 0L : entryId);
                completableFuture2.complete(listOffsetSuccess(z, offset));
                return;
            }

            if (timestamp == -2) {
                PositionImpl firstValidPosition = OffsetFinder.getFirstValidPosition(managedLedger);
                if (log.isDebugEnabled()) {
                    log.debug("Get earliest position for topic {} time {}. result: {}", new Object[]{persistentTopic.getName(), l, firstValidPosition});
                }
                // The search callback below already treats a null first position as a failure;
                // handle it the same way here instead of dereferencing it.
                if (firstValidPosition == null) {
                    log.warn("Unable to find position for topic {} time {}. get NULL position", persistentTopic.getName(), l);
                    completableFuture2.complete(listOffsetUnknownError(z));
                    return;
                }
                completableFuture2.complete(listOffsetSuccess(z, MessageIdUtils.getOffset(firstValidPosition.getLedgerId(), firstValidPosition.getEntryId())));
                return;
            }

            new OffsetFinder(managedLedger).findMessages(timestamp, new AsyncCallbacks.FindEntryCallback() {
                public void findEntryComplete(Position position, Object obj) {
                    PositionImpl positionImpl;
                    if (position == null) {
                        // Nothing matched the timestamp: fall back to the first valid position.
                        positionImpl = OffsetFinder.getFirstValidPosition(managedLedger);
                        if (positionImpl == null) {
                            log.warn("Unable to find position for topic {} time {}. get NULL position", persistentTopic.getName(), l);
                            completableFuture2.complete(listOffsetUnknownError(z));
                            return;
                        }
                    } else {
                        positionImpl = (PositionImpl) position;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Find position for topic {} time {}. position: {}", new Object[]{persistentTopic.getName(), l, positionImpl});
                    }
                    completableFuture2.complete(listOffsetSuccess(z, MessageIdUtils.getOffset(positionImpl.getLedgerId(), positionImpl.getEntryId())));
                }

                public void findEntryFailed(ManagedLedgerException managedLedgerException, Optional<Position> optional, Object obj) {
                    log.warn("Unable to find position for topic {} time {}. Exception:", new Object[]{persistentTopic.getName(), l, managedLedgerException});
                    completableFuture2.complete(listOffsetUnknownError(z));
                }
            });
        });
        return completableFuture2;
    }

    /** Builds a successful partition response for {@code offset}, in list form or timestamp/offset form. */
    private static ListOffsetResponse.PartitionData listOffsetSuccess(boolean listForm, long offset) {
        return listForm
                ? new ListOffsetResponse.PartitionData(Errors.NONE, Collections.singletonList(Long.valueOf(offset)))
                : new ListOffsetResponse.PartitionData(Errors.NONE, 0L, offset);
    }

    /** Builds an {@code UNKNOWN_SERVER_ERROR} partition response, in list form or timestamp/offset form. */
    private static ListOffsetResponse.PartitionData listOffsetUnknownError(boolean listForm) {
        return listForm
                ? new ListOffsetResponse.PartitionData(Errors.UNKNOWN_SERVER_ERROR, Collections.emptyList())
                : new ListOffsetResponse.PartitionData(Errors.UNKNOWN_SERVER_ERROR, -1L, -1L);
    }

    /**
     * Answers a ListOffset request of version 1 or later.
     *
     * <p>One offset lookup is started per requested partition (timestamp/offset response form); the
     * response is sent once every lookup has finished.
     *
     * @param kafkaHeaderAndRequest the decoded request
     * @param completableFuture future completed with the {@code ListOffsetResponse}
     */
    private void handleListOffsetRequestV1AndAbove(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        ListOffsetRequest request = kafkaHeaderAndRequest.getRequest();
        Map<TopicPartition, CompletableFuture<ListOffsetResponse.PartitionData>> lookups = Maps.newHashMap();

        for (Map.Entry<TopicPartition, Long> requested : request.partitionTimestamps().entrySet()) {
            TopicPartition topicPartition = requested.getKey();
            CompletableFuture<PersistentTopic> topic = this.topicManager.getTopic(KopTopic.toString(topicPartition));
            lookups.put(topicPartition, fetchOffsetForTimestamp(topic, requested.getValue(), false));
        }

        CompletableFuture<?>[] allLookups = lookups.values().toArray(new CompletableFuture<?>[0]);
        CompletableFuture.allOf(allLookups).whenComplete((ignored, th) ->
                completableFuture.complete(new ListOffsetResponse(CoreUtils.mapValue(lookups, lookup -> lookup.join()))));
    }

    /**
     * Answers a version 0 ListOffset request, whose response carries a list of offsets per partition.
     *
     * <p>Asking for more than one offset per partition is not supported: such a partition is answered
     * with {@code UNKNOWN_SERVER_ERROR} and a single {@code -1} offset. Every other partition is resolved
     * through {@code fetchOffsetForTimestamp} in list form.
     *
     * @param kafkaHeaderAndRequest the decoded request
     * @param completableFuture future completed with the {@code ListOffsetResponse}
     */
    private void handleListOffsetRequestV0(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        ListOffsetRequest request = kafkaHeaderAndRequest.getRequest();
        Map<TopicPartition, CompletableFuture<ListOffsetResponse.PartitionData>> lookups = Maps.newHashMap();
        if (log.isDebugEnabled()) {
            log.debug("received a v0 listOffset: {}", request.toString(true));
        }
        for (Map.Entry<TopicPartition, ListOffsetRequest.PartitionData> entry : request.offsetData().entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            String kopTopic = KopTopic.toString(topicPartition);
            ListOffsetRequest.PartitionData requested = entry.getValue();
            if (requested.maxNumOffsets > 1) {
                log.warn("request is asking for multiples offsets for {}, not supported for now", kopTopic);
                // Previously the error future was built and then thrown away, so the error was never
                // reported; register it as this partition's result and skip the lookup.
                lookups.put(topicPartition, CompletableFuture.completedFuture(
                        new ListOffsetResponse.PartitionData(Errors.UNKNOWN_SERVER_ERROR, Collections.singletonList(-1L))));
                continue;
            }
            lookups.put(topicPartition, fetchOffsetForTimestamp(this.topicManager.getTopic(kopTopic), Long.valueOf(requested.timestamp), true));
        }
        CompletableFuture<?>[] allLookups = lookups.values().toArray(new CompletableFuture<?>[0]);
        CompletableFuture.allOf(allLookups).whenComplete((ignored, th) ->
                completableFuture.complete(new ListOffsetResponse(CoreUtils.mapValue(lookups, lookup -> lookup.join()))));
    }

    /**
     * Dispatches a ListOffset request to the handler matching its API version: version 0 goes to
     * {@code handleListOffsetRequestV0}, everything else to {@code handleListOffsetRequestV1AndAbove}.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code ListOffsetRequest}
     * @param completableFuture future completed with the response
     */
    @Override
    protected void handleListOffsetRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof ListOffsetRequest);
        boolean isVersionZero = kafkaHeaderAndRequest.getHeader().apiVersion() == 0;
        if (!isVersionZero) {
            handleListOffsetRequestV1AndAbove(kafkaHeaderAndRequest, completableFuture);
            return;
        }
        handleListOffsetRequestV0(kafkaHeaderAndRequest, completableFuture);
    }

    /**
     * Returns the per-partition errors for topics that do not exist.
     *
     * <p>The request is currently not inspected: the result is always a new, empty, mutable map.
     *
     * @param offsetCommitRequest the commit request (unused)
     * @return a new empty map
     */
    private Map<TopicPartition, Errors> nonExistingTopicErrors(OffsetCommitRequest offsetCommitRequest) {
        Map<TopicPartition, Errors> noErrors = Maps.newHashMap();
        return noErrors;
    }

    /**
     * Commits consumer offsets through the group coordinator.
     *
     * <p>Partitions reported by {@code nonExistingTopicErrors} are left out of the commit and their
     * errors are merged into the coordinator's result before the response is sent.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be an {@code OffsetCommitRequest}
     * @param completableFuture future completed with the {@code OffsetCommitResponse}
     */
    @Override
    protected void handleOffsetCommitRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof OffsetCommitRequest);
        Preconditions.checkState(this.groupCoordinator != null, "Group Coordinator not started");
        OffsetCommitRequest offsetCommitRequest = (OffsetCommitRequest) kafkaHeaderAndRequest.getRequest();
        Map<TopicPartition, Errors> nonExistingTopicErrors = nonExistingTopicErrors(offsetCommitRequest);

        String groupId = offsetCommitRequest.groupId();
        String memberId = offsetCommitRequest.memberId();
        int generationId = offsetCommitRequest.generationId();

        // Keep only the partitions that were not already rejected.
        Map<TopicPartition, OffsetCommitRequest.PartitionData> committable = offsetCommitRequest.offsetData().entrySet().stream()
                .filter(entry -> !nonExistingTopicErrors.containsKey(entry.getKey()))
                .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
        Map<TopicPartition, OffsetAndMetadata> offsets = CoreUtils.mapValue(committable,
                partitionData -> OffsetAndMetadata.apply(partitionData.offset, partitionData.metadata, partitionData.timestamp));

        this.groupCoordinator.handleCommitOffsets(groupId, memberId, generationId, offsets).thenAccept(commitResult -> {
            if (nonExistingTopicErrors != null) {
                commitResult.putAll(nonExistingTopicErrors);
            }
            completableFuture.complete(new OffsetCommitResponse(commitResult));
        });
    }

    /**
     * Hands a Fetch request over to a {@code MessageFetchContext}, logging the requested partitions
     * first when debug logging is enabled.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code FetchRequest}
     * @param completableFuture future passed on to the fetch context for the response
     */
    @Override
    protected void handleFetchRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof FetchRequest);
        FetchRequest request = kafkaHeaderAndRequest.getRequest();
        if (log.isDebugEnabled()) {
            int partitionCount = request.fetchData().size();
            log.debug("[{}] Request {} Fetch request. Size: {}. Each item: ", new Object[]{this.ctx.channel(), kafkaHeaderAndRequest.getHeader(), Integer.valueOf(partitionCount)});
            for (Map.Entry<TopicPartition, FetchRequest.PartitionData> item : request.fetchData().entrySet()) {
                log.debug("  Fetch request topic:{} data:{}.", item.getKey(), item.getValue().toString());
            }
        }
        MessageFetchContext fetchContext = MessageFetchContext.get(this);
        fetchContext.handleFetch(completableFuture, kafkaHeaderAndRequest);
    }

    /**
     * Forwards a JoinGroup request to the group coordinator and converts its result into a
     * {@code JoinGroupResponse}.
     *
     * <p>The group protocols offered by the client are passed to the coordinator keyed by protocol name,
     * and the member metadata returned by the coordinator is wrapped into buffers for the response.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code JoinGroupRequest}
     * @param completableFuture future completed with the {@code JoinGroupResponse}
     */
    @Override
    protected void handleJoinGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof JoinGroupRequest);
        Preconditions.checkState(this.groupCoordinator != null, "Group Coordinator not started");
        JoinGroupRequest request = kafkaHeaderAndRequest.getRequest();

        // The per-protocol lambda used to be empty, so the coordinator always received an empty
        // protocol map; copy each offered protocol's metadata bytes under its name.
        Map<String, byte[]> protocols = new HashMap<>();
        for (JoinGroupRequest.ProtocolMetadata protocol : request.groupProtocols()) {
            protocols.put(protocol.name(), Utils.toArray(protocol.metadata()));
        }

        this.groupCoordinator.handleJoinGroup(request.groupId(), request.memberId(), kafkaHeaderAndRequest.getHeader().clientId(), kafkaHeaderAndRequest.getClientHost(), request.rebalanceTimeout(), request.sessionTimeout(), request.protocolType(), protocols).thenAccept(joinGroupResult -> {
            // Likewise the member map of the response was never filled in.
            Map<String, ByteBuffer> members = new HashMap<>();
            joinGroupResult.getMembers().forEach((memberId, metadata) -> members.put(memberId, ByteBuffer.wrap(metadata)));

            JoinGroupResponse joinGroupResponse = new JoinGroupResponse(joinGroupResult.getError(), joinGroupResult.getGenerationId(), joinGroupResult.getSubProtocol(), joinGroupResult.getMemberId(), joinGroupResult.getLeaderId(), members);
            if (log.isTraceEnabled()) {
                log.trace("Sending join group response {} for correlation id {} to client {}.", new Object[]{joinGroupResponse, Integer.valueOf(kafkaHeaderAndRequest.getHeader().correlationId()), kafkaHeaderAndRequest.getHeader().clientId()});
            }
            completableFuture.complete(joinGroupResponse);
        });
    }

    /**
     * Forwards a SyncGroup request to the group coordinator and replies with the resulting error code
     * and assignment bytes.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code SyncGroupRequest}
     * @param completableFuture future completed with the {@code SyncGroupResponse}
     */
    @Override
    protected void handleSyncGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof SyncGroupRequest);
        SyncGroupRequest request = kafkaHeaderAndRequest.getRequest();

        String groupId = request.groupId();
        int generationId = request.generationId();
        String memberId = request.memberId();
        // Assignments arrive as buffers; the coordinator is given plain byte arrays.
        Map<String, byte[]> assignments = CoreUtils.mapValue(request.groupAssignment(), Utils::toArray);

        this.groupCoordinator.handleSyncGroup(groupId, generationId, memberId, assignments).thenAccept(syncResult -> {
            Errors error = (Errors) syncResult.getKey();
            byte[] assignment = (byte[]) syncResult.getValue();
            completableFuture.complete(new SyncGroupResponse(error, ByteBuffer.wrap(assignment)));
        });
    }

    /**
     * Forwards a Heartbeat request to the group coordinator and replies with the error code it reports.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code HeartbeatRequest}
     * @param completableFuture future completed with the {@code HeartbeatResponse}
     */
    @Override
    protected void handleHeartbeatRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof HeartbeatRequest);
        HeartbeatRequest request = kafkaHeaderAndRequest.getRequest();
        String groupId = request.groupId();
        String memberId = request.memberId();
        int generationId = request.groupGenerationId();

        this.groupCoordinator.handleHeartbeat(groupId, memberId, generationId).thenAccept(errors -> {
            HeartbeatResponse response = new HeartbeatResponse(errors);
            if (log.isTraceEnabled()) {
                Integer correlationId = Integer.valueOf(kafkaHeaderAndRequest.getHeader().correlationId());
                log.trace("Sending heartbeat response {} for correlation id {} to client {}.", new Object[]{response, correlationId, kafkaHeaderAndRequest.getHeader().clientId()});
            }
            completableFuture.complete(response);
        });
    }

    /**
     * Forwards a LeaveGroup request to the group coordinator and replies with the error code it reports.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code LeaveGroupRequest}
     * @param completableFuture future completed with the {@code LeaveGroupResponse}
     */
    @Override
    protected void handleLeaveGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof LeaveGroupRequest);
        LeaveGroupRequest request = kafkaHeaderAndRequest.getRequest();
        String groupId = request.groupId();
        String memberId = request.memberId();
        this.groupCoordinator.handleLeaveGroup(groupId, memberId)
                .thenAccept(errors -> completableFuture.complete(new LeaveGroupResponse(errors)));
    }

    /**
     * Describes every group named in the request by asking the group coordinator for its summary and
     * replies with one {@code DescribeGroupsResponse.GroupMetadata} per group id.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code DescribeGroupsRequest}
     * @param completableFuture future completed with the {@code DescribeGroupsResponse}
     */
    @Override
    protected void handleDescribeGroupRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof DescribeGroupsRequest);
        DescribeGroupsRequest request = kafkaHeaderAndRequest.getRequest();

        Map<String, DescribeGroupsResponse.GroupMetadata> described = request.groupIds().stream().map(groupId -> {
            KeyValue<Errors, GroupMetadata.GroupSummary> lookup = this.groupCoordinator.handleDescribeGroup(groupId);
            GroupMetadata.GroupSummary summary = lookup.getValue();
            Errors error = lookup.getKey();

            // Convert each member summary, wrapping its metadata and assignment bytes in buffers.
            List<DescribeGroupsResponse.GroupMember> members = summary.members().stream()
                    .map(member -> new DescribeGroupsResponse.GroupMember(member.memberId(), member.clientId(), member.clientHost(), ByteBuffer.wrap(member.metadata()), ByteBuffer.wrap(member.assignment())))
                    .collect(Collectors.toList());

            DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(error, summary.state(), summary.protocolType(), summary.protocol(), members);
            KeyValue<String, DescribeGroupsResponse.GroupMetadata> pair = new KeyValue<>(groupId, metadata);
            return pair;
        }).collect(Collectors.toMap(pair -> pair.getKey(), pair -> pair.getValue()));

        completableFuture.complete(new DescribeGroupsResponse(described));
    }

    /**
     * Replies with the groups listed by the group coordinator, reduced to group id and protocol type.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code ListGroupsRequest}
     * @param completableFuture future completed with the {@code ListGroupsResponse}
     */
    @Override
    protected void handleListGroupsRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof ListGroupsRequest);
        KeyValue<Errors, List<GroupMetadata.GroupOverview>> listing = this.groupCoordinator.handleListGroups();
        Errors error = listing.getKey();

        List<ListGroupsResponse.Group> groups = new ArrayList<>();
        for (GroupMetadata.GroupOverview overview : listing.getValue()) {
            groups.add(new ListGroupsResponse.Group(overview.groupId(), overview.protocolType()));
        }
        completableFuture.complete(new ListGroupsResponse(error, groups));
    }

    /**
     * Asks the group coordinator to delete the requested groups and replies with its result.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code DeleteGroupsRequest}
     * @param completableFuture future completed with the {@code DeleteGroupsResponse}
     */
    @Override
    protected void handleDeleteGroupsRequest(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof DeleteGroupsRequest);
        DeleteGroupsRequest request = kafkaHeaderAndRequest.getRequest();
        DeleteGroupsResponse response = new DeleteGroupsResponse(this.groupCoordinator.handleDeleteGroups(request.groups()));
        completableFuture.complete(response);
    }

    /**
     * Rejects a SaslAuthenticate request with {@code ILLEGAL_SASL_STATE}; the request itself is not
     * inspected.
     *
     * @param kafkaHeaderAndRequest the decoded request (unused)
     * @param completableFuture future completed with the rejecting {@code SaslAuthenticateResponse}
     */
    @Override
    protected void handleSaslAuthenticate(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        SaslAuthenticateResponse rejection = new SaslAuthenticateResponse(Errors.ILLEGAL_SASL_STATE, "SaslAuthenticate request received after successful authentication");
        completableFuture.complete(rejection);
    }

    /**
     * Rejects a SaslHandshake request with {@code ILLEGAL_SASL_STATE} and an empty mechanism set; the
     * request itself is not inspected.
     *
     * @param kafkaHeaderAndRequest the decoded request (unused)
     * @param completableFuture future completed with the rejecting {@code SaslHandshakeResponse}
     */
    @Override
    protected void handleSaslHandshake(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        SaslHandshakeResponse rejection = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet());
        completableFuture.complete(rejection);
    }

    /**
     * Creates the topics named in a CreateTopics request through the admin manager.
     *
     * <p>Topics that the request lists more than once are answered with {@code INVALID_REQUEST} and are
     * not created. When no creatable topic remains, the response is sent immediately.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code CreateTopicsRequest}
     * @param completableFuture future completed with the {@code CreateTopicsResponse}
     */
    @Override
    protected void handleCreateTopics(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof CreateTopicsRequest);
        CreateTopicsRequest request = kafkaHeaderAndRequest.getRequest();
        Map<String, ApiError> results = new HashMap<>();
        Map<String, CreateTopicsRequest.TopicDetails> creatable = new HashMap<>();
        Set<String> duplicateTopics = request.duplicateTopics();

        for (Map.Entry<String, CreateTopicsRequest.TopicDetails> topic : request.topics().entrySet()) {
            String name = topic.getKey();
            if (duplicateTopics.contains(name)) {
                results.put(name, new ApiError(Errors.INVALID_REQUEST, "Create topics request from client `" + kafkaHeaderAndRequest.getHeader().clientId() + "` contains multiple entries for the following topics: " + duplicateTopics));
                continue;
            }
            creatable.put(name, topic.getValue());
        }

        if (creatable.isEmpty()) {
            completableFuture.complete(new CreateTopicsResponse(results));
            return;
        }
        this.adminManager.createTopicsAsync(creatable, request.timeout(), this.kafkaConfig.isDefaultTopicTypePartitioned()).thenAccept(created -> {
            results.putAll(created);
            completableFuture.complete(new CreateTopicsResponse(results));
        });
    }

    /**
     * Describes the configuration of the requested resources through the admin manager.
     *
     * <p>Each resource is mapped to an optional set of requested config names; the optional is empty when
     * the request returns {@code null} for that resource.
     *
     * @param kafkaHeaderAndRequest the decoded request; its payload must be a {@code DescribeConfigsRequest}
     * @param completableFuture future completed with the {@code DescribeConfigsResponse}
     */
    @Override
    protected void handleDescribeConfigs(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) {
        Preconditions.checkArgument(kafkaHeaderAndRequest.getRequest() instanceof DescribeConfigsRequest);
        DescribeConfigsRequest request = kafkaHeaderAndRequest.getRequest();
        this.adminManager.describeConfigsAsync((Map) new ArrayList<>(request.resources()).stream().collect(Collectors.toMap(
                resource -> resource,
                resource -> Optional.ofNullable(request.configNames(resource)).map(HashSet::new)
        ))).thenAccept(described -> completableFuture.complete(new DescribeConfigsResponse(0, described)));
    }

    /**
     * Checks a SASL mechanism name against the mechanisms allowed by the Kafka configuration.
     *
     * @param str the mechanism name to check
     * @return a response with {@code NONE} and the allowed mechanisms when {@code str} is allowed,
     *         otherwise a response with {@code UNSUPPORTED_SASL_MECHANISM} and an empty set
     */
    private SaslHandshakeResponse checkSaslMechanism(String str) {
        if (!getKafkaConfig().getSaslAllowedMechanisms().contains(str)) {
            return new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, new HashSet<>());
        }
        return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms());
    }

    /**
     * Logs the error raised on the channel and then closes the channel context.
     *
     * @param channelHandlerContext context of the channel on which the error was raised
     * @param th the error that was caught
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        final String message = "Caught error in handler, closing channel";
        log.error(message, th);
        channelHandlerContext.close();
    }

    /**
     * Looks up the Kafka protocol data advertised by the broker that owns a topic.
     *
     * <p>A result cached in {@code KafkaTopicManager.KOP_ADDRESS_CACHE} is returned directly. Otherwise the
     * children of {@code /loadbalance/brokers} whose name starts with the broker's host name are read,
     * and the first load report that mentions {@code host:port} supplies the protocol data, which is
     * also cached under the topic name.
     *
     * @param inetSocketAddress address of the broker owning the topic, or {@code null} when it could not
     *                          be determined
     * @param topicName the topic being looked up
     * @return a future completed with the advertised protocol data, or with an empty optional when the
     *         address is {@code null}, a ZooKeeper read fails, or no matching broker is found
     */
    private CompletableFuture<Optional<String>> getProtocolDataToAdvertise(InetSocketAddress inetSocketAddress, TopicName topicName) {
        CompletableFuture<Optional<String>> completableFuture = new CompletableFuture<>();
        if (inetSocketAddress == null) {
            log.error("[{}] failed get pulsar address, returned null.", topicName.toString());
            KafkaTopicManager.removeTopicManagerCache(topicName.toString());
            completableFuture.complete(Optional.empty());
            return completableFuture;
        }
        if (log.isDebugEnabled()) {
            log.debug("Found broker for topic {} pulsarAddress: {}", topicName, inetSocketAddress);
        }
        // Read the cache once: a containsKey check followed by get could return null if the entry
        // disappeared in between, and callers chain on the returned future.
        CompletableFuture<Optional<String>> cached = KafkaTopicManager.KOP_ADDRESS_CACHE.get(topicName.toString());
        if (cached != null) {
            return cached;
        }
        ZooKeeperCache localZkCache = this.pulsarService.getLocalZkCache();
        localZkCache.getChildrenAsync("/loadbalance/brokers", localZkCache).whenComplete((set, th) -> {
            if (th != null) {
                log.error("Error in getChildrenAsync(zk://loadbalance) for {}", inetSocketAddress, th);
                completableFuture.complete(Optional.empty());
                return;
            }
            String str = inetSocketAddress.getHostName() + ":" + inetSocketAddress.getPort();
            String hostPrefix = inetSocketAddress.getHostName() + ":";
            ArrayList<String> candidates = Lists.newArrayList();
            for (Object child : set) {
                String childName = (String) child;
                if (childName.startsWith(hostPrefix)) {
                    candidates.add(childName);
                }
            }
            if (candidates.isEmpty()) {
                log.error("No node for broker {} under zk://loadbalance", inetSocketAddress);
                completableFuture.complete(Optional.empty());
                return;
            }
            List list = (List) candidates.stream().map(childName -> {
                return localZkCache.getDataAsync(String.format("%s/%s", "/loadbalance/brokers", childName), ((LoadManager) this.pulsarService.getLoadManager().get()).getLoadReportDeserializer());
            }).collect(Collectors.toList());
            // The inner throwable gets its own name; re-declaring "th" inside this lambda is illegal.
            FutureUtil.waitForAll(list).whenComplete((ignored, dataError) -> {
                if (dataError != null) {
                    log.error("Error in getDataAsync() for {}", inetSocketAddress, dataError);
                    completableFuture.complete(Optional.empty());
                    return;
                }
                try {
                    for (Object dataFuture : list) {
                        Optional<?> report = (Optional<?>) ((CompletableFuture<?>) dataFuture).get();
                        // A child without data cannot match; skip it instead of aborting the whole
                        // search with an unchecked Optional.get().
                        if (!report.isPresent()) {
                            continue;
                        }
                        ServiceLookupData serviceLookupData = (ServiceLookupData) report.get();
                        if (log.isDebugEnabled()) {
                            log.debug("Handle getProtocolDataToAdvertise for {}, pulsarUrl: {}, pulsarUrlTls: {}, webUrl: {}, webUrlTls: {} kafka: {}", new Object[]{topicName, serviceLookupData.getPulsarServiceUrl(), serviceLookupData.getPulsarServiceUrlTls(), serviceLookupData.getWebServiceUrl(), serviceLookupData.getWebServiceUrlTls(), serviceLookupData.getProtocol(KafkaProtocolHandler.PROTOCOL_NAME)});
                        }
                        if (lookupDataContainsAddress(serviceLookupData, str)) {
                            KafkaTopicManager.KOP_ADDRESS_CACHE.put(topicName.toString(), completableFuture);
                            completableFuture.complete(serviceLookupData.getProtocol(KafkaProtocolHandler.PROTOCOL_NAME));
                            return;
                        }
                    }
                    log.error("Not able to search {} in all child of zk://loadbalance", inetSocketAddress);
                    completableFuture.complete(Optional.empty());
                } catch (Exception e) {
                    log.error("Error in {} lookupFuture get: ", inetSocketAddress, e);
                    completableFuture.complete(Optional.empty());
                }
            });
        });
        return completableFuture;
    }

    /**
     * Finds the broker that serves a topic and describes it as Kafka partition metadata.
     *
     * <p>When the listener advertised for the topic is one of this handler's advertised listeners, the
     * topic is additionally loaded through the topic manager before the metadata is returned; otherwise
     * the topic is evicted from the topic manager cache and the metadata is returned directly.
     *
     * @param topicName the topic to look up
     * @return a future completed with the partition metadata, or with {@code null} when the lookup or
     *         the topic load fails
     */
    private CompletableFuture<MetadataResponse.PartitionMetadata> findBroker(TopicName topicName) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Handle Lookup for {}", this.ctx.channel(), topicName);
        }
        CompletableFuture<MetadataResponse.PartitionMetadata> completableFuture = new CompletableFuture<>();
        this.topicManager.getTopicBroker(topicName.toString()).thenCompose(inetSocketAddress -> {
            return getProtocolDataToAdvertise(inetSocketAddress, topicName);
        }).whenComplete((optional, th) -> {
            // Check the throwable first: on failure the value is null, and calling isPresent() on it
            // threw inside this callback, leaving the result future uncompleted.
            if (th != null || optional == null || !optional.isPresent()) {
                log.error("Not get advertise data for Kafka topic:{}. throwable", topicName, th);
                completableFuture.complete(null);
                return;
            }
            String str = (String) optional.get();
            EndPoint endPoint = this.tlsEnabled.booleanValue() ? EndPoint.getSslEndPoint(str) : EndPoint.getPlainTextEndPoint(str);
            Node newNode = newNode(endPoint.getInetAddress());
            if (log.isDebugEnabled()) {
                log.debug("Found broker localListeners: {} for topicName: {}, localListeners: {}, found Listeners: {}", new Object[]{str, topicName, this.advertisedListeners, str});
            }
            if (!this.advertisedListeners.contains(endPoint.getOriginalListener())) {
                // The topic is advertised through a listener that is not ours: drop it from the cache
                // and just report the node.
                KafkaTopicManager.removeTopicManagerCache(topicName.toString());
                completableFuture.complete(newPartitionMetadata(topicName, newNode));
                return;
            }
            this.topicManager.getTopic(topicName.toString()).whenComplete((persistentTopic, topicError) -> {
                if (topicError == null && persistentTopic != null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Add topic: {} into TopicManager while findBroker.", topicName.toString());
                    }
                    completableFuture.complete(newPartitionMetadata(topicName, newNode));
                } else {
                    log.warn("[{}] findBroker: Failed to getOrCreateTopic {}. broker:{}, exception:", new Object[]{this.ctx.channel(), topicName.toString(), endPoint.getOriginalListener(), topicError});
                    KafkaTopicManager.removeTopicManagerCache(topicName.toString());
                    completableFuture.complete(null);
                }
            });
        });
        return completableFuture;
    }

    /**
     * Creates the Kafka {@code Node} for a broker address.
     *
     * <p>The node id is the Murmur3 32-bit hash of the UTF-8 bytes of the host string immediately
     * followed by the port.
     *
     * @param inetSocketAddress the broker address
     * @return a node carrying the hashed id, the host string and the port
     */
    static Node newNode(InetSocketAddress inetSocketAddress) {
        if (log.isDebugEnabled()) {
            log.debug("Return Broker Node of {}. {}:{}", new Object[]{inetSocketAddress, inetSocketAddress.getHostString(), Integer.valueOf(inetSocketAddress.getPort())});
        }
        String host = inetSocketAddress.getHostString();
        int port = inetSocketAddress.getPort();
        byte[] idSource = (host + port).getBytes(StandardCharsets.UTF_8);
        int nodeId = Murmur3_32Hash.getInstance().makeHash(idSource);
        return new Node(nodeId, host, port);
    }

    /**
     * Creates the Kafka {@code Node} for this handler's advertised end point.
     *
     * @return the node built by {@code newNode} from the advertised end point's address
     */
    Node newSelfNode() {
        InetSocketAddress selfAddress = this.advertisedEndPoint.getInetAddress();
        return newNode(selfAddress);
    }

    /**
     * Builds error-free partition metadata in which {@code node} is the leader and the only entry of
     * both node lists, with no offline replicas.
     *
     * @param topicName the topic; a partition index of {@code -1} is reported as partition {@code 0}
     * @param node the node serving the partition
     * @return the partition metadata
     */
    static MetadataResponse.PartitionMetadata newPartitionMetadata(TopicName topicName, Node node) {
        int partition = topicName.getPartitionIndex();
        if (partition == -1) {
            partition = 0;
        }
        if (log.isDebugEnabled()) {
            log.debug("Return PartitionMetadata node: {}, topicName: {}", node, topicName);
        }
        List<Node> replicas = Lists.newArrayList(new Node[]{node});
        List<Node> inSyncReplicas = Lists.newArrayList(new Node[]{node});
        return new MetadataResponse.PartitionMetadata(Errors.NONE, partition, node, replicas, inSyncReplicas, Collections.emptyList());
    }

    /**
     * Builds partition metadata that reports {@code NOT_LEADER_FOR_PARTITION}, using
     * {@code Node.noNode()} as the leader and as the only entry of both node lists.
     *
     * @param topicName the topic; a partition index of {@code -1} is reported as partition {@code 0}
     * @return the failed partition metadata
     */
    static MetadataResponse.PartitionMetadata newFailedPartitionMetadata(TopicName topicName) {
        int partition = topicName.getPartitionIndex();
        if (partition == -1) {
            partition = 0;
        }
        log.warn("Failed find Broker metadata, create PartitionMetadata with NOT_LEADER_FOR_PARTITION");
        return new MetadataResponse.PartitionMetadata(
                Errors.NOT_LEADER_FOR_PARTITION,
                partition,
                Node.noNode(),
                Lists.newArrayList(new Node[]{Node.noNode()}),
                Lists.newArrayList(new Node[]{Node.noNode()}),
                Collections.emptyList());
    }

    /**
     * Builds the error response of a request for the given failure, using the default value of
     * {@code CommonFields.THROTTLE_TIME_MS} as throttle time.
     *
     * @param kafkaHeaderAndRequest the request that failed
     * @param th the failure to report
     * @return the request's error response
     */
    static AbstractResponse failedResponse(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, Throwable th) {
        if (log.isDebugEnabled()) {
            log.debug("Request {} get failed response ", kafkaHeaderAndRequest.getHeader().apiKey(), th);
        }
        int defaultThrottleMs = ((Integer) CommonFields.THROTTLE_TIME_MS.defaultValue).intValue();
        return kafkaHeaderAndRequest.getRequest().getErrorResponse(defaultThrottleMs, th);
    }

    /**
     * Tells whether any of the service URLs of a lookup entry contains the given address text.
     *
     * <p>The Pulsar service URL, its TLS variant, the web service URL and its TLS variant are checked in
     * that order; {@code null} URLs are skipped.
     *
     * @param serviceLookupData the lookup entry to inspect
     * @param str the text to search for (callers in this class pass {@code host:port})
     * @return {@code true} when at least one non-null URL contains {@code str}
     */
    static boolean lookupDataContainsAddress(ServiceLookupData serviceLookupData, String str) {
        String pulsarUrl = serviceLookupData.getPulsarServiceUrl();
        if (pulsarUrl != null && pulsarUrl.contains(str)) {
            return true;
        }
        String pulsarUrlTls = serviceLookupData.getPulsarServiceUrlTls();
        if (pulsarUrlTls != null && pulsarUrlTls.contains(str)) {
            return true;
        }
        String webUrl = serviceLookupData.getWebServiceUrl();
        if (webUrl != null && webUrl.contains(str)) {
            return true;
        }
        String webUrlTls = serviceLookupData.getWebServiceUrlTls();
        return webUrlTls != null && webUrlTls.contains(str);
    }

    /** Returns this handler's {@code pulsarService} field. */
    public PulsarService getPulsarService() {
        return pulsarService;
    }

    /** Returns this handler's {@code kafkaConfig} field. */
    public KafkaServiceConfiguration getKafkaConfig() {
        return kafkaConfig;
    }

    /** Returns this handler's {@code topicManager} field. */
    public KafkaTopicManager getTopicManager() {
        return topicManager;
    }

    /** Returns this handler's {@code groupCoordinator} field. */
    public GroupCoordinator getGroupCoordinator() {
        return groupCoordinator;
    }

    /** Returns this handler's {@code clusterName} field. */
    public String getClusterName() {
        return clusterName;
    }

    /** Returns this handler's {@code executor} field. */
    public ScheduledExecutorService getExecutor() {
        return executor;
    }

    /** Returns this handler's {@code admin} field. */
    public PulsarAdmin getAdmin() {
        return admin;
    }

    /** Returns this handler's {@code authenticator} field. */
    public SaslAuthenticator getAuthenticator() {
        return authenticator;
    }

    /** Returns this handler's {@code adminManager} field. */
    public AdminManager getAdminManager() {
        return adminManager;
    }

    /** Returns this handler's {@code tlsEnabled} field. */
    public Boolean getTlsEnabled() {
        return tlsEnabled;
    }

    /** Returns this handler's {@code advertisedEndPoint} field. */
    public EndPoint getAdvertisedEndPoint() {
        return advertisedEndPoint;
    }

    /** Returns this handler's {@code advertisedListeners} field. */
    public String getAdvertisedListeners() {
        return advertisedListeners;
    }

    /** Returns this handler's {@code defaultNumPartitions} field. */
    public int getDefaultNumPartitions() {
        return defaultNumPartitions;
    }

    /** Returns this handler's {@code maxReadEntriesNum} field. */
    public int getMaxReadEntriesNum() {
        return maxReadEntriesNum;
    }

    /** Returns this handler's {@code pendingProduceQueueMap} field (the map itself, not a copy). */
    public Map<TopicPartition, PendingProduceQueue> getPendingProduceQueueMap() {
        return pendingProduceQueueMap;
    }

    /** Returns this handler's {@code entryFormatter} field. */
    public EntryFormatter getEntryFormatter() {
        return entryFormatter;
    }
}
