package io.streamnative.pulsar.handlers.kop;

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/LookupClient.class */
/**
 * Resolves the address of the broker that serves a topic.
 *
 * <p>The lookup first asks the broker's {@link NamespaceService}. When that answer is a redirect,
 * the lookup is repeated through a {@link PulsarClientImpl}, whose lookup service returns the
 * broker address directly.
 */
public class LookupClient implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(LookupClient.class);

    private final NamespaceService namespaceService;
    private final PulsarClientImpl pulsarClient;
    /**
     * {@code true} when this instance built {@link #pulsarClient} itself (and must therefore close
     * it); {@code false} when the client was borrowed from the {@link PulsarService}.
     */
    private final boolean ownsPulsarClient;

    /**
     * Creates a lookup client backed by a dedicated Pulsar client configured from the KoP settings.
     *
     * @param pulsarService the broker service providing the namespace service, service URLs and
     *                      IO event loop group
     * @param kafkaServiceConfiguration the KoP configuration holding TLS, authentication and
     *                                  listener-name settings
     * @throws IllegalStateException if the Pulsar client cannot be created
     */
    public LookupClient(PulsarService pulsarService, KafkaServiceConfiguration kafkaServiceConfiguration) {
        this.namespaceService = pulsarService.getNamespaceService();
        try {
            this.pulsarClient = createPulsarClient(pulsarService, kafkaServiceConfiguration);
        } catch (PulsarClientException e) {
            log.error("Failed to create PulsarClient", e);
            throw new IllegalStateException(e);
        }
        this.ownsPulsarClient = true;
    }

    /**
     * Fallback constructor that reuses the client handed out by {@link PulsarService#getClient()}.
     * A warning is logged on every call because this path is not expected to be used.
     *
     * @param pulsarService the broker service providing the namespace service and the client
     * @throws IllegalStateException if the client cannot be obtained from the broker service
     */
    public LookupClient(PulsarService pulsarService) {
        log.warn("This constructor should not be called, it's only called when the PulsarService doesn't exist in KafkaProtocolHandlers.LOOKUP_CLIENT_UP");
        this.namespaceService = pulsarService.getNamespaceService();
        try {
            this.pulsarClient = pulsarService.getClient();
        } catch (PulsarServerException e) {
            log.error("Failed to create PulsarClient", e);
            throw new IllegalStateException(e);
        }
        // The client was not created here, so close() must leave it alone.
        this.ownsPulsarClient = false;
    }

    /**
     * Looks up the address of the broker that serves {@code topicName}.
     *
     * <p>The namespace service is queried non-authoritatively, with the listener name taken from
     * the Pulsar client's configuration and with {@code loadTopicsInBundle} enabled.
     *
     * @param topicName the topic to look up
     * @return a future completed with the unresolved broker address; it fails when no broker is
     *         available to own the topic, when the redirect lookup fails, or when the broker URL
     *         in the lookup result cannot be turned into a host and port
     */
    public CompletableFuture<InetSocketAddress> getBrokerAddress(TopicName topicName) {
        final LookupOptions lookupOptions = LookupOptions.builder()
                .authoritative(false)
                .advertisedListenerName(this.pulsarClient.getConfiguration().getListenerName())
                .loadTopicsInBundle(true)
                .build();
        return this.namespaceService.getBrokerServiceUrlAsync(topicName, lookupOptions).thenCompose(optLookupResult -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Lookup result {}", topicName.toString(), optLookupResult);
            }
            if (!optLookupResult.isPresent()) {
                return getFailedAddressFuture(ClientCnx.getPulsarClientException(
                        ServerError.ServiceNotReady, "No broker was available to own " + topicName));
            }
            final LookupResult lookupResult = optLookupResult.get();
            if (lookupResult.isRedirect()) {
                // A redirect carries no final owner, so fall back to the Pulsar client's lookup
                // and keep the left element of the pair it returns.
                return this.pulsarClient.getLookup().getBroker(topicName).thenApply(pair -> pair.getLeft());
            }
            return getAddressFutureFromBrokerUrl(lookupResult.getLookupData().getBrokerUrl());
        });
    }

    /**
     * Closes the underlying Pulsar client if this instance created it. A failure to close is
     * logged and not propagated.
     *
     * <p>A client borrowed through {@link PulsarService#getClient()} is left open.
     * NOTE(review): this assumes the {@link PulsarService} manages and closes that client itself;
     * confirm against the Pulsar version in use.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!this.ownsPulsarClient) {
            return;
        }
        try {
            this.pulsarClient.close();
        } catch (PulsarClientException e) {
            log.warn("Failed to close PulsarClient of LookupClient", e);
        }
    }

    /**
     * Builds a Pulsar client that connects to the local broker.
     *
     * <p>The service URL is the TLS one when TLS is enabled in the KoP configuration. Trust
     * settings come from the broker-client key store or trust-certs file when broker-client TLS
     * is enabled, and authentication is configured when a broker-client authentication plugin is
     * set. The listener name is the configured Kafka listener name.
     *
     * @param pulsarService the broker service providing the service URLs and IO event loop group
     * @param kafkaServiceConfiguration the KoP configuration to read the settings from
     * @return the newly created client
     * @throws PulsarClientException if the authentication or the client cannot be created
     */
    private static PulsarClientImpl createPulsarClient(PulsarService pulsarService, KafkaServiceConfiguration kafkaServiceConfiguration) throws PulsarClientException {
        final ClientConfigurationData conf = new ClientConfigurationData();
        final String serviceUrl = kafkaServiceConfiguration.isTlsEnabled()
                ? pulsarService.getBrokerServiceUrlTls()
                : pulsarService.getBrokerServiceUrl();
        conf.setServiceUrl(serviceUrl);
        conf.setTlsAllowInsecureConnection(kafkaServiceConfiguration.isTlsAllowInsecureConnection());
        conf.setTlsTrustCertsFilePath(kafkaServiceConfiguration.getTlsCertificateFilePath());

        if (kafkaServiceConfiguration.isBrokerClientTlsEnabled()) {
            if (kafkaServiceConfiguration.isBrokerClientTlsEnabledWithKeyStore()) {
                conf.setUseKeyStoreTls(true);
                conf.setTlsTrustStoreType(kafkaServiceConfiguration.getBrokerClientTlsTrustStoreType());
                conf.setTlsTrustStorePath(kafkaServiceConfiguration.getBrokerClientTlsTrustStore());
                conf.setTlsTrustStorePassword(kafkaServiceConfiguration.getBrokerClientTlsTrustStorePassword());
            } else {
                // Prefer the dedicated broker-client trust certs; otherwise keep the TLS certificate file.
                final String brokerClientTrustCerts = kafkaServiceConfiguration.getBrokerClientTrustCertsFilePath();
                conf.setTlsTrustCertsFilePath(StringUtils.isNotBlank(brokerClientTrustCerts)
                        ? brokerClientTrustCerts
                        : kafkaServiceConfiguration.getTlsCertificateFilePath());
            }
        }

        final String authPlugin = kafkaServiceConfiguration.getBrokerClientAuthenticationPlugin();
        if (StringUtils.isNotBlank(authPlugin)) {
            final String authParams = kafkaServiceConfiguration.getBrokerClientAuthenticationParameters();
            conf.setAuthPluginClassName(authPlugin);
            conf.setAuthParams(authParams);
            conf.setAuthParamMap((Map<String, String>) null);
            conf.setAuthentication(AuthenticationFactory.create(authPlugin, authParams));
        }

        conf.setListenerName(kafkaServiceConfiguration.getKafkaListenerName());
        return new PulsarClientImpl(conf, pulsarService.getIoEventLoopGroup());
    }

    /**
     * Returns a future that is already completed exceptionally with {@code th}.
     *
     * @param th the failure to report
     * @return the failed future
     */
    private static CompletableFuture<InetSocketAddress> getFailedAddressFuture(Throwable th) {
        final CompletableFuture<InetSocketAddress> failed = new CompletableFuture<>();
        failed.completeExceptionally(th);
        return failed;
    }

    /**
     * Converts a broker URL into an unresolved socket address.
     *
     * <p>Every failure is reported through the returned future rather than thrown: a null or
     * blank URL, a URL that is not a valid URI, and a URL without a usable host or port.
     *
     * @param str the broker URL; may be null or blank
     * @return a future completed with the unresolved address, or failed with
     *         {@link IllegalArgumentException} or {@link URISyntaxException}
     */
    private static CompletableFuture<InetSocketAddress> getAddressFutureFromBrokerUrl(String str) {
        if (StringUtils.isBlank(str)) {
            return getFailedAddressFuture(new IllegalArgumentException("Broker URL is null or blank"));
        }
        final CompletableFuture<InetSocketAddress> addressFuture = new CompletableFuture<>();
        try {
            final URI uri = new URI(str);
            // createUnresolved rejects a null host and an out-of-range port (URI reports -1 when
            // the port is undefined) with IllegalArgumentException.
            addressFuture.complete(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
        } catch (URISyntaxException e) {
            addressFuture.completeExceptionally(e);
        } catch (IllegalArgumentException e) {
            addressFuture.completeExceptionally(new IllegalArgumentException(
                    "Cannot extract host and port from broker URL '" + str + "'", e));
        }
        return addressFuture;
    }

    /**
     * Returns the Pulsar client used by this lookup client.
     *
     * @return the underlying client
     */
    public PulsarClientImpl getPulsarClient() {
        return this.pulsarClient;
    }
}
