/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import io.netty.buffer.ByteBuf;
import io.prometheus.client.Counter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.proxy.server.ProxyConnection;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Serves lookup-style commands (topic lookup, partitioned-topic metadata, topics-of-namespace and
 * schema retrieval) received on a {@link ProxyConnection} by forwarding them to a broker and writing
 * the broker's answer, or an error, back on the client connection.
 *
 * <p>The target broker is either the statically configured broker service URL or, when that URL is
 * blank, a broker obtained from the proxy's discovery provider.
 */
public class LookupProxyHandler {
    private static final String THROTTLING_ERROR_MESSAGE =
            "Too many concurrent lookup and partitionsMetadata requests";
    /** Upper bound on how many broker redirects are followed before the request is failed. */
    private static final int MAX_REDIRECTS = 10;

    private static final Counter lookupRequests = Counter
            .build("pulsar_proxy_lookup_requests", "Counter of topic lookup requests")
            .create().register();
    private static final Counter partitionsMetadataRequests = Counter
            .build("pulsar_proxy_partitions_metadata_requests", "Counter of partitions metadata requests")
            .create().register();
    private static final Counter getTopicsOfNamespaceRequests = Counter
            .build("pulsar_proxy_get_topics_of_namespace_requests", "Counter of getTopicsOfNamespace requests")
            .create().register();
    private static final Counter getSchemaRequests = Counter
            .build("pulsar_proxy_get_schema_requests", "Counter of schema requests")
            .create().register();
    static final Counter rejectedLookupRequests = Counter
            .build("pulsar_proxy_rejected_lookup_requests",
                    "Counter of topic lookup requests rejected due to throttling")
            .create().register();
    static final Counter rejectedPartitionsMetadataRequests = Counter
            .build("pulsar_proxy_rejected_partitions_metadata_requests",
                    "Counter of partitions metadata requests rejected due to throttling")
            .create().register();
    static final Counter rejectedGetTopicsOfNamespaceRequests = Counter
            .build("pulsar_proxy_rejected_get_topics_of_namespace_requests",
                    "Counter of getTopicsOfNamespace requests rejected due to throttling")
            .create().register();
    private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class);

    private final ProxyService service;
    private final ProxyConnection proxyConnection;
    private final boolean connectWithTLS;
    private final SocketAddress clientAddress;
    private final String brokerServiceURL;

    /**
     * @param proxy           proxy service providing configuration, the lookup semaphore and the
     *                        discovery provider
     * @param proxyConnection client connection on which responses are written
     */
    public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) {
        this.service = proxy;
        this.proxyConnection = proxyConnection;
        this.clientAddress = proxyConnection.clientAddress();
        this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker();
        this.brokerServiceURL = this.connectWithTLS
                ? proxy.getConfiguration().getBrokerServiceURLTLS()
                : proxy.getConfiguration().getBrokerServiceURL();
    }

    /**
     * Handles a topic lookup command.
     *
     * <p>A permit of the shared lookup semaphore is required; without one the request is rejected
     * with {@code ServiceNotReady}. The permit is held only while the lookup is being dispatched
     * (the broker round-trip itself is asynchronous) and is always released, including when no
     * broker could be selected.
     *
     * @param lookup the lookup command received from the client
     */
    public void handleLookup(CommandLookupTopic lookup) {
        if (log.isDebugEnabled()) {
            log.debug("Received Lookup from {}", clientAddress);
        }
        long clientRequestId = lookup.getRequestId();
        if (!service.getLookupRequestSemaphore().tryAcquire()) {
            rejectedLookupRequests.inc();
            if (log.isDebugEnabled()) {
                log.debug("Lookup Request ID {} from {} rejected - {}.",
                        clientRequestId, clientAddress, THROTTLING_ERROR_MESSAGE);
            }
            proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(
                    ServerError.ServiceNotReady, THROTTLING_ERROR_MESSAGE, clientRequestId));
            return;
        }
        try {
            lookupRequests.inc();
            String topic = lookup.getTopic();
            String serviceUrl;
            if (StringUtils.isBlank(brokerServiceURL)) {
                LoadManagerReport availableBroker;
                try {
                    availableBroker = service.getDiscoveryProvider().nextBroker();
                } catch (Exception e) {
                    log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
                    proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(
                            ServerError.ServiceNotReady, e.getMessage(), clientRequestId));
                    // The finally block below still returns the permit.
                    return;
                }
                serviceUrl = connectWithTLS
                        ? availableBroker.getPulsarServiceUrlTls()
                        : availableBroker.getPulsarServiceUrl();
            } else {
                serviceUrl = connectWithTLS
                        ? service.getConfiguration().getBrokerServiceURLTLS()
                        : service.getConfiguration().getBrokerServiceURL();
            }
            performLookup(clientRequestId, topic, serviceUrl, false, MAX_REDIRECTS);
        } finally {
            service.getLookupRequestSemaphore().release();
        }
    }

    /**
     * Sends a lookup for {@code topic} to the broker at {@code brokerServiceUrl} and relays the
     * result to the client, following broker redirects until {@code numberOfRetries} reaches zero.
     *
     * @param clientRequestId  request id to echo back to the client
     * @param topic            topic being looked up
     * @param brokerServiceUrl URL of the broker to query
     * @param authoritative    authoritative flag forwarded to the broker
     * @param numberOfRetries  remaining number of redirects that may still be followed
     */
    private void performLookup(long clientRequestId, String topic, String brokerServiceUrl,
                               boolean authoritative, int numberOfRetries) {
        if (numberOfRetries == 0) {
            proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(
                    ServerError.ServiceNotReady, "Reached max number of redirections", clientRequestId));
            return;
        }
        URI brokerURI;
        try {
            brokerURI = new URI(brokerServiceUrl);
        } catch (URISyntaxException e) {
            proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(
                    ServerError.MetadataError, e.getMessage(), clientRequestId));
            return;
        }
        InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'",
                    addr, topic, clientRequestId);
        }
        proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long requestId = proxyConnection.newRequestId();
            ByteBuf command = Commands.newLookup(topic, authoritative, requestId);
            clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
                if (t != null) {
                    log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, t.getMessage());
                    proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(
                            ServerError.ServiceNotReady, t.getMessage(), clientRequestId));
                } else {
                    String brokerUrl = connectWithTLS ? r.brokerUrlTls : r.brokerUrl;
                    if (r.redirect) {
                        // Ask the broker we were redirected to, consuming one retry.
                        performLookup(clientRequestId, topic, brokerUrl, r.authoritative, numberOfRetries - 1);
                    } else {
                        if (log.isDebugEnabled()) {
                            log.debug("Successfully perform lookup '{}' for topic '{}' with clientReq Id '{}'"
                                    + " and lookup-broker {}", addr, topic, clientRequestId, brokerUrl);
                        }
                        proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
                                CommandLookupTopicResponse.LookupType.Connect, clientRequestId, true));
                    }
                }
                proxyConnection.getConnectionPool().releaseConnection(clientCnx);
            });
        }).exceptionally(ex -> {
            // Could not obtain a connection to the broker.
            proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(
                    ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
            return null;
        });
    }

    /**
     * Handles a partitioned-topic metadata command.
     *
     * <p>Shares the lookup semaphore with {@link #handleLookup}; the permit is released once the
     * request has been dispatched, even if dispatching throws.
     *
     * @param partitionMetadata the command received from the client
     */
    public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
        partitionsMetadataRequests.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
        }
        long clientRequestId = partitionMetadata.getRequestId();
        if (!service.getLookupRequestSemaphore().tryAcquire()) {
            rejectedPartitionsMetadataRequests.inc();
            if (log.isDebugEnabled()) {
                log.debug("PartitionMetaData Request ID {} from {} rejected - {}.",
                        clientRequestId, clientAddress, THROTTLING_ERROR_MESSAGE);
            }
            proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
                    ServerError.ServiceNotReady, THROTTLING_ERROR_MESSAGE, clientRequestId));
            return;
        }
        try {
            handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
        } finally {
            service.getLookupRequestSemaphore().release();
        }
    }

    /**
     * Resolves the partition count for the requested topic, via the discovery provider when no
     * broker service URL is configured and via the configured broker otherwise, and writes a
     * partition-metadata response (success or error) to the client.
     *
     * @param partitionMetadata the command received from the client
     * @param clientRequestId   request id to echo back to the client
     */
    private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata,
                                                 long clientRequestId) {
        TopicName topicName = TopicName.get(partitionMetadata.getTopic());
        if (StringUtils.isBlank(brokerServiceURL)) {
            service.getDiscoveryProvider()
                    .getPartitionedTopicMetadata(service, topicName, proxyConnection.clientAuthRole,
                            proxyConnection.authenticationData)
                    .thenAccept(metadata -> {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Total number of partitions for topic {} is {}",
                                    proxyConnection.clientAuthRole, topicName, metadata.partitions);
                        }
                        proxyConnection.ctx().writeAndFlush(
                                Commands.newPartitionMetadataResponse(metadata.partitions, clientRequestId));
                    }).exceptionally(ex -> {
                        log.warn("[{}] Failed to get partitioned metadata for topic {} {}",
                                clientAddress, topicName, ex.getMessage(), ex);
                        proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
                                getServerError(ex), ex.getMessage(), clientRequestId));
                        return null;
                    });
            return;
        }

        URI brokerURI;
        try {
            brokerURI = new URI(brokerServiceURL);
        } catch (URISyntaxException e) {
            proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
                    ServerError.MetadataError, e.getMessage(), clientRequestId));
            return;
        }
        // Unresolved, like every other handler in this class: the InetSocketAddress constructor
        // would attempt to resolve the host name right here.
        InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'",
                    addr, topicName.getPartitionedTopicName(), clientRequestId);
        }
        proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long requestId = proxyConnection.newRequestId();
            ByteBuf command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
            clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
                if (t != null) {
                    log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(), t.getMessage(), t);
                    // Answer with a partition-metadata response, as every other error path of
                    // this method does, rather than a lookup response.
                    proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
                            getServerError(t), t.getMessage(), clientRequestId));
                } else {
                    proxyConnection.ctx().writeAndFlush(
                            Commands.newPartitionMetadataResponse(r.partitions, clientRequestId));
                }
                proxyConnection.getConnectionPool().releaseConnection(clientCnx);
            });
        }).exceptionally(ex -> {
            // Could not obtain a connection to the broker.
            proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
                    ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
            return null;
        });
    }

    /**
     * Handles a get-topics-of-namespace command.
     *
     * <p>Shares the lookup semaphore with {@link #handleLookup}; the permit is released once the
     * request has been dispatched, even if dispatching throws.
     *
     * @param commandGetTopicsOfNamespace the command received from the client
     */
    public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        getTopicsOfNamespaceRequests.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received GetTopicsOfNamespace", clientAddress);
        }
        long requestId = commandGetTopicsOfNamespace.getRequestId();
        if (!service.getLookupRequestSemaphore().tryAcquire()) {
            rejectedGetTopicsOfNamespaceRequests.inc();
            if (log.isDebugEnabled()) {
                log.debug("GetTopicsOfNamespace Request ID {} from {} rejected - {}.",
                        requestId, clientAddress, THROTTLING_ERROR_MESSAGE);
            }
            proxyConnection.ctx().writeAndFlush(
                    Commands.newError(requestId, ServerError.ServiceNotReady, THROTTLING_ERROR_MESSAGE));
            return;
        }
        try {
            handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
        } finally {
            service.getLookupRequestSemaphore().release();
        }
    }

    /**
     * Picks the broker to query and dispatches the get-topics-of-namespace request to it.
     * Does nothing further when no broker URL is available ({@link #getServiceUrl} has already
     * written the error in the failure case it handles).
     */
    private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace,
                                            long clientRequestId) {
        String serviceUrl = getServiceUrl(clientRequestId);
        if (StringUtils.isBlank(serviceUrl)) {
            return;
        }
        performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl,
                MAX_REDIRECTS, commandGetTopicsOfNamespace.getMode());
    }

    /**
     * Requests the topics of {@code namespaceName} from the broker at {@code brokerServiceUrl}
     * and relays the list, or an error, to the client.
     *
     * @param clientRequestId  request id to echo back to the client
     * @param namespaceName    namespace whose topics are requested
     * @param brokerServiceUrl URL of the broker to query
     * @param numberOfRetries  remaining attempts; the request fails immediately when this is zero
     * @param mode             mode forwarded to the broker
     */
    private void performGetTopicsOfNamespace(long clientRequestId, String namespaceName, String brokerServiceUrl,
                                             int numberOfRetries, CommandGetTopicsOfNamespace.Mode mode) {
        if (numberOfRetries == 0) {
            proxyConnection.ctx().writeAndFlush(Commands.newError(
                    clientRequestId, ServerError.ServiceNotReady, "Reached max number of redirections"));
            return;
        }
        InetSocketAddress addr = getAddr(brokerServiceUrl, clientRequestId);
        if (addr == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for getting TopicsOfNamespace '{}' with clientReq Id '{}'",
                    addr, namespaceName, clientRequestId);
        }
        proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long requestId = proxyConnection.newRequestId();
            ByteBuf command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode);
            clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> {
                if (t != null) {
                    log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, t.getMessage());
                    proxyConnection.ctx().writeAndFlush(
                            Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage()));
                } else {
                    proxyConnection.ctx().writeAndFlush(
                            Commands.newGetTopicsOfNamespaceResponse(r, clientRequestId));
                }
                // Release the broker connection only once the response (or failure) has arrived,
                // matching the other handlers in this class.
                proxyConnection.getConnectionPool().releaseConnection(clientCnx);
            });
        }).exceptionally(ex -> {
            // Could not obtain a connection to the broker.
            proxyConnection.ctx().writeAndFlush(
                    Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
            return null;
        });
    }

    /**
     * Handles a get-schema command by forwarding it to a broker and relaying the raw schema
     * response, or an error, to the client. This path does not use the lookup semaphore.
     *
     * @param commandGetSchema the command received from the client
     */
    public void handleGetSchema(CommandGetSchema commandGetSchema) {
        getSchemaRequests.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received GetSchema {}", clientAddress, commandGetSchema);
        }
        long clientRequestId = commandGetSchema.getRequestId();
        String serviceUrl = getServiceUrl(clientRequestId);
        String topic = commandGetSchema.getTopic();
        if (StringUtils.isBlank(serviceUrl)) {
            return;
        }
        InetSocketAddress addr = getAddr(serviceUrl, clientRequestId);
        if (addr == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for getting schema of topic '{}' with clientReq Id '{}'",
                    addr, topic, clientRequestId);
        }
        proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long requestId = proxyConnection.newRequestId();
            // A schema version is optional in the request; absent means "no specific version".
            byte[] schemaVersion = commandGetSchema.hasSchemaVersion() ? commandGetSchema.getSchemaVersion() : null;
            ByteBuf command = Commands.newGetSchema(requestId, topic,
                    Optional.ofNullable(schemaVersion).map(BytesSchemaVersion::of));
            clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t) -> {
                if (t != null) {
                    // Pass the message for the last placeholder and the throwable separately so
                    // the placeholder is filled and the stack trace is still logged.
                    log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, t.getMessage(), t);
                    proxyConnection.ctx().writeAndFlush(
                            Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage()));
                } else {
                    proxyConnection.ctx().writeAndFlush(Commands.newGetSchemaResponse(clientRequestId, r));
                }
                proxyConnection.getConnectionPool().releaseConnection(clientCnx);
            });
        }).exceptionally(ex -> {
            // Could not obtain a connection to the broker.
            proxyConnection.ctx().writeAndFlush(
                    Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
            return null;
        });
    }

    /**
     * Returns the broker service URL to use for a request: the configured one when present,
     * otherwise that of the next broker supplied by the discovery provider.
     *
     * @param clientRequestId request id used for the error written when no broker can be obtained
     * @return the (TLS or plain, depending on configuration) service URL, or {@code null} after an
     *         error response has been written because the discovery provider failed
     */
    private String getServiceUrl(long clientRequestId) {
        if (!StringUtils.isBlank(brokerServiceURL)) {
            return connectWithTLS
                    ? service.getConfiguration().getBrokerServiceURLTLS()
                    : service.getConfiguration().getBrokerServiceURL();
        }
        LoadManagerReport availableBroker;
        try {
            availableBroker = service.getDiscoveryProvider().nextBroker();
        } catch (Exception e) {
            log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
            proxyConnection.ctx().writeAndFlush(
                    Commands.newError(clientRequestId, ServerError.ServiceNotReady, e.getMessage()));
            return null;
        }
        return connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
    }

    /**
     * Parses {@code brokerServiceUrl} into an unresolved socket address.
     *
     * @param brokerServiceUrl broker URL to parse
     * @param clientRequestId  request id used for the error written when the URL is malformed
     * @return the unresolved address, or {@code null} after a {@code MetadataError} response has
     *         been written because the URL could not be parsed
     */
    private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {
        URI brokerURI;
        try {
            brokerURI = new URI(brokerServiceUrl);
        } catch (URISyntaxException e) {
            proxyConnection.ctx().writeAndFlush(
                    Commands.newError(clientRequestId, ServerError.MetadataError, e.getMessage()));
            return null;
        }
        return InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
    }

    /**
     * Maps a failure to the wire-level error code: authorization and authentication client
     * exceptions keep their specific codes, anything else becomes {@code ServiceNotReady}.
     */
    private ServerError getServerError(Throwable error) {
        if (error instanceof PulsarClientException.AuthorizationException) {
            return ServerError.AuthorizationError;
        }
        if (error instanceof PulsarClientException.AuthenticationException) {
            return ServerError.AuthenticationError;
        }
        return ServerError.ServiceNotReady;
    }
}

