/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.namespace;

import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.namespace.ServiceUnitUtils;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for {@link OwnershipCache}.
 *
 * <p>Each test method gets a fresh {@link ZookeeperServerTest} instance and two independent
 * {@link MetadataStoreExtended} sessions connected to it: {@link #store}, which is handed to the
 * mocked {@link PulsarService}, and {@link #otherStore}, which the tests use to write ownership
 * records on behalf of a different broker.
 */
@Test(groups = {"broker"})
public class OwnershipCacheTest {
    private static final Logger log = LoggerFactory.getLogger(OwnershipCacheTest.class);

    /** Port reported by the mocked broker configuration and embedded in {@link #selfBrokerUrl}. */
    private static final int BROKER_PORT = 8080;
    /** Session timeout used for every metadata store session opened by this class. */
    private static final int STORE_SESSION_TIMEOUT_MILLIS = 5000;
    /** Native URLs written into the ownership record that simulates another broker. */
    private static final String OTHER_BROKER_URL = "pulsar://otherhost:8881";
    private static final String OTHER_BROKER_URL_TLS = "pulsar://otherhost:8884";

    private PulsarService pulsar;
    private ServiceConfiguration config;
    private String selfBrokerUrl;
    private NamespaceBundleFactory bundleFactory;
    private NamespaceService nsService;
    private BrokerService brokerService;
    private OrderedScheduler executor;
    private MetadataStoreExtended store;
    private MetadataStoreExtended otherStore;
    private CoordinationService coordinationService;
    private ZookeeperServerTest zookeeperServer;

    /**
     * Starts the ZooKeeper test server, opens both metadata store sessions and wires the mocked
     * {@link PulsarService}, {@link ServiceConfiguration}, {@link NamespaceService} and
     * {@link BrokerService} that {@link OwnershipCache} is constructed with.
     */
    @BeforeMethod
    public void setup() throws Exception {
        this.selfBrokerUrl = "tcp://localhost:" + BROKER_PORT;
        this.pulsar = Mockito.mock(PulsarService.class);
        this.config = Mockito.mock(ServiceConfiguration.class);
        this.executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();

        this.zookeeperServer = new ZookeeperServerTest(0);
        this.zookeeperServer.start();
        this.store = newStore();
        this.coordinationService = new CoordinationServiceImpl(this.store);
        this.otherStore = newStore();

        Mockito.when(this.pulsar.getConfigurationMetadataStore()).thenReturn(this.store);
        Mockito.when(this.pulsar.getLocalMetadataStore()).thenReturn(this.store);
        Mockito.when(this.pulsar.getCoordinationService()).thenReturn(this.coordinationService);

        this.bundleFactory = new NamespaceBundleFactory(this.pulsar, Hashing.crc32());
        this.nsService = Mockito.mock(NamespaceService.class);
        this.brokerService = Mockito.mock(BrokerService.class);

        // Any unload request against the mocked broker service completes immediately.
        Mockito.doReturn(CompletableFuture.completedFuture(1)).when(this.brokerService).unloadServiceUnit(
                ArgumentMatchers.<NamespaceBundle>any(), ArgumentMatchers.anyBoolean(),
                ArgumentMatchers.anyLong(), ArgumentMatchers.<TimeUnit>any());
        Mockito.doReturn(this.config).when(this.pulsar).getConfiguration();
        Mockito.doReturn(this.nsService).when(this.pulsar).getNamespaceService();
        Mockito.doReturn(Optional.of(BROKER_PORT)).when(this.config).getBrokerServicePort();
        Mockito.doReturn(Optional.empty()).when(this.config).getWebServicePort();
        Mockito.doReturn(this.brokerService).when(this.pulsar).getBrokerService();
        Mockito.doReturn(this.selfBrokerUrl).when(this.pulsar).getBrokerServiceUrl();
    }

    /**
     * Releases everything {@link #setup()} created.
     *
     * <p>Because this runs with {@code alwaysRun = true}, it may execute after a {@code setup()}
     * that failed part-way through; every resource is therefore null-checked, and the closes are
     * chained through {@code finally} so one failing {@code close()} does not leak the rest.
     */
    @AfterMethod(alwaysRun = true)
    public void teardown() throws Exception {
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
        try {
            if (this.store != null) {
                this.store.close();
            }
        } finally {
            try {
                if (this.otherStore != null) {
                    this.otherStore.close();
                }
            } finally {
                if (this.zookeeperServer != null) {
                    this.zookeeperServer.close();
                }
            }
        }
    }

    /** A freshly constructed cache is non-null and exposes a non-null owned-bundle view. */
    @Test
    public void testConstructor() {
        OwnershipCache cache = newCache();
        Assert.assertNotNull(cache);
        Assert.assertNotNull(cache.getOwnedBundles());
    }

    /** After {@code disableOwnership}, the owner record read back for the bundle is flagged disabled. */
    @Test
    public void testDisableOwnership() throws Exception {
        OwnershipCache cache = newCache();
        NamespaceBundle bundle = fullRangeBundle("pulsar/test/ns-1");
        assertNoOwner(cache, bundle);

        NamespaceEphemeralData acquired = cache.tryAcquiringOwnership(bundle).get();
        Assert.assertFalse(acquired.isDisabled());

        cache.disableOwnership(bundle).get();
        NamespaceEphemeralData afterDisable = cache.getOwnerAsync(bundle).get()
                .orElseThrow(() -> new AssertionError("Owner record missing after disableOwnership"));
        Assert.assertTrue(afterDisable.isDisabled());
    }

    /**
     * Acquires a bundle, unloads it, lets another store session write an owner record for the
     * same path, and verifies that re-acquiring fails and the other broker is reported as owner.
     */
    @Test
    public void testGetOrSetOwner() throws Exception {
        OwnershipCache cache = newCache();
        NamespaceBundle bundle = fullRangeBundle("pulsar/test/ns-2");
        assertNoOwner(cache, bundle);

        assertOwnedBySelf(cache.tryAcquiringOwnership(bundle).get());

        // handleUnloadRequest is given the mocked PulsarService, so make its NamespaceService
        // hand back the cache under test before asking the owned bundle to unload itself.
        OwnedBundle ownedBundle = cache.getOwnedBundle(bundle);
        Mockito.doReturn(cache).when(this.nsService).getOwnershipCache();
        ownedBundle.handleUnloadRequest(this.pulsar, 5L, TimeUnit.SECONDS).join();

        try (MetadataStoreExtended competingStore = newStore()) {
            writeOtherBrokerOwner(competingStore, bundle, "http://localhost:8080", "https://localhost:4443");
            assertAcquireRejected(cache, bundle);
            assertOwnedByOtherBroker(cache, bundle);
        }
    }

    /**
     * With an owner record written by another store session, {@code getOwnerAsync} reports that
     * broker consistently across reads, and reports no owner for an unrelated bundle.
     */
    @Test
    public void testGetOwner() throws Exception {
        OwnershipCache cache = newCache();
        NamespaceBundle bundle = fullRangeBundle("pulsar/test/ns-3");
        assertNoOwner(cache, bundle);

        try (MetadataStoreExtended competingStore = newStore()) {
            writeOtherBrokerOwner(competingStore, bundle, "http://localhost:8080", "https://localhost:4443");
            assertAcquireRejected(cache, bundle);

            NamespaceEphemeralData firstRead = assertOwnedByOtherBroker(cache, bundle);
            NamespaceEphemeralData secondRead = cache.getOwnerAsync(bundle).get()
                    .orElseThrow(() -> new AssertionError("Owner record missing on second read"));
            Assert.assertEquals(firstRead, secondRead);

            assertNoOwner(cache, fullRangeBundle("pulsar/test/ns-none"));
        }
    }

    /**
     * {@code getOwnedBundle} returns {@code null} while the bundle is unowned or owned by another
     * broker, and a non-null value once this cache acquires it.
     */
    @Test
    public void testGetOwnedServiceUnit() throws Exception {
        OwnershipCache cache = newCache();
        NamespaceBundle bundle = fullRangeBundle("pulsar/test/ns-5");
        assertNoOwner(cache, bundle);
        Assert.assertNull(cache.getOwnedBundle(bundle));

        writeOtherBrokerOwner(this.otherStore, bundle, "http://localhost:8080", "https://localhost:4443");
        Assert.assertNull(cache.getOwnedBundle(bundle));

        assertAcquireRejected(cache, bundle);
        assertOwnedByOtherBroker(cache, bundle);
        Assert.assertNull(cache.getOwnedBundle(bundle));

        // Drop the other broker's record so this cache can take the bundle.
        this.otherStore.delete(ServiceUnitUtils.path(bundle), Optional.empty()).join();
        assertOwnedBySelf(cache.tryAcquiringOwnership(bundle).get());
        Assert.assertNotNull(cache.getOwnedBundle(bundle));
    }

    /**
     * {@code getOwnedBundles} stays empty while another broker holds the bundle and contains
     * exactly one entry after this cache acquires it.
     */
    @Test
    public void testGetOwnedServiceUnits() throws Exception {
        OwnershipCache cache = newCache();
        NamespaceBundle bundle = fullRangeBundle("pulsar/test/ns-6");
        assertNoOwner(cache, bundle);
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        writeOtherBrokerOwner(this.otherStore, bundle, "http://otherhost:8080", "https://otherhost:4443");
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        // NOTE(review): fixed pause carried over unchanged; presumably it lets the write made
        // through otherStore become visible to the session behind the cache - confirm, and
        // consider replacing it with an Awaitility condition.
        Thread.sleep(500L);

        assertAcquireRejected(cache, bundle);
        assertOwnedByOtherBroker(cache, bundle);
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        this.otherStore.delete(ServiceUnitUtils.path(bundle), Optional.empty()).join();
        assertOwnedBySelf(cache.tryAcquiringOwnership(bundle).get());
        Assert.assertEquals(cache.getOwnedBundles().size(), 1);
    }

    /**
     * Removing ownership of an unowned bundle leaves the cache empty; removing an owned bundle
     * eventually empties the cache and deletes the bundle's path from the metadata store.
     */
    @Test
    public void testRemoveOwnership() throws Exception {
        OwnershipCache cache = newCache();
        NamespaceBundle bundle = fullRangeBundle("pulsar/test/ns-7");
        assertNoOwner(cache, bundle);

        cache.removeOwnership(bundle).get();
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        assertOwnedBySelf(cache.tryAcquiringOwnership(bundle).get());
        Assert.assertEquals(cache.getOwnedBundles().size(), 1);

        // The returned future is deliberately not awaited; the assertions below poll instead.
        cache.removeOwnership(bundle);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(cache.getOwnedBundles().isEmpty());
            Assert.assertFalse(this.store.exists(ServiceUnitUtils.path(bundle)).join());
        });
    }

    /**
     * After {@code invalidateLocalOwnerCache} the bundle is no longer reported as locally owned,
     * and a subsequent {@code tryAcquiringOwnership} succeeds for this broker again.
     */
    @Test
    public void testReestablishOwnership() throws Exception {
        OwnershipCache cache = newCache();
        NamespaceBundle bundle = fullRangeBundle("pulsar/test/ns-8");
        assertNoOwner(cache, bundle);
        Assert.assertNull(cache.getOwnedBundle(bundle));

        assertOwnedBySelf(cache.tryAcquiringOwnership(bundle).get());
        Assert.assertNotNull(cache.getOwnedBundle(bundle));

        NamespaceEphemeralData ownerBeforeInvalidate = cache.getOwnerAsync(bundle).get()
                .orElseThrow(() -> new AssertionError("Owner record missing after acquiring ownership"));
        assertOwnedBySelf(ownerBeforeInvalidate);
        Assert.assertNotNull(cache.getOwnedBundle(bundle));

        cache.invalidateLocalOwnerCache();
        Assert.assertNull(cache.getOwnedBundle(bundle));

        assertOwnedBySelf(cache.tryAcquiringOwnership(bundle).get());
        Assert.assertNotNull(cache.getOwnedBundle(bundle));
        Assert.assertTrue(cache.checkOwnershipAsync(bundle).get());

        // Re-check the record that was read before the invalidation, as the test always has.
        assertOwnedBySelf(ownerBeforeInvalidate);
        Assert.assertNotNull(cache.getOwnedBundle(bundle));
    }

    /** Builds the cache under test from the mocks prepared in {@link #setup()}. */
    private OwnershipCache newCache() {
        return new OwnershipCache(this.pulsar, this.bundleFactory, this.nsService);
    }

    /** Opens a new metadata store session against the ZooKeeper test server. */
    private MetadataStoreExtended newStore() throws Exception {
        return MetadataStoreExtended.create(this.zookeeperServer.getHostPort(),
                MetadataStoreConfig.builder().sessionTimeoutMillis(STORE_SESSION_TIMEOUT_MILLIS).build());
    }

    /** Creates a bundle for {@code namespace} covering the key range {@code [0, Integer.MAX_VALUE)}. */
    private NamespaceBundle fullRangeBundle(String namespace) {
        return new NamespaceBundle(NamespaceName.get(namespace),
                Range.closedOpen(0L, (long) Integer.MAX_VALUE), this.bundleFactory);
    }

    /**
     * Writes an ephemeral, non-disabled owner record for {@code bundle} through {@code foreignStore},
     * using the "otherhost" native URLs and the given HTTP URLs, and waits for the write to finish.
     */
    private static void writeOtherBrokerOwner(MetadataStoreExtended foreignStore, NamespaceBundle bundle,
                                              String httpUrl, String httpUrlTls) throws Exception {
        NamespaceEphemeralData otherOwner =
                new NamespaceEphemeralData(OTHER_BROKER_URL, OTHER_BROKER_URL_TLS, httpUrl, httpUrlTls, false);
        foreignStore.put(ServiceUnitUtils.path(bundle),
                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(otherOwner),
                Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)).join();
    }

    /** Asserts that {@code getOwnerAsync} currently reports no owner for {@code bundle}. */
    private static void assertNoOwner(OwnershipCache cache, NamespaceBundle bundle) throws Exception {
        Assert.assertFalse(cache.getOwnerAsync(bundle).get().isPresent());
    }

    /** Asserts that acquiring {@code bundle} fails with a {@code LockBusyException} as the cause. */
    private static void assertAcquireRejected(OwnershipCache cache, NamespaceBundle bundle) throws Exception {
        try {
            cache.tryAcquiringOwnership(bundle).get();
            Assert.fail("Should fail to acquire");
        } catch (ExecutionException e) {
            Assert.assertEquals(e.getCause().getClass(), MetadataStoreException.LockBusyException.class);
        }
    }

    /**
     * Asserts that the owner reported for {@code bundle} carries the "otherhost" native URLs and
     * is not disabled.
     *
     * @return the owner record that was read, for further comparison by the caller
     */
    private static NamespaceEphemeralData assertOwnedByOtherBroker(OwnershipCache cache, NamespaceBundle bundle)
            throws Exception {
        NamespaceEphemeralData owner = cache.getOwnerAsync(bundle).join()
                .orElseThrow(() -> new AssertionError("Expected an owner record written by the other broker"));
        Assert.assertEquals(owner.getNativeUrl(), OTHER_BROKER_URL);
        Assert.assertEquals(owner.getNativeUrlTls(), OTHER_BROKER_URL_TLS);
        Assert.assertFalse(owner.isDisabled());
        return owner;
    }

    /** Asserts that {@code data} names this broker's URL as the native URL and is not disabled. */
    private void assertOwnedBySelf(NamespaceEphemeralData data) {
        Assert.assertEquals(data.getNativeUrl(), this.selfBrokerUrl);
        Assert.assertFalse(data.isDisabled());
    }
}

