package io.streamnative.pulsar.handlers.kop;

import com.google.common.collect.Lists;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.class */
public class KopBrokerLookupManager {
    private final PulsarService pulsarService;
    private final Boolean tlsEnabled;
    private final String advertisedListeners;
    private final LookupClient lookupClient;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KopBrokerLookupManager.class);
    public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>> LOOKUP_CACHE = new ConcurrentHashMap<>();
    public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>> KOP_ADDRESS_CACHE = new ConcurrentHashMap<>();

    public KopBrokerLookupManager(PulsarService pulsarService, Boolean bool, String str) {
        this.pulsarService = pulsarService;
        this.tlsEnabled = bool;
        this.advertisedListeners = str;
        this.lookupClient = KafkaProtocolHandler.getLookupClient(pulsarService);
    }

    public CompletableFuture<InetSocketAddress> findBroker(String str) {
        if (log.isDebugEnabled()) {
            log.debug("Handle Lookup for topic {}", str);
        }
        CompletableFuture<InetSocketAddress> completableFuture = new CompletableFuture<>();
        getTopicBroker(str).thenCompose(inetSocketAddress -> {
            return getProtocolDataToAdvertise(inetSocketAddress, TopicName.get(str));
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (optional, th) -> {
            if (!optional.isPresent() || th != null) {
                log.error("Not get advertise data for Kafka topic:{}. throwable", str, th);
                completableFuture.complete(null);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Found broker localListeners: {} for topicName: {}, localListeners: {}", optional.get(), str, this.advertisedListeners);
            }
            EndPoint sslEndPoint = this.tlsEnabled.booleanValue() ? EndPoint.getSslEndPoint((String) optional.get()) : EndPoint.getPlainTextEndPoint((String) optional.get());
            if (this.advertisedListeners.contains(sslEndPoint.getOriginalListener())) {
                checkTopicOwner(completableFuture, str, sslEndPoint);
            } else {
                removeTopicManagerCache(str);
                completableFuture.complete(sslEndPoint.getInetAddress());
            }
        });
        return completableFuture;
    }

    private CompletableFuture<InetSocketAddress> getTopicBroker(String str) {
        return LOOKUP_CACHE.computeIfAbsent(str, str2 -> {
            return this.lookupClient.getBrokerAddress(TopicName.get(str));
        });
    }

    private void checkTopicOwner(CompletableFuture<InetSocketAddress> completableFuture, String str, EndPoint endPoint) {
        getTopic(str).whenComplete((persistentTopic, th) -> {
            if (th != null || persistentTopic == null) {
                log.warn("findBroker: Failed to getOrCreateTopic {}. broker:{}, exception:", str, endPoint.getOriginalListener(), th);
                removeTopicManagerCache(str);
                completableFuture.complete(null);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Add topic: {} into TopicManager while findBroker.", str);
                }
                completableFuture.complete(endPoint.getInetAddress());
            }
        });
    }

    private CompletableFuture<PersistentTopic> getTopic(String str) {
        CompletableFuture<PersistentTopic> completableFuture = new CompletableFuture<>();
        this.pulsarService.getBrokerService().getTopic(str, this.pulsarService.getBrokerService().isAllowAutoTopicCreation(str)).whenComplete((optional, th) -> {
            if (th != null) {
                log.error("Failed to getTopic {}. exception:", str, th);
                removeTopicManagerCache(str);
                completableFuture.complete(null);
            } else if (optional.isPresent()) {
                completableFuture.complete((PersistentTopic) optional.get());
            } else {
                log.error("Get empty topic for name {}", str);
                completableFuture.complete(null);
            }
        });
        return completableFuture;
    }

    private CompletableFuture<Optional<String>> getProtocolDataToAdvertise(InetSocketAddress inetSocketAddress, TopicName topicName) {
        CompletableFuture<Optional<String>> completableFuture = new CompletableFuture<>();
        if (inetSocketAddress == null) {
            log.error("[{}] failed get pulsar address, returned null.", topicName.toString());
            removeTopicManagerCache(topicName.toString());
            completableFuture.complete(Optional.empty());
            return completableFuture;
        }
        if (log.isDebugEnabled()) {
            log.debug("Find broker for topic {} pulsarAddress: {}", topicName, inetSocketAddress);
        }
        if (KOP_ADDRESS_CACHE.containsKey(topicName.toString())) {
            return KOP_ADDRESS_CACHE.get(topicName.toString());
        }
        if (StringUtil.isBlank(this.lookupClient.getPulsarClient().getConfiguration().getListenerName())) {
            ZooKeeperCache localZkCache = this.pulsarService.getLocalZkCache();
            localZkCache.getChildrenAsync("/loadbalance/brokers", localZkCache).whenComplete((set, th) -> {
                if (th != null) {
                    log.error("Error in getChildrenAsync(zk://loadbalance) for {}", inetSocketAddress, th);
                    completableFuture.complete(Optional.empty());
                    return;
                }
                String str = inetSocketAddress.getHostName() + ParameterizedMessage.ERROR_MSG_SEPARATOR + inetSocketAddress.getPort();
                ArrayList newArrayList = Lists.newArrayList();
                Iterator it = set.iterator();
                while (it.hasNext()) {
                    String str2 = (String) it.next();
                    if (str2.startsWith(inetSocketAddress.getHostName() + ParameterizedMessage.ERROR_MSG_SEPARATOR)) {
                        newArrayList.add(str2);
                    }
                }
                if (!newArrayList.isEmpty()) {
                    MetadataCache metadataCache = this.pulsarService.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
                    getKopAddress((List) newArrayList.stream().map(str3 -> {
                        return metadataCache.get(String.format("%s/%s", "/loadbalance/brokers", str3));
                    }).collect(Collectors.toList()), inetSocketAddress, completableFuture, topicName, str);
                } else {
                    log.error("No node for broker {} under zk://loadbalance", inetSocketAddress);
                    completableFuture.complete(Optional.empty());
                    removeTopicManagerCache(topicName.toString());
                }
            });
            return completableFuture;
        }
        String format = String.format("%s://%s:%s", SecurityProtocol.PLAINTEXT.name(), inetSocketAddress.getHostName(), Integer.valueOf(inetSocketAddress.getPort()));
        KOP_ADDRESS_CACHE.put(topicName.toString(), completableFuture);
        if (log.isDebugEnabled()) {
            log.debug("{} get kafka Advertised Address through kafkaListenerName: {}", topicName, inetSocketAddress);
        }
        completableFuture.complete(Optional.ofNullable(format));
        return completableFuture;
    }

    private void getKopAddress(List<CompletableFuture<Optional<LocalBrokerData>>> list, InetSocketAddress inetSocketAddress, CompletableFuture<Optional<String>> completableFuture, TopicName topicName, String str) {
        FutureUtil.waitForAll(list).whenComplete((r13, th) -> {
            if (th != null) {
                log.error("Error in getDataAsync() for {}", inetSocketAddress, th);
                completableFuture.complete(Optional.empty());
                removeTopicManagerCache(topicName.toString());
                return;
            }
            try {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ServiceLookupData serviceLookupData = (ServiceLookupData) ((Optional) ((CompletableFuture) it.next()).get()).get();
                    if (log.isDebugEnabled()) {
                        log.debug("Handle getProtocolDataToAdvertise for {}, pulsarUrl: {}, pulsarUrlTls: {}, webUrl: {}, webUrlTls: {} kafka: {}", topicName, serviceLookupData.getPulsarServiceUrl(), serviceLookupData.getPulsarServiceUrlTls(), serviceLookupData.getWebServiceUrl(), serviceLookupData.getWebServiceUrlTls(), serviceLookupData.getProtocol(KafkaProtocolHandler.PROTOCOL_NAME));
                    }
                    if (KafkaRequestHandler.lookupDataContainsAddress(serviceLookupData, str)) {
                        KOP_ADDRESS_CACHE.put(topicName.toString(), completableFuture);
                        completableFuture.complete(serviceLookupData.getProtocol(KafkaProtocolHandler.PROTOCOL_NAME));
                        return;
                    }
                }
                log.error("Not able to search {} in all child of zk://loadbalance", inetSocketAddress);
                completableFuture.complete(Optional.empty());
            } catch (Exception e) {
                log.error("Error in {} lookupFuture get: ", inetSocketAddress, e);
                completableFuture.complete(Optional.empty());
                removeTopicManagerCache(topicName.toString());
            }
        });
    }

    public static void removeTopicManagerCache(String str) {
        LOOKUP_CACHE.remove(str);
        KOP_ADDRESS_CACHE.remove(str);
    }

    public static void clear() {
        LOOKUP_CACHE.clear();
        KOP_ADDRESS_CACHE.clear();
    }
}
