/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class BrokerServiceTest
extends BrokerTestBase {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(BrokerServiceTest.class);
    // TLS fixture locations, expressed relative to the working directory the tests run from.
    // Server-side certificate and private key handed to the broker configuration.
    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
    // Client-side certificate and private key used as TLS authentication parameters.
    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";

    /**
     * Class-level fixture setup: delegates to {@code baseSetup()} of the parent test base.
     * Also invoked directly by tests in this class that need to restart with a changed configuration.
     *
     * @throws Exception if the parent setup fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Class-level teardown: runs the parent's internal cleanup and then resets the
     * configuration so changes made by one test do not carry over.
     * Runs even when earlier configuration methods failed ({@code alwaysRun = true}).
     *
     * @throws Exception if the parent cleanup or the configuration reset fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
        resetConfig();
    }

    /**
     * Tears the fixture down and brings it back up, giving the calling test a fresh environment.
     *
     * @throws Exception if either {@link #cleanup()} or {@link #setup()} fails
     */
    private void resetState() throws Exception {
        cleanup();
        setup();
    }

    /**
     * Requests a topic from the broker service before and after a topic lookup.
     * The first request is expected to fail with an {@link IOException} cause
     * ("NS is not owned"); the second, issued after the lookup, is expected to succeed.
     *
     * <p>Both waits are bounded: if a callback throws before counting its latch down,
     * the test now fails with a clear message instead of blocking forever.
     *
     * <p>NOTE(review): the assertions inside the future callbacks run on the completing
     * thread, and an {@link AssertionError} thrown there is captured by the future chain
     * rather than propagated to TestNG. Consider asserting on the test thread if these
     * checks are meant to be able to fail the test.
     *
     * @throws Exception if the lookup fails or a wait is interrupted
     */
    @Test
    public void testOwnedNsCheck() throws Exception {
        final String topic = "persistent://prop/ns-abc/successTopic";
        // Generous upper bound; the original waited without any limit.
        final long awaitSeconds = 120L;
        BrokerService service = this.pulsar.getBrokerService();

        // Phase 1: the topic is requested before any lookup has been performed.
        CountDownLatch latch1 = new CountDownLatch(1);
        service.getOrCreateTopic(topic).thenAccept(t -> {
            latch1.countDown();
            Assert.fail("should fail as NS is not owned");
        }).exceptionally(exception -> {
            Assert.assertTrue(exception.getCause() instanceof IOException);
            latch1.countDown();
            return null;
        });
        Assert.assertTrue(latch1.await(awaitSeconds, TimeUnit.SECONDS),
                "first getOrCreateTopic callback did not complete within " + awaitSeconds + "s");

        // Phase 2: after a lookup, requesting the same topic is expected to succeed.
        this.admin.lookups().lookupTopic(topic);
        CountDownLatch latch2 = new CountDownLatch(1);
        service.getOrCreateTopic(topic).thenAccept(t -> {
            try {
                Assert.assertNotNull(service.getTopicReference(topic));
            } catch (Exception e) {
                Assert.fail("should not fail");
            }
            latch2.countDown();
        }).exceptionally(exception -> {
            latch2.countDown();
            Assert.fail("should not fail");
            return null;
        });
        Assert.assertTrue(latch2.await(awaitSeconds, TimeUnit.SECONDS),
                "second getOrCreateTopic callback did not complete within " + awaitSeconds + "s");
    }

    /**
     * Exercises the stats of a persistent topic with one consumer and one producer:
     * checks the initial (empty) state, the publisher/consumer/subscription/topic rates
     * after ten messages are published, and that the backlog is back to zero once all
     * messages were received and acknowledged.
     *
     * @throws Exception on any client, broker or interruption failure
     */
    @Test
    public void testBrokerServicePersistentTopicStats() throws Exception {
        this.resetState();
        final String topicName = "persistent://prop/ns-abc/successTopic";
        final String subName = "successSub";
        final int messageCount = 10;

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Thread.sleep(100L);

        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);

        // --- state right after subscribing: one subscription, one consumer, nothing stored ---
        this.rolloverPerIntervalStats();
        TopicStatsImpl stats = topicRef.getStats(false, false);
        SubscriptionStats subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(stats.getSubscriptions().keySet().size(), 1);
        Assert.assertEquals(subStats.getMsgBacklog(), 0L);
        Assert.assertEquals(subStats.getConsumers().size(), 1);
        Assert.assertEquals(stats.getOffloadedStorageSize(), 0L);

        org.apache.pulsar.client.api.Producer<byte[]> producer =
                this.pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);
        for (int sent = 0; sent < messageCount; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }
        Thread.sleep(100L);

        // --- state after publishing: backlog of ten and matching in/out rates ---
        this.rolloverPerIntervalStats();
        stats = topicRef.getStats(false, false);
        subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(subStats.getMsgBacklog(), 10L);
        Assert.assertEquals(stats.getPublishers().size(), 1);

        PublisherStats publisherStats = (PublisherStats) stats.getPublishers().get(0);
        Assert.assertTrue(publisherStats.getMsgRateIn() > 0.0);
        Assert.assertTrue(publisherStats.getMsgThroughputIn() > 0.0);
        Assert.assertTrue(publisherStats.getAverageMsgSize() > 0.0);
        Assert.assertNotNull(publisherStats.getClientVersion());
        Assert.assertEquals(stats.getMsgRateIn(), publisherStats.getMsgRateIn());
        Assert.assertEquals(stats.getMsgThroughputIn(), publisherStats.getMsgThroughputIn());
        double avgSizeDelta = stats.getAverageMsgSize() - publisherStats.getAverageMsgSize();
        Assert.assertTrue(Math.abs(avgSizeDelta) < 1.0E-6);

        ConsumerStats consumerStats = (ConsumerStats) subStats.getConsumers().get(0);
        Assert.assertTrue(consumerStats.getMsgRateOut() > 0.0);
        Assert.assertTrue(consumerStats.getMsgThroughputOut() > 0.0);
        Assert.assertEquals(subStats.getMsgRateOut(), consumerStats.getMsgRateOut());
        Assert.assertEquals(subStats.getMsgThroughputOut(), consumerStats.getMsgThroughputOut());
        Assert.assertEquals(stats.getMsgRateOut(), consumerStats.getMsgRateOut());
        Assert.assertEquals(stats.getMsgThroughputOut(), consumerStats.getMsgThroughputOut());
        Assert.assertNotNull(consumerStats.getClientVersion());
        Assert.assertEquals(stats.getOffloadedStorageSize(), 0L);

        // --- drain and acknowledge everything, then the backlog must be empty again ---
        for (int received = 0; received < messageCount; received++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
        }
        consumer.close();
        Thread.sleep(100L);

        this.rolloverPerIntervalStats();
        stats = topicRef.getStats(false, false);
        subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(stats.getOffloadedStorageSize(), 0L);
        Assert.assertEquals(subStats.getMsgBacklog(), 0L);
    }

    /**
     * Restarts the broker twice with different connection limits (max connections /
     * max connections per IP of 3/2, then 2/3) and checks in each round that two
     * connections are accepted, a third one is rejected, and the whole round finishes
     * in under 20 seconds.
     *
     * @throws Exception on any setup, client or assertion-unrelated failure
     */
    @Test
    public void testConnectionController() throws Exception {
        // Round 1: brokerMaxConnections=3, brokerMaxConnectionsPerIp=2.
        this.cleanup();
        this.conf.setBrokerMaxConnections(3);
        this.conf.setBrokerMaxConnectionsPerIp(2);
        this.setup();

        final String topicName = "persistent://prop/ns-abc/connection" + UUID.randomUUID();
        List<PulsarClient> clients = new ArrayList<>();
        ClientBuilder clientBuilder = PulsarClient.builder()
                .operationTimeout(1, TimeUnit.DAYS)
                .connectionTimeout(1, TimeUnit.DAYS)
                .serviceUrl(this.brokerUrl.toString());

        long startTime = System.currentTimeMillis();
        clients.add(this.createNewConnection(topicName, clientBuilder));
        clients.add(this.createNewConnection(topicName, clientBuilder));
        this.createNewConnectionAndCheckFail(topicName, clientBuilder);
        long elapsedMs = System.currentTimeMillis() - startTime;
        Assert.assertTrue(elapsedMs < 20000L);
        this.cleanClient(clients);
        clients.clear();

        // Round 2: brokerMaxConnections=2, brokerMaxConnectionsPerIp=3.
        this.cleanup();
        this.conf.setBrokerMaxConnections(2);
        this.conf.setBrokerMaxConnectionsPerIp(3);
        this.setup();

        startTime = System.currentTimeMillis();
        // The broker was restarted, so point the shared builder at the current URL.
        clientBuilder.serviceUrl(this.brokerUrl.toString());
        clients.add(this.createNewConnection(topicName, clientBuilder));
        clients.add(this.createNewConnection(topicName, clientBuilder));
        this.createNewConnectionAndCheckFail(topicName, clientBuilder);
        elapsedMs = System.currentTimeMillis() - startTime;
        Assert.assertTrue(elapsedMs < 20000L);
        this.cleanClient(clients);
        clients.clear();
    }

    /**
     * Runs four broker restarts with different (max connections, max connections per IP)
     * settings. For 0/1, 1/0 and 1/1 one connection is accepted and the next is rejected;
     * for 0/0 ten connections are all accepted. Every round must finish in under 20 seconds.
     *
     * @throws Exception on any setup or client failure
     */
    @Test
    public void testConnectionController2() throws Exception {
        // Round 1: brokerMaxConnections=0, brokerMaxConnectionsPerIp=1.
        this.cleanup();
        this.conf.setBrokerMaxConnections(0);
        this.conf.setBrokerMaxConnectionsPerIp(1);
        this.setup();

        final String topicName = "persistent://prop/ns-abc/connection" + UUID.randomUUID();
        List<PulsarClient> clients = new ArrayList<>();
        ClientBuilder clientBuilder = PulsarClient.builder()
                .operationTimeout(1, TimeUnit.DAYS)
                .connectionTimeout(1, TimeUnit.DAYS)
                .serviceUrl(this.brokerUrl.toString());

        long startTime = System.currentTimeMillis();
        clients.add(this.createNewConnection(topicName, clientBuilder));
        this.createNewConnectionAndCheckFail(topicName, clientBuilder);
        long elapsedMs = System.currentTimeMillis() - startTime;
        Assert.assertTrue(elapsedMs < 20000L);
        this.cleanClient(clients);
        clients.clear();

        // Round 2: brokerMaxConnections=1, brokerMaxConnectionsPerIp=0.
        this.cleanup();
        this.conf.setBrokerMaxConnections(1);
        this.conf.setBrokerMaxConnectionsPerIp(0);
        this.setup();

        startTime = System.currentTimeMillis();
        clientBuilder.serviceUrl(this.brokerUrl.toString());
        clients.add(this.createNewConnection(topicName, clientBuilder));
        this.createNewConnectionAndCheckFail(topicName, clientBuilder);
        elapsedMs = System.currentTimeMillis() - startTime;
        Assert.assertTrue(elapsedMs < 20000L);
        this.cleanClient(clients);
        clients.clear();

        // Round 3: brokerMaxConnections=1, brokerMaxConnectionsPerIp=1.
        this.cleanup();
        this.conf.setBrokerMaxConnections(1);
        this.conf.setBrokerMaxConnectionsPerIp(1);
        this.setup();

        startTime = System.currentTimeMillis();
        clientBuilder.serviceUrl(this.brokerUrl.toString());
        clients.add(this.createNewConnection(topicName, clientBuilder));
        this.createNewConnectionAndCheckFail(topicName, clientBuilder);
        elapsedMs = System.currentTimeMillis() - startTime;
        Assert.assertTrue(elapsedMs < 20000L);
        this.cleanClient(clients);
        clients.clear();

        // Round 4: both settings are 0; ten connections are expected to succeed.
        this.cleanup();
        this.conf.setBrokerMaxConnections(0);
        this.conf.setBrokerMaxConnectionsPerIp(0);
        this.setup();

        clientBuilder.serviceUrl(this.brokerUrl.toString());
        startTime = System.currentTimeMillis();
        int opened = 0;
        while (opened < 10) {
            clients.add(this.createNewConnection(topicName, clientBuilder));
            opened++;
        }
        elapsedMs = System.currentTimeMillis() - startTime;
        Assert.assertTrue(elapsedMs < 20000L);
        this.cleanClient(clients);
        clients.clear();
    }

    /**
     * Attempts to open one more connection and asserts that the attempt is rejected with
     * a "Reached the maximum number of connections" error.
     *
     * <p>Compared to a plain {@code e.getMessage().contains(...)} check, a failure without
     * a message now produces an assertion failure (including the exception) instead of a
     * {@link NullPointerException}, and a client that was unexpectedly created is closed
     * before the test is failed so it does not leak.
     *
     * @param topicName topic used to force the connection
     * @param builder   client builder to create the new client from
     * @throws Exception if closing an unexpectedly created client fails
     */
    private void createNewConnectionAndCheckFail(String topicName, ClientBuilder builder) throws Exception {
        PulsarClient unexpectedClient;
        try {
            unexpectedClient = this.createNewConnection(topicName, builder);
        }
        catch (Exception e) {
            String message = e.getMessage();
            Assert.assertTrue(message != null && message.contains("Reached the maximum number of connections"),
                    "unexpected connection failure: " + e);
            return;
        }
        // The connection was accepted although the limit should have been reached.
        unexpectedClient.close();
        Assert.fail("should fail");
    }

    /**
     * Builds a new client and forces it to connect by creating (and immediately closing)
     * a producer on the given topic.
     *
     * <p>If the producer cannot be created, the freshly built client is closed before the
     * failure is rethrown, so rejected connection attempts do not leak clients. A failure
     * while closing is attached to the original exception as suppressed.
     *
     * @param topicName     topic to create the short-lived producer on
     * @param clientBuilder builder used to create the client
     * @return the connected client; the caller is responsible for closing it
     * @throws PulsarClientException if building the client or creating/closing the producer fails
     */
    private PulsarClient createNewConnection(String topicName, ClientBuilder clientBuilder) throws PulsarClientException {
        PulsarClient client = clientBuilder.build();
        try {
            client.newProducer().topic(topicName).create().close();
        }
        catch (PulsarClientException | RuntimeException e) {
            try {
                client.close();
            }
            catch (PulsarClientException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return client;
    }

    /**
     * Closes every non-null client in the list.
     *
     * <p>All clients are attempted even if some fail to close; the first failure is
     * rethrown afterwards with any later failures attached as suppressed exceptions.
     *
     * @param clients clients to close; {@code null} entries are skipped
     * @throws Exception the first exception raised while closing a client
     */
    private void cleanClient(List<PulsarClient> clients) throws Exception {
        Exception firstFailure = null;
        for (PulsarClient client : clients) {
            if (client == null) {
                continue;
            }
            try {
                client.close();
            }
            catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Checks that the reported storage size of a topic with only a producer attached
     * is zero before publishing and greater than zero after ten 10-byte messages.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testStatsOfStorageSizeWithSubscription() throws Exception {
        final String topicName = "persistent://prop/ns-abc/no-subscription";
        org.apache.pulsar.client.api.Producer<byte[]> producer =
                this.pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);

        long sizeBeforePublish = topicRef.getStats(false, false).storageSize;
        Assert.assertEquals(sizeBeforePublish, 0L);

        int sent = 0;
        while (sent < 10) {
            producer.send(new byte[10]);
            sent++;
        }

        long sizeAfterPublish = topicRef.getStats(false, false).storageSize;
        Assert.assertTrue(sizeAfterPublish > 0L);
    }

    /**
     * Exercises the stats of a persistent topic with a Shared subscription: initial state,
     * rates and unacked counts after ten published messages, the redelivery rate after
     * {@code redeliverUnacknowledgedMessages()}, and an empty backlog once everything
     * has been received and acknowledged.
     *
     * @throws Exception on any client, broker or interruption failure
     */
    @Test
    public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
        this.resetState();
        final String topicName = "persistent://prop/ns-abc/successSharedTopic";
        final String subName = "successSharedSub";
        final int messageCount = 10;

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscribe();
        Thread.sleep(100L);

        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);

        // --- state right after subscribing ---
        this.rolloverPerIntervalStats();
        TopicStatsImpl stats = topicRef.getStats(false, false);
        SubscriptionStats subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(stats.getSubscriptions().keySet().size(), 1);
        Assert.assertEquals(subStats.getMsgBacklog(), 0L);
        Assert.assertEquals(subStats.getConsumers().size(), 1);

        org.apache.pulsar.client.api.Producer<byte[]> producer =
                this.pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);
        for (int sent = 0; sent < messageCount; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }
        Thread.sleep(100L);

        // --- state after publishing: ten messages dispatched but not yet acknowledged ---
        this.rolloverPerIntervalStats();
        stats = topicRef.getStats(false, false);
        subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(subStats.getMsgBacklog(), 10L);
        Assert.assertEquals(stats.getPublishers().size(), 1);

        PublisherStats publisherStats = (PublisherStats) stats.getPublishers().get(0);
        Assert.assertTrue(publisherStats.getMsgRateIn() > 0.0);
        Assert.assertTrue(publisherStats.getMsgThroughputIn() > 0.0);
        Assert.assertTrue(publisherStats.getAverageMsgSize() > 0.0);
        Assert.assertEquals(stats.getMsgRateIn(), publisherStats.getMsgRateIn());
        Assert.assertEquals(stats.getMsgThroughputIn(), publisherStats.getMsgThroughputIn());
        double avgSizeDelta = stats.getAverageMsgSize() - publisherStats.getAverageMsgSize();
        Assert.assertTrue(Math.abs(avgSizeDelta) < 1.0E-6);

        ConsumerStats consumerStats = (ConsumerStats) subStats.getConsumers().get(0);
        Assert.assertTrue(consumerStats.getMsgRateOut() > 0.0);
        Assert.assertTrue(consumerStats.getMsgThroughputOut() > 0.0);
        Assert.assertEquals(subStats.getMsgRateRedeliver(), 0.0);
        Assert.assertEquals(consumerStats.getUnackedMessages(), 10);
        Assert.assertEquals(subStats.getMsgRateOut(), consumerStats.getMsgRateOut());
        Assert.assertEquals(subStats.getMsgThroughputOut(), consumerStats.getMsgThroughputOut());
        Assert.assertEquals(subStats.getMsgRateRedeliver(), consumerStats.getMsgRateRedeliver());
        Assert.assertEquals(stats.getMsgRateOut(), consumerStats.getMsgRateOut());
        Assert.assertEquals(stats.getMsgThroughputOut(), consumerStats.getMsgThroughputOut());
        Assert.assertEquals(subStats.getMsgRateRedeliver(), consumerStats.getMsgRateRedeliver());
        Assert.assertEquals((long) subStats.getUnackedMessages(), (long) consumerStats.getUnackedMessages());

        // --- request redelivery and verify the redelivery rate becomes visible ---
        consumer.redeliverUnacknowledgedMessages();
        Thread.sleep(100L);
        this.rolloverPerIntervalStats();
        stats = topicRef.getStats(false, false);
        subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
        Assert.assertTrue(subStats.getMsgRateRedeliver() > 0.0);
        ConsumerStats redeliveredConsumerStats = (ConsumerStats) subStats.getConsumers().get(0);
        Assert.assertEquals(subStats.getMsgRateRedeliver(), redeliveredConsumerStats.getMsgRateRedeliver());

        // --- drain and acknowledge everything, then the backlog must be empty ---
        for (int received = 0; received < messageCount; received++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
        }
        consumer.close();
        Thread.sleep(100L);

        this.rolloverPerIntervalStats();
        stats = topicRef.getStats(false, false);
        subStats = (SubscriptionStats) stats.getSubscriptions().values().iterator().next();
        Assert.assertEquals(subStats.getMsgBacklog(), 0L);
    }

    /**
     * Produces and consumes ten messages on a topic, then fetches the broker metrics
     * through the admin client and checks that at least one metric's "dimensions"
     * mentions the {@code prop/ns-abc} namespace.
     *
     * <p>NOTE(review): both flags are driven by the same substring check, so they always
     * flip together; confirm whether the "topic load times" flag was meant to look for
     * a different marker.
     *
     * @throws Exception on any client, admin or interruption failure
     */
    @Test
    public void testBrokerStatsMetrics() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";
        final int messageCount = 10;
        BrokerStats brokerStatsClient = this.admin.brokerStats();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscribe();
        Thread.sleep(100L);
        org.apache.pulsar.client.api.Producer<byte[]> producer =
                this.pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(100L);

        for (int sent = 0; sent < messageCount; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }
        Thread.sleep(100L);

        for (int received = 0; received < messageCount; received++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
        }
        consumer.close();
        Thread.sleep(100L);

        JsonArray metrics = new Gson().fromJson(brokerStatsClient.getMetrics(), JsonArray.class);
        boolean namespaceDimensionFound = false;
        boolean topicLoadTimesDimensionFound = false;
        for (int idx = 0; idx < metrics.size(); idx++) {
            try {
                String dimensions = metrics.get(idx).getAsJsonObject().get("dimensions").toString();
                if (dimensions.contains("prop/ns-abc")) {
                    namespaceDimensionFound = true;
                    topicLoadTimesDimensionFound = true;
                }
            }
            catch (Exception ignored) {
                // Entries that cannot be read as an object with "dimensions" are skipped.
            }
        }
        Assert.assertTrue(namespaceDimensionFound && topicLoadTimesDimensionFound);
        Thread.sleep(100L);
    }

    /**
     * Creates two namespaces with four bundles each, two topics per namespace, and checks
     * that the broker's topic stats JSON contains exactly those two namespaces and lists
     * every topic under its bundle range in the "persistent" section. Producers, topics
     * and namespaces are removed again at the end.
     *
     * @throws Exception on any admin or client failure
     */
    @Test
    public void testBrokerServiceNamespaceStats() throws Exception {
        this.resetState();
        final int numBundles = 4;
        final String ns1 = "prop/stats1";
        final String ns2 = "prop/stats2";
        List<String> nsList = Lists.newArrayList(ns1, ns2);
        List<org.apache.pulsar.client.api.Producer<byte[]>> producerList = Lists.newArrayList();
        BrokerStats brokerStatsClient = this.admin.brokerStats();

        // Create the namespaces and attach one producer to each of two topics per namespace.
        for (String ns : nsList) {
            this.admin.namespaces().createNamespace(ns, numBundles);
            this.admin.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet("test"));
            for (String topicFormat : new String[]{"persistent://%s/topic1", "persistent://%s/topic2"}) {
                String topic = String.format(topicFormat, ns);
                producerList.add(this.pulsarClient.newProducer().topic(topic).create());
            }
        }

        this.rolloverPerIntervalStats();
        JsonObject topicStats = new Gson().fromJson(brokerStatsClient.getTopics(), JsonObject.class);
        Assert.assertEquals(topicStats.size(), 2, topicStats.toString());

        // Layout walked here: namespace -> bundle range -> "persistent" -> topic name.
        for (String ns : nsList) {
            JsonObject nsObject = topicStats.getAsJsonObject(ns);
            List<String> topicList = this.admin.namespaces().getTopics(ns);
            for (String topic : topicList) {
                NamespaceBundle bundle = this.pulsar.getNamespaceService().getBundle(TopicName.get(topic));
                JsonObject bundleObject = nsObject.getAsJsonObject(bundle.getBundleRange());
                JsonObject topicObject = bundleObject.getAsJsonObject("persistent");
                boolean topicPresent = topicObject.entrySet().stream()
                        .anyMatch(persistentTopic -> persistentTopic.getKey().equals(topic));
                Assert.assertTrue(topicPresent);
            }
        }

        // Tear down everything this test created.
        for (org.apache.pulsar.client.api.Producer<byte[]> producer : producerList) {
            producer.close();
        }
        for (String ns : nsList) {
            for (String dest : this.admin.namespaces().getTopics(ns)) {
                this.admin.topics().delete(dest);
            }
            this.admin.namespaces().deleteNamespace(ns);
        }
    }

    /**
     * With authentication disabled and no TLS settings applied by this test, verifies that
     * a plain connection can subscribe and that a TLS connection attempt fails with a
     * message containing "ConnectException".
     *
     * <p>Clients and consumers are managed with try-with-resources, so a failure while
     * building the client can no longer trigger a {@link NullPointerException} in a
     * {@code finally} block, and unexpected failures are reported together with their cause.
     *
     * @throws Exception if restarting the broker fails
     */
    @Test
    public void testTlsDisabled() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";
        this.conf.setAuthenticationEnabled(false);
        this.restartBroker();

        // Case 1: access without TLS must succeed.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .operationTimeout(1000, TimeUnit.MILLISECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // Subscribing successfully is all that is checked; resources close automatically.
        }
        catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 2: access with TLS must be rejected.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .operationTimeout(1000, TimeUnit.MILLISECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // AssertionError is not an Exception, so it bypasses the catch below.
            Assert.fail("TLS connection should fail");
        }
        catch (Exception e) {
            String message = e.getMessage();
            Assert.assertTrue(message != null && message.contains("ConnectException"),
                    "unexpected failure: " + e);
        }
    }

    /**
     * Restarts the broker with TLS ports and the server certificate configured, then checks
     * four client setups: plain (succeeds), TLS allowing insecure connections (succeeds),
     * TLS without a trust certificate (fails with "General OpenSslEngine problem"), and
     * TLS trusting the server certificate (succeeds).
     *
     * <p>The decompiled form of this method assigned to a try-with-resources variable and
     * used variables outside their scope; it is rewritten here as valid Java, with every
     * client and consumer closed automatically and unexpected failures reported with
     * their cause.
     *
     * @throws Exception if restarting the broker fails
     */
    @Test
    public void testTlsEnabled() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";
        this.conf.setAuthenticationEnabled(false);
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setNumExecutorThreadPoolSize(5);
        this.restartBroker();

        // Case 1: access without TLS.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .operationTimeout(1000, TimeUnit.MILLISECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // Successful subscription is the check.
        }
        catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 2: access with TLS, insecure connections allowed.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .allowTlsInsecureConnection(true)
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .operationTimeout(1000, TimeUnit.MILLISECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // Successful subscription is the check.
        }
        catch (Exception e) {
            Assert.fail("should not fail", e);
        }

        // Case 3: access with TLS, insecure connections disallowed and no trust certificate.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .allowTlsInsecureConnection(false)
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .operationTimeout(1000, TimeUnit.MILLISECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // AssertionError is not an Exception, so it bypasses the catch below.
            Assert.fail("should fail");
        }
        catch (Exception e) {
            String message = e.getMessage();
            Assert.assertTrue(message != null && message.contains("General OpenSslEngine problem"),
                    "unexpected failure: " + e);
        }

        // Case 4: access with TLS, trusting the server certificate explicitly.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.brokerUrlTls.toString())
                     .enableTls(true)
                     .allowTlsInsecureConnection(false)
                     .tlsTrustCertsFilePath("./src/test/resources/certificate/server.crt")
                     .statsInterval(0L, TimeUnit.SECONDS)
                     .operationTimeout(1000, TimeUnit.MILLISECONDS)
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .subscribe()) {
            // Successful subscription is the check.
        }
        catch (Exception e) {
            Assert.fail("should not fail", e);
        }
    }

    /**
     * Restarts the broker with only the TLS broker and web ports configured (the non-TLS
     * ports are set to empty) and checks that a TLS client allowing insecure connections
     * can subscribe. The fixture is reset afterwards.
     *
     * <p>An unexpected failure is now reported together with its cause, and the consumer
     * is closed through try-with-resources.
     *
     * <p>NOTE(review): the new client is stored in the inherited {@code pulsarClient}
     * field, replacing whatever client was there before without closing it here;
     * confirm that this is intended.
     *
     * @throws Exception if restarting the broker, closing the client or resetting state fails
     */
    @Test
    public void testTlsEnabledWithoutNonTlsServicePorts() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";
        this.conf.setAuthenticationEnabled(false);
        this.conf.setBrokerServicePort(Optional.empty());
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePort(Optional.empty());
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setNumExecutorThreadPoolSize(5);
        this.restartBroker();

        // Access with TLS (insecure connections allowed) must succeed.
        try {
            this.pulsarClient = PulsarClient.builder()
                    .serviceUrl(this.brokerUrlTls.toString())
                    .enableTls(true)
                    .allowTlsInsecureConnection(true)
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .operationTimeout(1000, TimeUnit.MILLISECONDS)
                    .build();
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe()) {
                // Successful subscription is the check; the consumer closes automatically.
            }
        }
        catch (Exception e) {
            Assert.fail("should not fail", e);
        }
        finally {
            this.pulsarClient.close();
        }
        this.resetState();
    }

    /**
     * Exercises TLS authentication against a broker that allows insecure TLS connections.
     *
     * <p>The broker is restarted with authentication enabled, the TLS authentication provider
     * registered and {@code tlsAllowInsecureConnection} set to {@code true}. Two cases follow:
     * <ol>
     *   <li>a client without a client certificate subscribes and is expected to be rejected;</li>
     *   <li>a client using {@link AuthenticationTls} with the test client certificate and key
     *       subscribes and is expected to succeed.</li>
     * </ol>
     *
     * @throws Exception propagated from the broker restart or from closing a client
     */
    @Test
    public void testTlsAuthAllowInsecure() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        Set<String> providers = new HashSet<>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setTlsAllowInsecureConnection(true);
        this.conf.setNumExecutorThreadPoolSize(5);
        this.restartBroker();

        HashMap<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", "./src/test/resources/certificate/client.crt");
        authParams.put("tlsKeyFile", "./src/test/resources/certificate/client.key");

        // Case 1: access without a client certificate must be rejected.
        PulsarClient anonymousClient = null;
        try {
            anonymousClient = PulsarClient.builder()
                    .serviceUrl(this.brokerUrlTls.toString())
                    .enableTls(true)
                    .allowTlsInsecureConnection(true)
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .operationTimeout(1000, TimeUnit.MILLISECONDS)
                    .build();
            Consumer<?> consumer = anonymousClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe();
            consumer.close();
            // Assert.fail raises an AssertionError, which the catch below does not intercept.
            Assert.fail("should fail");
        } catch (Exception e) {
            // NOTE(review): the decompiled source had lost this catch clause. The expected
            // "Unauthorized" text mirrors the check in testTlsAuthDisallowInsecure - confirm
            // against the original test.
            String message = e.getMessage();
            Assert.assertTrue(message != null && message.contains("Unauthorized"),
                    "unexpected failure: " + e);
        } finally {
            // Guard against build() itself having failed, so the real error is not masked by an NPE.
            if (anonymousClient != null) {
                anonymousClient.close();
            }
        }

        // Case 2: access with the client certificate must succeed.
        PulsarClient tlsClient = null;
        try {
            AuthenticationTls auth = new AuthenticationTls();
            auth.configure(authParams);
            tlsClient = PulsarClient.builder()
                    .authentication(auth)
                    .serviceUrl(this.brokerUrlTls.toString())
                    .enableTls(true)
                    .allowTlsInsecureConnection(true)
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .operationTimeout(1000, TimeUnit.MILLISECONDS)
                    .build();
            Consumer<?> consumer = tlsClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe();
            consumer.close();
        } catch (Exception e) {
            // Keep the cause so an unexpected rejection can be diagnosed from the report.
            Assert.fail("should not fail", e);
        } finally {
            if (tlsClient != null) {
                tlsClient.close();
            }
        }
    }

    /**
     * Exercises TLS authentication against a broker that does not allow insecure TLS
     * connections and has no trust certificate configured by this test.
     *
     * <p>The broker is restarted with authentication enabled, the TLS authentication provider
     * registered and {@code tlsAllowInsecureConnection} set to {@code false}. Two cases follow,
     * and both are expected to be rejected with a message containing {@code "Unauthorized"}:
     * <ol>
     *   <li>a client without a client certificate;</li>
     *   <li>a client using {@link AuthenticationTls} with the test client certificate and key.</li>
     * </ol>
     *
     * @throws Exception propagated from the broker restart or from closing a client
     */
    @Test
    public void testTlsAuthDisallowInsecure() throws Exception {
        final String topicName = "persistent://prop/my-ns/newTopic";
        final String subName = "newSub";

        Set<String> providers = new HashSet<>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setTlsAllowInsecureConnection(false);
        this.conf.setNumExecutorThreadPoolSize(5);
        this.restartBroker();

        HashMap<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", "./src/test/resources/certificate/client.crt");
        authParams.put("tlsKeyFile", "./src/test/resources/certificate/client.key");

        // Case 1: access without a client certificate must be rejected.
        PulsarClient anonymousClient = null;
        try {
            anonymousClient = PulsarClient.builder()
                    .serviceUrl(this.brokerUrlTls.toString())
                    .enableTls(true)
                    .allowTlsInsecureConnection(true)
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .operationTimeout(1000, TimeUnit.MILLISECONDS)
                    .build();
            Consumer<?> consumer = anonymousClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe();
            consumer.close();
            // Assert.fail raises an AssertionError, which the catch below does not intercept.
            Assert.fail("should fail");
        } catch (Exception e) {
            // NOTE(review): the decompiled source had lost this catch clause. The expected
            // "Unauthorized" text mirrors the check of case 2 below - confirm against the
            // original test.
            String message = e.getMessage();
            Assert.assertTrue(message != null && message.contains("Unauthorized"),
                    "unexpected failure: " + e);
        } finally {
            // Guard against build() itself having failed, so the real error is not masked by an NPE.
            if (anonymousClient != null) {
                anonymousClient.close();
            }
        }

        // Case 2: access with the client certificate is rejected as well.
        PulsarClient tlsClient = null;
        try {
            AuthenticationTls auth = new AuthenticationTls();
            auth.configure(authParams);
            tlsClient = PulsarClient.builder()
                    .authentication(auth)
                    .serviceUrl(this.brokerUrlTls.toString())
                    .enableTls(true)
                    .allowTlsInsecureConnection(true)
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .operationTimeout(1000, TimeUnit.MILLISECONDS)
                    .build();
            Consumer<?> consumer = tlsClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe();
            consumer.close();
            Assert.fail("should fail");
        } catch (Exception e) {
            String message = e.getMessage();
            Assert.assertTrue(message != null && message.contains("Unauthorized"),
                    "unexpected failure: " + e);
        } finally {
            if (tlsClient != null) {
                tlsClient.close();
            }
        }
    }

    /**
     * Exercises TLS authentication against a broker that does not allow insecure TLS
     * connections but is configured with the test client certificate as its trust certificate.
     *
     * <p>The broker is restarted with authentication enabled, the TLS authentication provider
     * registered, {@code tlsAllowInsecureConnection} set to {@code false} and
     * {@code tlsTrustCertsFilePath} pointing at {@code client.crt}. Two cases follow:
     * <ol>
     *   <li>a client without a client certificate subscribes and is expected to be rejected;</li>
     *   <li>a client using {@link AuthenticationTls} with the test client certificate and key
     *       subscribes and is expected to succeed.</li>
     * </ol>
     *
     * @throws Exception propagated from the broker restart or from closing a client
     */
    @Test
    public void testTlsAuthUseTrustCert() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        final String subName = "newSub";

        Set<String> providers = new HashSet<>();
        providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthenticationProviders(providers);
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsCertificateFilePath("./src/test/resources/certificate/server.crt");
        this.conf.setTlsKeyFilePath("./src/test/resources/certificate/server.key");
        this.conf.setTlsAllowInsecureConnection(false);
        this.conf.setTlsTrustCertsFilePath("./src/test/resources/certificate/client.crt");
        this.conf.setNumExecutorThreadPoolSize(5);
        this.restartBroker();

        HashMap<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", "./src/test/resources/certificate/client.crt");
        authParams.put("tlsKeyFile", "./src/test/resources/certificate/client.key");

        // Case 1: access without a client certificate must be rejected.
        PulsarClient anonymousClient = null;
        try {
            anonymousClient = PulsarClient.builder()
                    .serviceUrl(this.brokerUrlTls.toString())
                    .enableTls(true)
                    .allowTlsInsecureConnection(true)
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .operationTimeout(1000, TimeUnit.MILLISECONDS)
                    .build();
            Consumer<?> consumer = anonymousClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe();
            consumer.close();
            // Assert.fail raises an AssertionError, which the catch below does not intercept.
            Assert.fail("should fail");
        } catch (Exception e) {
            // NOTE(review): the decompiled source had lost this catch clause. The expected
            // "Unauthorized" text mirrors the check in testTlsAuthDisallowInsecure - confirm
            // against the original test.
            String message = e.getMessage();
            Assert.assertTrue(message != null && message.contains("Unauthorized"),
                    "unexpected failure: " + e);
        } finally {
            // Guard against build() itself having failed, so the real error is not masked by an NPE.
            if (anonymousClient != null) {
                anonymousClient.close();
            }
        }

        // Case 2: access with the (trusted) client certificate must succeed.
        PulsarClient tlsClient = null;
        try {
            AuthenticationTls auth = new AuthenticationTls();
            auth.configure(authParams);
            tlsClient = PulsarClient.builder()
                    .authentication(auth)
                    .serviceUrl(this.brokerUrlTls.toString())
                    .enableTls(true)
                    .allowTlsInsecureConnection(true)
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .operationTimeout(1000, TimeUnit.MILLISECONDS)
                    .build();
            Consumer<?> consumer = tlsClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subName)
                    .subscribe();
            consumer.close();
        } catch (Exception e) {
            // Keep the cause so an unexpected rejection can be diagnosed from the report.
            Assert.fail("should not fail", e);
        } finally {
            if (tlsClient != null) {
                tlsClient.close();
            }
        }
    }

    /**
     * Checks lookup throttling on a {@link ConnectionPool} whose client configuration sets
     * {@code concurrentLookupRequest} to 1 and {@code maxLookupRequest} to 2.
     *
     * <p>The same sequence is run twice, first with partitioned-metadata requests and then with
     * lookup requests: two requests are issued and both must complete, then three requests are
     * issued and at least one must fail with a root cause of
     * {@link PulsarClientException.TooManyRequestsException}. Any other failure is rethrown.
     *
     * @throws Exception if a request fails for a reason other than throttling, or on pool errors
     */
    @Test
    public void testLookupThrottlingForClientByClient() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";

        PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
        resolver.updateServiceUrl(this.pulsar.getBrokerServiceUrl());

        ClientConfigurationData conf = new ClientConfigurationData();
        conf.setConcurrentLookupRequest(1);
        conf.setMaxLookupRequest(2);

        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
                new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon()));
        long nextRequestId = -559038737L;

        try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
            // Partitioned-metadata requests: two requests stay within the limit.
            List<CompletableFuture<?>> pending = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                final long requestId = nextRequestId++;
                final ByteBuf request = Commands.newPartitionMetadataRequest(topicName, requestId);
                pending.add(pool.getConnection(resolver.resolveHost())
                        .thenCompose(cnx -> cnx.newLookup(request, requestId)));
            }
            for (CompletableFuture<?> future : pending) {
                future.get();
            }

            // Partitioned-metadata requests: three requests must trip the throttle.
            pending = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                final long requestId = nextRequestId++;
                final ByteBuf request = Commands.newPartitionMetadataRequest(topicName, requestId);
                pending.add(pool.getConnection(resolver.resolveHost())
                        .thenCompose(cnx -> cnx.newLookup(request, requestId)));
            }
            try {
                for (CompletableFuture<?> future : pending) {
                    future.get();
                }
                Assert.fail("At least one should fail");
            } catch (ExecutionException e) {
                // Peel nested ExecutionExceptions to reach the underlying failure.
                Throwable cause = e;
                while (cause instanceof ExecutionException) {
                    cause = cause.getCause();
                }
                if (!(cause instanceof PulsarClientException.TooManyRequestsException)) {
                    throw e;
                }
            }

            // Lookup requests: two requests stay within the limit.
            pending = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                final long requestId = nextRequestId++;
                final ByteBuf request = Commands.newLookup(topicName, true, requestId);
                pending.add(pool.getConnection(resolver.resolveHost())
                        .thenCompose(cnx -> cnx.newLookup(request, requestId)));
            }
            for (CompletableFuture<?> future : pending) {
                future.get();
            }

            // Lookup requests: three requests must trip the throttle.
            pending = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                final long requestId = nextRequestId++;
                final ByteBuf request = Commands.newLookup(topicName, true, requestId);
                pending.add(pool.getConnection(resolver.resolveHost())
                        .thenCompose(cnx -> cnx.newLookup(request, requestId)));
            }
            try {
                for (CompletableFuture<?> future : pending) {
                    future.get();
                }
                Assert.fail("At least one should fail");
            } catch (ExecutionException e) {
                Throwable cause = e;
                while (cause instanceof ExecutionException) {
                    cause = cause.getCause();
                }
                if (!(cause instanceof PulsarClientException.TooManyRequestsException)) {
                    throw e;
                }
            }
        }
    }

    /**
     * Checks that loading a persistent topic fails once the state of its namespace bundle has
     * been updated to {@code false} in the ownership cache.
     *
     * <p>The namespace is created (an already-existing namespace is tolerated), a producer is
     * created and closed on the topic, the owning bundle's state is updated, and the topic is
     * then loaded through the broker service. The load must fail with a cause of
     * {@link BrokerServiceException.ServiceUnitNotReadyException}.
     *
     * @throws Exception propagated from the admin, client or namespace-service calls
     */
    @Test
    public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {
        final String namespace = "prop/disableBundle";
        try {
            this.admin.namespaces().createNamespace(namespace);
        } catch (PulsarAdminException.ConflictException alreadyExists) {
            // The namespace is already present; carry on with it.
        }
        this.admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));

        final String topicName = "persistent://prop/disableBundle/my-topic";
        TopicName topic = TopicName.get(topicName);
        this.pulsarClient.newProducer().topic(topicName).create().close();

        NamespaceBundle bundle = this.pulsar.getNamespaceService().getBundle(topic);
        this.pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle, false).join();

        CompletableFuture<?> futureResult =
                this.pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
        try {
            futureResult.get();
            Assert.fail("Topic creation should fail due to disable bundle");
        } catch (Exception e) {
            boolean notReady = e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException;
            if (!notReady) {
                Assert.fail("Topic creation should fail with ServiceUnitNotReadyException");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that topic creation completes (exceptionally) instead of hanging when obtaining
     * the managed-ledger configuration fails.
     *
     * <p>A spied {@link BrokerService} is stubbed so that {@code getManagedLedgerConfig} returns
     * a future already failed with a {@link NullPointerException}. {@code getOrCreateTopic} is
     * invoked on a single-thread executor and its outcome is awaited for one second: a timeout
     * or interruption fails the test, and the reported cause must be a
     * {@link NullPointerException}.
     */
    @Test(timeOut=3000L)
    public void testTopicFailureShouldNotHaveDeadLock() {
        final String namespace = "prop/ns-abc";
        final String deadLockTestTopic = "persistent://" + namespace + "/deadLockTestTopic";

        // Create and close a producer on another topic of the same namespace first.
        try {
            final String successfulTopic = "persistent://" + namespace + "/ownBundleTopic";
            this.pulsarClient.newProducer().topic(successfulTopic).create().close();
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            BrokerService service = Mockito.spy(this.pulsar.getBrokerService());

            CompletableFuture<ManagedLedgerConfig> failedManagedLedgerConfig = new CompletableFuture<>();
            failedManagedLedgerConfig.completeExceptionally(new NullPointerException("failed to peristent policy"));
            Mockito.doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig((TopicName) Mockito.any());

            // Relays the outcome of getOrCreateTopic to the test thread.
            CompletableFuture<Void> topicCreation = new CompletableFuture<>();
            executor.submit(() -> service.getOrCreateTopic(deadLockTestTopic)
                    .thenAccept(topic -> topicCreation.complete(null))
                    .exceptionally(e -> {
                        topicCreation.completeExceptionally(e.getCause());
                        return null;
                    }));

            try {
                topicCreation.get(1L, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof NullPointerException);
            } catch (InterruptedException | TimeoutException e) {
                Assert.fail("there is a dead-lock and it should have been prevented");
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that topic creation completes (exceptionally) instead of hanging when opening
     * the managed ledger fails.
     *
     * <p>A spied {@link BrokerService} returns a default {@link ManagedLedgerConfig}, and the
     * managed-ledger factory's private {@code ledgers} map is seeded, via reflection, with a
     * future already failed with a {@link ManagedLedgerException} for the topic under test.
     * {@code getOrCreateTopic} is invoked on a single-thread executor and awaited for one
     * second: a timeout or interruption fails the test, and the reported cause must be a
     * {@link BrokerServiceException.PersistenceException}. The {@code ledgers} map is cleared
     * afterwards.
     *
     * @throws Exception if the reflective access to the {@code ledgers} field fails
     */
    @Test
    public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
        final String namespace = "prop/ns-abc";
        final String deadLockTestTopic = "persistent://" + namespace + "/deadLockTestTopic";

        // Create and close a producer on another topic of the same namespace first.
        try {
            final String successfulTopic = "persistent://" + namespace + "/ownBundleTopic";
            this.pulsarClient.newProducer().topic(successfulTopic).create().close();
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            BrokerService service = Mockito.spy(this.pulsar.getBrokerService());

            CompletableFuture<ManagedLedgerConfig> failedManagedLedgerConfig = new CompletableFuture<>();
            failedManagedLedgerConfig.complete(new ManagedLedgerConfig());
            Mockito.doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig((TopicName) Mockito.any());

            // Relays the outcome of getOrCreateTopic to the test thread.
            CompletableFuture<Void> topicCreation = new CompletableFuture<>();

            // Plant an already-failed "open" future for the topic in the factory's ledger map.
            Field ledgerField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
            ledgerField.setAccessible(true);
            @SuppressWarnings("unchecked")
            ConcurrentHashMap<String, CompletableFuture<?>> ledgers =
                    (ConcurrentHashMap<String, CompletableFuture<?>>) ledgerField.get(this.pulsar.getManagedLedgerFactory());
            CompletableFuture<Object> failedOpen = new CompletableFuture<>();
            failedOpen.completeExceptionally(new ManagedLedgerException("ledger opening failed"));
            ledgers.put(namespace + "/persistent/deadLockTestTopic", failedOpen);

            executor.submit(() -> service.getOrCreateTopic(deadLockTestTopic)
                    .thenAccept(topic -> topicCreation.complete(null))
                    .exceptionally(e -> {
                        topicCreation.completeExceptionally(e.getCause());
                        return null;
                    }));

            try {
                topicCreation.get(1L, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                Assert.assertEquals(e.getCause().getClass(), BrokerServiceException.PersistenceException.class);
            } catch (InterruptedException | TimeoutException e) {
                Assert.fail("there is a dead-lock and it should have been prevented");
            } finally {
                ledgers.clear();
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Checks that creating a namespace with an explicit bundle count stores local policies
     * reporting that same number of bundles.
     *
     * <p>The namespace is created with three bundles, a non-partitioned topic is created in it,
     * and the local policies read back through the Pulsar resources must be present and report
     * three bundles.
     *
     * @throws Exception propagated from the admin or resource calls
     */
    @Test
    public void testCreateNamespacePolicy() throws Exception {
        final String namespace = "prop/testPolicy";
        final int totalBundle = 3;

        System.err.println("----------------");
        BundlesData bundlesData = BundlesData.builder().numBundles(totalBundle).build();
        this.admin.namespaces().createNamespace(namespace, bundlesData);
        this.admin.topics().createNonPartitionedTopic(namespace + "/test");

        Optional<?> policy = this.pulsar.getPulsarResources()
                .getLocalPolicies()
                .getLocalPolicies(NamespaceName.get(namespace));
        Assert.assertTrue(policy.isPresent());

        LocalPolicies localPolicies = (LocalPolicies) policy.get();
        Assert.assertEquals(localPolicies.bundles.getNumBundles(), totalBundle);
    }

    /**
     * Checks that unloading a namespace bundle removes the topic's managed ledger from the
     * factory even though the topic's producer never finishes disconnecting.
     *
     * <p>A producer is created on the topic, then replaced in {@code topic.producers} by a
     * Mockito spy whose {@code disconnect()} returns a future that is never completed. Every
     * owned bundle of the namespace is unloaded with a two-second timeout, after which the
     * factory's private {@code ledgers} map (read via reflection) must no longer contain the
     * topic's ledger.
     *
     * @throws Exception propagated from the client calls, reflection or the unload
     */
    @Test
    public void testStuckTopicUnloading() throws Exception {
        final String namespace = "prop/ns-abc";
        final String topicName = "persistent://" + namespace + "/unoadTopic";
        final String topicMlName = namespace + "/persistent/unoadTopic";

        // Subscribe once and close the consumer straight away.
        this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscribe()
                .close();

        // This producer is not closed in this test; its broker-side counterpart is spied below.
        org.apache.pulsar.client.api.Producer<?> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .sendTimeout(5, TimeUnit.SECONDS)
                .create();

        PersistentTopic topic =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();

        ManagedLedgerFactoryImpl mlFactory =
                (ManagedLedgerFactoryImpl) this.pulsar.getManagedLedgerClientFactory().getManagedLedgerFactory();
        Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        ledgersField.setAccessible(true);
        ConcurrentHashMap<?, ?> ledgers = (ConcurrentHashMap<?, ?>) ledgersField.get(mlFactory);
        Assert.assertNotNull(ledgers.get(topicMlName));

        // Swap the registered producer for a spy whose disconnect() never completes.
        Producer prod = (Producer) Mockito.spy(topic.producers.values().toArray()[0]);
        topic.producers.clear();
        topic.producers.put(prod.getProducerName(), prod);
        CompletableFuture<Void> waitFuture = new CompletableFuture<>();
        Mockito.doReturn(waitFuture).when(prod).disconnect();

        Set<NamespaceBundle> bundles = this.pulsar.getNamespaceService().getOwnedServiceUnits();
        for (NamespaceBundle bundle : bundles) {
            String ns = bundle.getNamespaceObject().toString();
            System.out.println();
            if (namespace.equals(ns)) {
                this.pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 2L, TimeUnit.SECONDS);
            }
        }

        Assert.assertNull(ledgers.get(topicMlName));
    }

    /**
     * Checks that output written by a registered {@link PrometheusRawMetricsProvider} appears
     * in the response of the broker's {@code /metrics} web endpoint.
     *
     * <p>A provider writing a single {@code test_metrics} line is registered, the endpoint is
     * fetched over HTTP, and the response body must contain {@code "test_metrics"}. The HTTP
     * client and the response reader are closed once the body has been read.
     *
     * @throws IOException if the HTTP request or reading the response fails
     */
    @Test
    public void testMetricsProvider() throws IOException {
        PrometheusRawMetricsProvider rawMetricsProvider =
                stream -> stream.write("test_metrics{label1=\"xyz\"} 10 \n");
        this.getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

        String metricsEndPoint = this.getPulsar().getWebServiceAddress() + "/metrics";
        StringBuilder body = new StringBuilder();

        // try-with-resources releases the client's connections and the response stream
        // even when the request or the read fails.
        try (CloseableHttpClient httpClient = HttpClientBuilder.create().build()) {
            HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    response.getEntity().getContent(), java.nio.charset.StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    body.append(line);
                }
            }
        }

        Assert.assertTrue(body.toString().contains("test_metrics"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks, for 100 distinct topics, that a producer can be created while a concurrent
     * thread requests stats for the same, not yet created, topic.
     *
     * <p>For each topic a helper thread signals that it has started, sleeps for one
     * millisecond and calls {@code getStats}; the test thread waits for the signal, creates a
     * producer, asserts it is non-null, joins the helper thread and closes the producer.
     *
     * <p>Note that the {@code Assert.fail} inside the helper thread raises its error on that
     * thread only; it is not propagated to the test thread by this code.
     *
     * @throws Exception propagated from the producer creation/close or if the wait is interrupted
     */
    @Test
    public void shouldNotPreventCreatingTopicWhenNonexistingTopicIsCached() throws Exception {
        for (int i = 0; i < 100; ++i) {
            final String topicName = "persistent://prop/ns-abc/topic-caching-test-topic" + i;
            final CountDownLatch statsThreadStarted = new CountDownLatch(1);

            Thread getStatsThread = new Thread(() -> {
                try {
                    statsThreadStarted.countDown();
                    Thread.sleep(1L);
                    this.admin.topics().getStats(topicName);
                    Assert.fail("The topic should not exist yet.");
                } catch (PulsarAdminException.NotFoundException expected) {
                    // The topic is not there yet, which is the anticipated outcome.
                } catch (InterruptedException | PulsarAdminException e) {
                    log.error("Exception in {}", Thread.currentThread().getName(), e);
                }
            }, "getStatsThread#" + i);
            getStatsThread.start();
            statsThreadStarted.await();

            org.apache.pulsar.client.api.Producer<?> producer =
                    this.pulsarClient.newProducer().topic(topicName).create();
            try {
                Assert.assertNotNull(producer);
                getStatsThread.join();
            } finally {
                // The null guard keeps a failed assertNotNull from being masked by an NPE here.
                if (producer != null) {
                    producer.close();
                }
            }
        }
    }

    /**
     * Checks {@code BrokerService.isSystemTopic} against a fixed set of topic names.
     *
     * <p>Ordinary topic names, including ones merely called {@code healthcheck}, must not be
     * reported as system topics. The change-events, transaction-buffer-snapshot and
     * pending-ack names, the transaction coordinator topics, and the {@code healthcheck} topic
     * inside either heartbeat namespace must be.
     */
    @Test
    public void testIsSystemTopic() {
        BrokerService broker = this.pulsar.getBrokerService();

        // Regular topics.
        Assert.assertFalse(broker.isSystemTopic(TopicName.get("test")));
        Assert.assertFalse(broker.isSystemTopic(TopicName.get("public/default/test")));
        Assert.assertFalse(broker.isSystemTopic(TopicName.get("healthcheck")));
        Assert.assertFalse(broker.isSystemTopic(TopicName.get("public/default/healthcheck")));
        Assert.assertFalse(broker.isSystemTopic(TopicName.get("persistent://public/default/test")));
        Assert.assertFalse(broker.isSystemTopic(TopicName.get("non-persistent://public/default/test")));

        // Change-event topics, with and without a partition suffix.
        Assert.assertTrue(broker.isSystemTopic(TopicName.get("__change_events")));
        Assert.assertTrue(broker.isSystemTopic(TopicName.get("__change_events-partition-0")));
        Assert.assertTrue(broker.isSystemTopic(TopicName.get("__change_events-partition-1")));

        // Transaction buffer snapshot topics, with and without a partition suffix.
        Assert.assertTrue(broker.isSystemTopic(TopicName.get("__transaction_buffer_snapshot")));
        Assert.assertTrue(broker.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-0")));
        Assert.assertTrue(broker.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-1")));

        // Names ending in the pending-ack suffix.
        Assert.assertTrue(broker.isSystemTopic(
                TopicName.get("topicxxx-partition-0-multiTopicsReader-f433329d68__transaction_pending_ack")));
        Assert.assertTrue(broker.isSystemTopic(
                TopicName.get("topicxxx-multiTopicsReader-f433329d68__transaction_pending_ack")));

        // Transaction coordinator topics.
        Assert.assertTrue(broker.isSystemTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN));
        Assert.assertTrue(broker.isSystemTopic(TopicName.TRANSACTION_COORDINATOR_LOG));

        // Health-check topics inside the heartbeat namespaces (String overload of isSystemTopic).
        NamespaceName heartbeatNamespaceV1 =
                NamespaceService.getHeartbeatNamespace(this.pulsar.getAdvertisedAddress(), this.pulsar.getConfig());
        NamespaceName heartbeatNamespaceV2 =
                NamespaceService.getHeartbeatNamespaceV2(this.pulsar.getAdvertisedAddress(), this.pulsar.getConfig());
        String healthCheckV1 = "persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck";
        String healthCheckV2 = heartbeatNamespaceV2.toString() + "/healthcheck";
        Assert.assertTrue(broker.isSystemTopic(healthCheckV1));
        Assert.assertTrue(broker.isSystemTopic(healthCheckV2));
    }
}

