/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.namespace;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.namespace.ServiceUnitUtils;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class NamespaceServiceTest
extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(NamespaceServiceTest.class);

    /**
     * Runs before each test method and delegates to the superclass {@code baseSetup()}.
     *
     * @throws Exception if the superclass setup fails
     */
    @BeforeMethod
    @Override
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Runs after each test method ({@code alwaysRun = true}) and delegates to the superclass
     * {@code internalCleanup()}.
     *
     * @throws Exception if the superclass cleanup fails
     */
    @AfterMethod(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Splits the bundle holding {@code topic-1} of namespace {@code pulsar/global/ns1} and verifies
     * the outcome.
     *
     * <p>After the split the test checks that:
     * <ul>
     *   <li>every bundle produced by an independent split (done through a separate utility factory)
     *       is present in the bundle list reported by the broker's bundle factory;</li>
     *   <li>the bundles rebuilt from the namespace's local policies equal the factory's bundles;</li>
     *   <li>each reported bundle has an entry in the local metadata store whose native URL equals
     *       this broker's service URL.</li>
     * </ul>
     *
     * @throws Exception if reflective access to the ownership cache or the local-policies lookup fails
     */
    @Test
    public void testSplitAndOwnBundles() throws Exception {
        // Replace the service's ownership cache with a spy whose disableOwnership() completes at once.
        OwnershipCache spiedOwnershipCache = Mockito.spy(pulsar.getNamespaceService().getOwnershipCache());
        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(spiedOwnershipCache).disableOwnership(ArgumentMatchers.any(NamespaceBundle.class));
        Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
        ownershipCacheField.setAccessible(true);
        ownershipCacheField.set(pulsar.getNamespaceService(), spiedOwnershipCache);

        NamespaceService namespaceService = pulsar.getNamespaceService();
        NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
        NamespaceBundle originalBundle = bundles.findBundle(topicName);

        // Split the bundle; any failure of the returned future fails the test.
        CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(originalBundle, false,
                NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
        try {
            result.get();
        } catch (Exception e) {
            org.testng.Assert.fail("split bundle failed", e);
        }

        NamespaceBundleFactory bundleFactory = pulsar.getNamespaceService().getNamespaceBundleFactory();
        NamespaceBundles updatedNsBundles = bundleFactory.getBundles(nsname);
        org.testng.Assert.assertNotNull(updatedNsBundles);
        List<NamespaceBundle> bundleList = updatedNsBundles.getBundles();
        // Assert on the list that was just fetched; the pre-split `bundles` was already
        // dereferenced above, so checking it here could never fail.
        org.testng.Assert.assertNotNull(bundleList);

        // Compute the expected split independently and make sure nothing is missing.
        NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());
        Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles =
                splitBundles(utilityFactory, nsname, bundles, originalBundle);
        org.testng.Assert.assertNotNull(splitBundles);
        HashSet<NamespaceBundle> splitBundleSet = new HashSet<>(splitBundles.getRight());
        splitBundleSet.removeAll(bundleList);
        org.testng.Assert.assertTrue(splitBundleSet.isEmpty());

        // The bundle ranges stored in the local policies must describe the same bundles.
        LocalPolicies policies = pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get();
        NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, policies.bundles);
        org.testng.Assert.assertEquals(localZkBundles, updatedNsBundles);
        log.info("Policies: {}", policies);

        // Every bundle must have an ownership node that points at this broker.
        for (NamespaceBundle b : bundleList) {
            try {
                byte[] data = pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(b)).join().get().getValue();
                NamespaceEphemeralData node =
                        ObjectMapperFactory.getThreadLocal().readValue(data, NamespaceEphemeralData.class);
                org.testng.Assert.assertEquals(node.getNativeUrl(), pulsar.getBrokerServiceUrl());
            } catch (Exception e) {
                org.testng.Assert.fail("failed to setup ownership", e);
            }
        }
    }

    /**
     * Registers a topic in the broker's stats maps, splits the bundle that contains it, and checks
     * that the pre-split bundle no longer lists any topic while the bundle now returned for the
     * topic does.
     *
     * @throws Exception if reflection or topic initialization fails
     */
    @Test
    public void testSplitMapWithRefreshedStatMap() throws Exception {
        OwnershipCache spiedOwnershipCache = Mockito.spy(pulsar.getNamespaceService().getOwnershipCache());

        // Managed ledger stub that reports an empty cursor list.
        ManagedLedger ledger = Mockito.mock(ManagedLedger.class);
        Mockito.when(ledger.getCursors()).thenReturn(Lists.newArrayList());

        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(spiedOwnershipCache).disableOwnership(ArgumentMatchers.any(NamespaceBundle.class));
        Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
        ownershipCacheField.setAccessible(true);
        ownershipCacheField.set(pulsar.getNamespaceService(), spiedOwnershipCache);

        NamespaceService namespaceService = pulsar.getNamespaceService();
        NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
        NamespaceBundle originalBundle = bundles.findBundle(topicName);

        // Build a topic on the stubbed ledger and add it to the stats maps via the non-public hook.
        PersistentTopic topic = new PersistentTopic(topicName.toString(), ledger, pulsar.getBrokerService());
        topic.initialize().join();
        Method addToStatsMaps = pulsar.getBrokerService().getClass()
                .getDeclaredMethod("addTopicToStatsMaps", TopicName.class, Topic.class);
        addToStatsMaps.setAccessible(true);
        addToStatsMaps.invoke(pulsar.getBrokerService(), topicName, topic);

        String nspace = originalBundle.getNamespaceObject().toString();
        List<?> topicsBeforeSplit =
                pulsar.getBrokerService().getAllTopicsFromNamespaceBundle(nspace, originalBundle.toString());
        org.testng.Assert.assertNotNull(topicsBeforeSplit);

        CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(originalBundle, false,
                NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
        try {
            result.get();
        } catch (Exception e) {
            org.testng.Assert.fail("split bundle failed", e);
        }

        // The pre-split bundle must no longer list any topic ...
        List<?> topicsOfOldBundle =
                pulsar.getBrokerService().getAllTopicsFromNamespaceBundle(nspace, originalBundle.toString());
        org.testng.Assert.assertTrue(topicsOfOldBundle.isEmpty());

        // ... while the bundle now resolved for the topic must list at least one.
        NamespaceBundle splitBundle = pulsar.getNamespaceService().getBundle(topicName);
        org.testng.Assert.assertFalse(CollectionUtils.isEmpty(
                pulsar.getBrokerService().getAllTopicsFromNamespaceBundle(nspace, splitBundle.toString())));
    }

    /**
     * Checks that the bundle holding {@code topic-1} of {@code pulsar/global/ns1} is not reported
     * as disabled by the namespace service.
     *
     * @throws Exception if reflective access to the ownership cache fails
     */
    @Test
    public void testIsServiceUnitDisabled() throws Exception {
        OwnershipCache spiedOwnershipCache = Mockito.spy(pulsar.getNamespaceService().getOwnershipCache());

        ManagedLedger ledger = Mockito.mock(ManagedLedger.class);
        Mockito.when(ledger.getCursors()).thenReturn(Lists.newArrayList());

        // Install the spy, with disableOwnership() stubbed to complete immediately.
        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(spiedOwnershipCache).disableOwnership(ArgumentMatchers.any(NamespaceBundle.class));
        Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
        ownershipCacheField.setAccessible(true);
        ownershipCacheField.set(pulsar.getNamespaceService(), spiedOwnershipCache);

        NamespaceService namespaceService = pulsar.getNamespaceService();
        NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
        NamespaceBundle originalBundle = bundles.findBundle(topicName);

        org.testng.Assert.assertFalse(namespaceService.isNamespaceBundleDisabled(originalBundle));
    }

    /**
     * Acquires ownership of the first bundle of {@code prop/use/ns1}, removes ownership for the
     * namespace's bundles, and checks that the bundle is owned before and not owned after.
     *
     * @throws Exception if reflection fails or an ownership future completes exceptionally
     */
    @Test
    public void testRemoveOwnershipNamespaceBundle() throws Exception {
        OwnershipCache ownershipCache = Mockito.spy(pulsar.getNamespaceService().getOwnershipCache());

        ManagedLedger ledger = Mockito.mock(ManagedLedger.class);
        Mockito.when(ledger.getCursors()).thenReturn(Lists.newArrayList());

        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(ownershipCache).disableOwnership(ArgumentMatchers.any(NamespaceBundle.class));
        Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
        ownershipCacheField.setAccessible(true);
        ownershipCacheField.set(pulsar.getNamespaceService(), ownershipCache);

        NamespaceService namespaceService = pulsar.getNamespaceService();
        NamespaceName nsname = NamespaceName.get("prop/use/ns1");
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
        NamespaceBundle firstBundle = bundles.getBundles().get(0);

        // Owned once acquisition has completed ...
        ownershipCache.tryAcquiringOwnership(firstBundle).get();
        org.testng.Assert.assertNotNull(ownershipCache.getOwnedBundle(firstBundle));

        // ... and no longer owned once removal has completed.
        ownershipCache.removeOwnership(bundles).get();
        org.testng.Assert.assertNull(ownershipCache.getOwnedBundle(firstBundle));
    }

    /**
     * Unloads a bundle whose only topic fails on {@code close(false)} and checks that afterwards
     * the local metadata store holds no entry at the bundle's service-unit path.
     *
     * @throws Exception if subscribing or resolving the topic future fails
     */
    @Test
    public void testUnloadNamespaceBundleFailure() throws Exception {
        final String topicName = "persistent://my-property/use/my-ns/my-topic1";
        pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe();

        // Replace the broker's topic entry with a spy of the real topic.
        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
                pulsar.getBrokerService().getTopics();
        Topic spyTopic = Mockito.spy(topics.get(topicName).get().get());
        topics.clear();
        CompletableFuture<Optional<Topic>> topicFuture = CompletableFuture.completedFuture(Optional.of(spyTopic));
        topics.put(topicName, topicFuture);

        // close(false) hands back a future that has already failed.
        Mockito.doAnswer(invocation -> {
            CompletableFuture<Void> failedClose = new CompletableFuture<>();
            failedClose.completeExceptionally(new RuntimeException("first time failed"));
            return failedClose;
        }).when(spyTopic).close(false);

        NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
        pulsar.getNamespaceService().unloadNamespaceBundle(bundle).join();

        Optional<GetResult> ownershipNode = pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(bundle)).join();
        org.testng.Assert.assertFalse(ownershipNode.isPresent());
    }

    /**
     * Unloads a bundle whose only topic returns a never-completed future from {@code close(false)},
     * using a one-second unload timeout, and checks that afterwards the local metadata store holds
     * no entry at the bundle's service-unit path. The whole test is capped at 6000 ms.
     *
     * @throws Exception if subscribing, resolving the topic future, or closing the consumer fails
     */
    @Test(timeOut = 6000L)
    public void testUnloadNamespaceBundleWithStuckTopic() throws Exception {
        final String topicName = "persistent://my-property/use/my-ns/my-topic1";
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscriber-name")
                .subscribe();

        // Replace the broker's topic entry with a spy of the real topic.
        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
                pulsar.getBrokerService().getTopics();
        Topic spyTopic = Mockito.spy(topics.get(topicName).get().get());
        topics.clear();
        CompletableFuture<Optional<Topic>> topicFuture = CompletableFuture.completedFuture(Optional.of(spyTopic));
        topics.put(topicName, topicFuture);

        // close(false) hands back a future that nobody ever completes.
        Mockito.doAnswer(invocation -> new CompletableFuture<Void>()).when(spyTopic).close(false);

        NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
        pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 1L, TimeUnit.SECONDS).join();

        Optional<GetResult> ownershipNode = pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(bundle)).join();
        org.testng.Assert.assertFalse(ownershipNode.isPresent());
        consumer.close();
    }

    /**
     * Stores a {@link LoadReport} and a {@link LocalBrokerData} under {@code /loadbalance/brokers}
     * in the local metadata store and checks that a lookup result can be created for each broker
     * URL. The first lookup runs with the load manager that is already installed; the second runs
     * after it has been swapped for a {@link ModularLoadManagerWrapper}.
     *
     * @throws Exception if URI parsing, serialization, or a lookup future fails
     */
    @Test
    public void testLoadReportDeserialize() throws Exception {
        final String candidateBroker1 = "http://localhost:8000";
        final String candidateBroker2 = "http://localhost:3000";
        LoadReport lr = new LoadReport(null, null, candidateBroker1, null);
        LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker2, null);

        URI uri1 = new URI(candidateBroker1);
        URI uri2 = new URI(candidateBroker2);
        String path1 = String.format("%s/%s:%s", "/loadbalance/brokers", uri1.getHost(), uri1.getPort());
        String path2 = String.format("%s/%s:%s", "/loadbalance/brokers", uri2.getHost(), uri2.getPort());

        // Both writes are awaited before any lookup is attempted.
        pulsar.getLocalMetadataStore()
                .put(path1, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr), Optional.empty(),
                        EnumSet.of(CreateOption.Ephemeral))
                .join();
        pulsar.getLocalMetadataStore()
                .put(path2, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), Optional.empty(),
                        EnumSet.of(CreateOption.Ephemeral))
                .join();

        LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1, false, null).get();

        // Swap in a modular load manager and stop the one it replaces.
        LoadManager oldLoadManager = pulsar.getLoadManager()
                .getAndSet(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
        oldLoadManager.stop();

        LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2, false, null).get();

        org.testng.Assert.assertEquals(result1.getLookupData().getBrokerUrl(), candidateBroker1);
        org.testng.Assert.assertEquals(result2.getLookupData().getBrokerUrl(), candidateBroker2);
        System.out.println(result2);
    }

    /**
     * Stores a {@link LocalBrokerData} carrying one advertised listener and checks the lookup
     * results created for that broker: without a listener name the broker URL is the candidate
     * broker URL; with the listener name the broker URL and TLS URL are the listener's URLs.
     *
     * @throws Exception if URI parsing, serialization, the metadata write, or a lookup future fails
     */
    @Test
    public void testCreateLookupResult() throws Exception {
        final String candidateBroker = "pulsar://localhost:6650";
        final String listenerUrl = "pulsar://localhost:7000";
        final String listenerUrlTls = "pulsar://localhost:8000";
        final String listener = "listenerName";

        Map<String, AdvertisedListener> advertisedListeners = Maps.newHashMap();
        advertisedListeners.put(listener, AdvertisedListener.builder()
                .brokerServiceUrl(new URI(listenerUrl))
                .brokerServiceUrlTls(new URI(listenerUrlTls))
                .build());
        LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker, null, advertisedListeners);

        URI uri = new URI(candidateBroker);
        String path = String.format("%s/%s:%s", "/loadbalance/brokers", uri.getHost(), uri.getPort());

        // Wait for the write to finish: put() is asynchronous, and without join() the lookups
        // below could run before the broker data is stored.
        pulsar.getLocalMetadataStore()
                .put(path, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), Optional.empty(),
                        EnumSet.of(CreateOption.Ephemeral))
                .join();

        LookupResult noListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, null).get();
        LookupResult withListener =
                pulsar.getNamespaceService().createLookupResult(candidateBroker, false, listener).get();

        org.testng.Assert.assertEquals(noListener.getLookupData().getBrokerUrl(), candidateBroker);
        org.testng.Assert.assertEquals(withListener.getLookupData().getBrokerUrl(), listenerUrl);
        org.testng.Assert.assertEquals(withListener.getLookupData().getBrokerUrlTls(), listenerUrlTls);
        System.out.println(withListener);
    }

    /**
     * Splits the bundle holding {@code topic-1} of namespace {@code pulsar/global/ns1} and verifies
     * that the bundles reported afterwards contain an independently computed split, equal the
     * bundles rebuilt from the namespace's local policies, and each have an entry in the local
     * metadata store whose native URL equals this broker's service URL.
     *
     * <p>NOTE(review): despite its name, the body exercises the same split-and-own flow as
     * {@code testSplitAndOwnBundles}; confirm whether a distinct scenario was intended.
     *
     * @throws Exception if reflective access to the ownership cache or the local-policies lookup fails
     */
    @Test
    public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception {
        // Replace the service's ownership cache with a spy whose disableOwnership() completes at once.
        OwnershipCache spiedOwnershipCache = Mockito.spy(pulsar.getNamespaceService().getOwnershipCache());
        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(spiedOwnershipCache).disableOwnership(ArgumentMatchers.any(NamespaceBundle.class));
        Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
        ownershipCacheField.setAccessible(true);
        ownershipCacheField.set(pulsar.getNamespaceService(), spiedOwnershipCache);

        NamespaceService namespaceService = pulsar.getNamespaceService();
        NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
        NamespaceBundle originalBundle = bundles.findBundle(topicName);

        // Split the bundle; any failure of the returned future fails the test.
        CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(originalBundle, false,
                NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
        try {
            result.get();
        } catch (Exception e) {
            org.testng.Assert.fail("split bundle failed", e);
        }

        NamespaceBundleFactory bundleFactory = pulsar.getNamespaceService().getNamespaceBundleFactory();
        NamespaceBundles updatedNsBundles = bundleFactory.getBundles(nsname);
        org.testng.Assert.assertNotNull(updatedNsBundles);
        List<NamespaceBundle> bundleList = updatedNsBundles.getBundles();
        // Assert on the list that was just fetched; the pre-split `bundles` was already
        // dereferenced above, so checking it here could never fail.
        org.testng.Assert.assertNotNull(bundleList);

        // Compute the expected split independently and make sure nothing is missing.
        NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());
        Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles =
                splitBundles(utilityFactory, nsname, bundles, originalBundle);
        org.testng.Assert.assertNotNull(splitBundles);
        HashSet<NamespaceBundle> splitBundleSet = new HashSet<>(splitBundles.getRight());
        splitBundleSet.removeAll(bundleList);
        org.testng.Assert.assertTrue(splitBundleSet.isEmpty());

        // The bundle ranges stored in the local policies must describe the same bundles.
        LocalPolicies policies = pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get();
        NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, policies.bundles);
        org.testng.Assert.assertEquals(localZkBundles, updatedNsBundles);
        log.info("Policies: {}", policies);

        // Every bundle must have an ownership node that points at this broker.
        for (NamespaceBundle b : bundleList) {
            try {
                byte[] data = pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(b)).join().get().getValue();
                NamespaceEphemeralData node =
                        ObjectMapperFactory.getThreadLocal().readValue(data, NamespaceEphemeralData.class);
                org.testng.Assert.assertEquals(node.getNativeUrl(), pulsar.getBrokerServiceUrl());
            } catch (Exception e) {
                org.testng.Assert.fail("failed to setup ownership", e);
            }
        }
    }

    /**
     * Splits the bundle holding {@code topic-1}, removes ownership of every resulting bundle except
     * the one that now holds the topic, and then splits that remaining bundle again (this time with
     * the unload flag set to {@code true}). Both splits must complete without error.
     *
     * @throws Exception if reflective access to the ownership cache fails
     */
    @Test
    public void testRemoveOwnershipAndSplitBundle() throws Exception {
        OwnershipCache ownershipCache = Mockito.spy(pulsar.getNamespaceService().getOwnershipCache());
        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(ownershipCache).disableOwnership(ArgumentMatchers.any(NamespaceBundle.class));
        Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
        ownershipCacheField.setAccessible(true);
        ownershipCacheField.set(pulsar.getNamespaceService(), ownershipCache);

        NamespaceService namespaceService = pulsar.getNamespaceService();
        NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
        NamespaceBundle originalBundle = bundles.findBundle(topicName);

        // First split.
        CompletableFuture<Void> firstSplit = namespaceService.splitAndOwnBundle(originalBundle, false,
                NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
        try {
            firstSplit.get();
        } catch (Exception e) {
            org.testng.Assert.fail("split bundle failed", e);
        }

        NamespaceBundles updatedNsBundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
        org.testng.Assert.assertNotNull(updatedNsBundles);
        NamespaceBundle splittedBundle = updatedNsBundles.findBundle(topicName);

        // Drop ownership of every bundle other than the one holding the topic.
        for (NamespaceBundle bundle : updatedNsBundles.getBundles()) {
            if (bundle.equals(splittedBundle)) {
                continue;
            }
            try {
                ownershipCache.removeOwnership(bundle).get();
            } catch (Exception e) {
                org.testng.Assert.fail("failed to remove ownership", e);
            }
        }

        // Second split, on the bundle that is still owned.
        CompletableFuture<Void> secondSplit = namespaceService.splitAndOwnBundle(splittedBundle, true,
                NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
        try {
            secondSplit.get();
        } catch (Exception e) {
            org.testng.Assert.fail("split bundle failed", e);
        }
    }

    /**
     * Splits the bundle holding {@code topic-1} twice (first without, then with the unload flag)
     * and, after each split, waits until the bundle that was split is no longer present in the
     * ownership cache's owned-bundles map.
     *
     * @throws Exception if reflection fails or the initial ownership acquisition completes
     *         exceptionally
     */
    @Test
    public void testSplitBundleAndRemoveOldBundleFromOwnerShipCache() throws Exception {
        OwnershipCache ownershipCache = Mockito.spy(pulsar.getNamespaceService().getOwnershipCache());
        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(ownershipCache).disableOwnership(ArgumentMatchers.any(NamespaceBundle.class));
        Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
        ownershipCacheField.setAccessible(true);
        ownershipCacheField.set(pulsar.getNamespaceService(), ownershipCache);

        NamespaceService namespaceService = pulsar.getNamespaceService();
        NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
        TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);

        NamespaceBundle splitBundle1 = bundles.findBundle(topicName);
        // Wait for the acquisition to finish before splitting. Left un-awaited, the acquisition
        // races the split and the "no longer owned" check below.
        ownershipCache.tryAcquiringOwnership(splitBundle1).get();

        CompletableFuture<Void> result1 = namespaceService.splitAndOwnBundle(splitBundle1, false,
                NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
        try {
            result1.get();
        } catch (Exception e) {
            org.testng.Assert.fail("split bundle failed", e);
        }
        Awaitility.await().untilAsserted(() ->
                org.testng.Assert.assertNull(namespaceService.getOwnershipCache().getOwnedBundles().get(splitBundle1)));

        // Re-read the bundles and split the one that now holds the topic, this time with unload.
        bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
        org.testng.Assert.assertNotNull(bundles);
        NamespaceBundle splitBundle2 = bundles.findBundle(topicName);
        CompletableFuture<Void> result2 = namespaceService.splitAndOwnBundle(splitBundle2, true,
                NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
        try {
            result2.get();
        } catch (Exception e) {
            org.testng.Assert.fail("split bundle failed", e);
        }
        Awaitility.await().untilAsserted(() ->
                org.testng.Assert.assertNull(namespaceService.getOwnershipCache().getOwnedBundles().get(splitBundle2)));
    }

    /**
     * Creates namespace {@code prop/test/ns-abc2} with 10 bundles, subscribes to 100 topics in it,
     * and checks that the bundle reported by {@code getBundlesWithHighestTopics} is the one that
     * the test's own per-bundle count identifies as holding the most topics. It then asks the
     * admin API to split the {@code "LARGEST"} bundle and checks that no remaining bundle has the
     * range of that largest bundle.
     *
     * @throws Exception if namespace creation, subscription, consumer close, or the split fails
     */
    @Test
    public void testSplitLargestBundle() throws Exception {
        String namespace = "prop/test/ns-abc2";
        String topic = "persistent://" + namespace + "/t1-";
        int totalTopics = 100;

        BundlesData bundleData = BundlesData.builder().numBundles(10).build();
        admin.namespaces().createNamespace(namespace, bundleData);

        Consumer<?>[] consumers = new Consumer<?>[totalTopics];
        for (int i = 0; i < totalTopics; ++i) {
            consumers[i] = pulsarClient.newConsumer()
                    .topic(topic + i)
                    .subscriptionName("my-subscriber-name")
                    .subscribe();
        }

        NamespaceService namespaceService = pulsar.getNamespaceService();
        NamespaceName nsname = NamespaceName.get(namespace);
        NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);

        // Count topics per bundle range. The map is typed so the counter arithmetic works on
        // Integer values rather than on Object.
        Map<String, Integer> topicCount = Maps.newHashMap();
        int maxTopics = 0;
        String maxBundle = null;
        for (int i = 0; i < totalTopics; ++i) {
            String bundle = bundles.findBundle(TopicName.get(topic + i)).getBundleRange();
            int count = topicCount.merge(bundle, 1, Integer::sum);
            // Strictly greater: on a tie the first bundle to reach the maximum is kept.
            if (count > maxTopics) {
                maxTopics = count;
                maxBundle = bundle;
            }
        }

        String largestBundle =
                namespaceService.getNamespaceBundleFactory().getBundlesWithHighestTopics(nsname).getBundleRange();
        org.testng.Assert.assertEquals(maxBundle, largestBundle);

        for (int i = 0; i < totalTopics; ++i) {
            consumers[i].close();
        }

        // After splitting the largest bundle, its range must no longer appear among the bundles.
        admin.namespaces().splitNamespaceBundle(namespace, "LARGEST", false, null);
        for (NamespaceBundle bundle : namespaceService.getNamespaceBundleFactory().getBundles(nsname).getBundles()) {
            Assert.assertNotEquals(bundle.getBundleRange(), maxBundle);
        }
    }

    /**
     * Checks that the namespace extracted from the heartbeat namespace's full bundle name is
     * recognised by {@code NamespaceService.isSystemServiceNamespace}.
     *
     * @throws Exception declared for consistency with the other tests
     */
    @Test
    public void testHeartbeatNamespaceMatch() throws Exception {
        NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
        NamespaceBundle fullBundle =
                pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(heartbeatNamespace);

        String bundleNamespace = NamespaceBundle.getBundleNamespace(fullBundle.toString());
        org.testng.Assert.assertTrue(NamespaceService.isSystemServiceNamespace(bundleNamespace));
    }

    /**
     * Seeds the given factory's private {@code bundlesCache} with {@code bundles} for
     * {@code nsname} and then asks that factory to split {@code targetBundle} into two.
     *
     * @param utilityFactory factory whose cache is seeded and which performs the split
     * @param nsname namespace the cached bundles are registered under
     * @param bundles bundles to place in the factory's cache
     * @param targetBundle bundle to split
     * @return the joined result of {@code utilityFactory.splitBundles(targetBundle, 2, null)}
     * @throws Exception if the {@code bundlesCache} field cannot be accessed reflectively
     */
    private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
            NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
        Field cacheField = NamespaceBundleFactory.class.getDeclaredField("bundlesCache");
        cacheField.setAccessible(true);

        @SuppressWarnings("unchecked")
        AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache =
                (AsyncLoadingCache<NamespaceName, NamespaceBundles>) cacheField.get(utilityFactory);
        bundlesCache.put(nsname, CompletableFuture.completedFuture(bundles));

        return utilityFactory.splitBundles(targetBundle, 2, null).join();
    }
}

