/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.namespace;

import com.google.common.collect.Sets;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class NamespaceOwnershipListenerTests
extends BrokerTestBase {
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testNamespaceBundleOwnershipListener() throws PulsarAdminException, InterruptedException, PulsarClientException {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final AtomicBoolean onLoad = new AtomicBoolean(false);
        final AtomicBoolean unLoad = new AtomicBoolean(false);
        final String namespace = "prop/" + UUID.randomUUID().toString();
        this.pulsar.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener[]{new NamespaceBundleOwnershipListener(){

            public boolean test(NamespaceBundle namespaceBundle) {
                return namespaceBundle.getNamespaceObject().toString().equals(namespace);
            }

            public void onLoad(NamespaceBundle bundle) {
                countDownLatch.countDown();
                onLoad.set(true);
            }

            public void unLoad(NamespaceBundle bundle) {
                countDownLatch.countDown();
                unLoad.set(true);
            }
        }});
        this.admin.namespaces().createNamespace(namespace, (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        Assert.assertTrue((boolean)this.admin.namespaces().getNamespaces("prop").contains(namespace));
        String topic = "persistent://" + namespace + "/os-0";
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        producer.close();
        this.admin.namespaces().unload(namespace);
        countDownLatch.await();
        Assert.assertTrue((boolean)onLoad.get());
        Assert.assertTrue((boolean)unLoad.get());
        this.admin.topics().delete(topic);
        this.admin.namespaces().deleteNamespace(namespace);
    }

    @Test
    public void testGetAllPartitions() throws PulsarAdminException, ExecutionException, InterruptedException {
        String namespace = "prop/" + UUID.randomUUID().toString();
        this.admin.namespaces().createNamespace(namespace, (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        Assert.assertTrue((boolean)this.admin.namespaces().getNamespaces("prop").contains(namespace));
        String topicName = "persistent://" + namespace + "/os";
        this.admin.topics().createPartitionedTopic(topicName, 6);
        List partitions = (List)this.pulsar.getNamespaceService().getAllPartitions(NamespaceName.get((String)namespace)).get();
        Assert.assertEquals((int)partitions.size(), (int)6);
        for (int i = 0; i < partitions.size(); ++i) {
            Assert.assertEquals((String)((String)partitions.get(i)), (String)(topicName + "-partition-" + i));
        }
        this.admin.topics().deletePartitionedTopic(topicName);
        this.admin.namespaces().deleteNamespace(namespace);
    }

    @Test
    public void testNamespaceBundleLookupOnwershipListener() throws PulsarAdminException, InterruptedException, PulsarClientException {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final AtomicInteger onLoad = new AtomicInteger(0);
        final AtomicInteger unLoad = new AtomicInteger(0);
        final String namespace = "prop/" + UUID.randomUUID().toString();
        this.pulsar.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener[]{new NamespaceBundleOwnershipListener(){

            public void onLoad(NamespaceBundle bundle) {
                countDownLatch.countDown();
                onLoad.addAndGet(1);
            }

            public void unLoad(NamespaceBundle bundle) {
                countDownLatch.countDown();
                unLoad.addAndGet(1);
            }

            public boolean test(NamespaceBundle namespaceBundle) {
                return namespaceBundle.getNamespaceObject().toString().equals(namespace);
            }
        }});
        this.admin.namespaces().createNamespace(namespace, (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        Assert.assertTrue((boolean)this.admin.namespaces().getNamespaces("prop").contains(namespace));
        String topic = "persistent://" + namespace + "/os-0";
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        producer.close();
        this.admin.lookups().lookupTopic(topic);
        this.admin.namespaces().unload(namespace);
        countDownLatch.await();
        Assert.assertEquals((int)onLoad.get(), (int)1);
        Assert.assertEquals((int)unLoad.get(), (int)1);
        this.admin.topics().delete(topic);
        this.admin.namespaces().deleteNamespace(namespace);
    }
}

