/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletContext;
import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.MockServletContext;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.zookeeper.KeeperException;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class PersistentTopicsTest
extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicsTest.class);
    private PersistentTopics persistentTopics;
    private final String testTenant = "my-tenant";
    private final String testLocalCluster = "use";
    private final String testNamespace = "my-namespace";
    protected Field uriField;
    protected UriInfo uriInfo;
    private NonPersistentTopics nonPersistentTopic;
    private NamespaceResources namespaceResources;

    @BeforeClass
    public void initPersistentTopics() throws Exception {
        this.uriField = PulsarWebResource.class.getDeclaredField("uri");
        this.uriField.setAccessible(true);
        this.uriInfo = (UriInfo)Mockito.mock(UriInfo.class);
    }

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        this.persistentTopics = (PersistentTopics)Mockito.spy(PersistentTopics.class);
        this.persistentTopics.setServletContext((ServletContext)new MockServletContext());
        this.persistentTopics.setPulsar(this.pulsar);
        ((PersistentTopics)Mockito.doReturn((Object)false).when((Object)this.persistentTopics)).isRequestHttps();
        ((PersistentTopics)Mockito.doReturn(null).when((Object)this.persistentTopics)).originalPrincipal();
        ((PersistentTopics)Mockito.doReturn((Object)"test").when((Object)this.persistentTopics)).clientAppId();
        ((PersistentTopics)Mockito.doReturn((Object)TopicDomain.persistent.value()).when((Object)this.persistentTopics)).domain();
        ((PersistentTopics)Mockito.doNothing().when((Object)this.persistentTopics)).validateAdminAccessForTenant(this.testTenant);
        ((PersistentTopics)Mockito.doReturn((Object)Mockito.mock(AuthenticationDataHttps.class)).when((Object)this.persistentTopics)).clientAuthData();
        this.nonPersistentTopic = (NonPersistentTopics)Mockito.spy(NonPersistentTopics.class);
        this.nonPersistentTopic.setServletContext((ServletContext)new MockServletContext());
        this.nonPersistentTopic.setPulsar(this.pulsar);
        this.namespaceResources = (NamespaceResources)Mockito.mock(NamespaceResources.class);
        ((NonPersistentTopics)Mockito.doReturn((Object)false).when((Object)this.nonPersistentTopic)).isRequestHttps();
        ((NonPersistentTopics)Mockito.doReturn(null).when((Object)this.nonPersistentTopic)).originalPrincipal();
        ((NonPersistentTopics)Mockito.doReturn((Object)"test").when((Object)this.nonPersistentTopic)).clientAppId();
        ((NonPersistentTopics)Mockito.doReturn((Object)TopicDomain.non_persistent.value()).when((Object)this.nonPersistentTopic)).domain();
        ((NonPersistentTopics)Mockito.doNothing().when((Object)this.nonPersistentTopic)).validateAdminAccessForTenant(this.testTenant);
        ((NonPersistentTopics)Mockito.doReturn((Object)Mockito.mock(AuthenticationDataHttps.class)).when((Object)this.nonPersistentTopic)).clientAuthData();
        PulsarResources resources = (PulsarResources)Mockito.spy((Object)new PulsarResources((MetadataStore)this.pulsar.getLocalMetadataStore(), this.pulsar.getConfigurationMetadataStore()));
        ((PulsarResources)Mockito.doReturn((Object)BrokerTestUtil.spyWithClassAndConstructorArgs(TopicResources.class, this.pulsar.getLocalMetadataStore())).when((Object)resources)).getTopicResources();
        Whitebox.setInternalState((Object)this.pulsar, (String)"pulsarResources", (Object)resources);
        this.admin.clusters().createCluster("use", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
        this.admin.tenants().createTenant(this.testTenant, (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"role1", "role2"}), (Set)Sets.newHashSet((Object[])new String[]{"use", "test"})));
        this.admin.namespaces().createNamespace("my-tenant/my-namespace", (Set)Sets.newHashSet((Object[])new String[]{"use", "test"}));
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testGetSubscriptions() {
        String testLocalTopicName = "topic-not-found";
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, "my-tenant", "my-namespace", testLocalTopicName, true);
        ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(RestException.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Throwable)errorCaptor.capture());
        Assert.assertEquals((int)((RestException)((Object)errorCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.NOT_FOUND.getStatusCode());
        Assert.assertEquals((String)((RestException)((Object)errorCaptor.getValue())).getMessage(), (String)"Topic not found");
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, "my-tenant", "my-namespace", testLocalTopicName + "-partition-0", true);
        errorCaptor = ArgumentCaptor.forClass(RestException.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Throwable)errorCaptor.capture());
        Assert.assertEquals((int)((RestException)((Object)errorCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.NOT_FOUND.getStatusCode());
        Assert.assertEquals((String)((RestException)((Object)errorCaptor.getValue())).getMessage(), (String)"Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions");
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", testLocalTopicName, 3, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, "my-tenant", "my-namespace", testLocalTopicName, "test", true, (MessageIdImpl)MessageId.earliest, false);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, "my-tenant", "my-namespace", testLocalTopicName + "-partition-0", true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Object)Lists.newArrayList((Object[])new String[]{"test"}));
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deleteSubscription(response, "my-tenant", "my-namespace", testLocalTopicName, "test", false, true);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, "my-tenant", "my-namespace", testLocalTopicName + "-partition-0", true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Object)Lists.newArrayList());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, "my-tenant", "my-namespace", testLocalTopicName + "-partition-1", "test", true, (MessageIdImpl)MessageId.earliest, false);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, "my-tenant", "my-namespace", testLocalTopicName + "-partition-1", true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Object)Lists.newArrayList((Object[])new String[]{"test"}));
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, "my-tenant", "my-namespace", testLocalTopicName + "-partition-0", true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Object)Lists.newArrayList());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, "my-tenant", "my-namespace", testLocalTopicName, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Object)Lists.newArrayList((Object[])new String[]{"test"}));
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deletePartitionedTopic(response, "my-tenant", "my-namespace", testLocalTopicName, true, true, false);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
    }

    @Test
    public void testCreateSubscriptions() throws Exception {
        int numberOfMessages = 5;
        String SUB_EARLIEST = "sub-earliest";
        String SUB_LATEST = "sub-latest";
        String SUB_NONE_MESSAGE_ID = "sub-none-message-id";
        String testLocalTopicName = "subWithPositionOrNot";
        String topicName = "persistent://my-tenant/my-namespace/" + testLocalTopicName;
        this.admin.topics().createNonPartitionedTopic(topicName);
        ProducerImpl producer = (ProducerImpl)this.pulsarClient.newProducer().topic(topicName).maxPendingMessages(30000).create();
        for (int i = 0; i < 5; ++i) {
            System.out.println(producer.send((Object)new byte[10]));
        }
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, "my-tenant", "my-namespace", testLocalTopicName, "sub-earliest", true, (MessageIdImpl)MessageId.earliest, false);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        TopicStats topicStats = this.persistentTopics.getStats("my-tenant", "my-namespace", testLocalTopicName, true, true, false);
        long msgBacklog = ((SubscriptionStats)topicStats.getSubscriptions().get("sub-earliest")).getMsgBacklog();
        System.out.println("Message back log for sub-earliest is :" + msgBacklog);
        Assert.assertEquals((long)msgBacklog, (long)5L);
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, "my-tenant", "my-namespace", testLocalTopicName, "sub-latest", true, (MessageIdImpl)MessageId.latest, false);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        topicStats = this.persistentTopics.getStats("my-tenant", "my-namespace", testLocalTopicName, true, true, false);
        msgBacklog = ((SubscriptionStats)topicStats.getSubscriptions().get("sub-latest")).getMsgBacklog();
        System.out.println("Message back log for sub-latest is :" + msgBacklog);
        Assert.assertEquals((long)msgBacklog, (long)0L);
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, "my-tenant", "my-namespace", testLocalTopicName, "sub-none-message-id", true, null, false);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        topicStats = this.persistentTopics.getStats("my-tenant", "my-namespace", testLocalTopicName, true, true, false);
        msgBacklog = ((SubscriptionStats)topicStats.getSubscriptions().get("sub-none-message-id")).getMsgBacklog();
        System.out.println("Message back log for sub-none-message-id is :" + msgBacklog);
        Assert.assertEquals((long)msgBacklog, (long)0L);
        producer.close();
    }

    @Test
    public void testCreateSubscriptionForNonPersistentTopic() throws InterruptedException {
        ((PersistentTopics)Mockito.doReturn((Object)TopicDomain.non_persistent.value()).when((Object)this.persistentTopics)).domain();
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(RestException.class);
        this.persistentTopics.createSubscription(response, "my-tenant", "my-namespace", "testCreateSubscriptionForNonPersistentTopic", "sub", true, (MessageIdImpl)MessageId.earliest, false);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Throwable)responseCaptor.capture());
        Assert.assertEquals((int)((WebApplicationException)((Object)responseCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.BAD_REQUEST.getStatusCode());
    }

    @Test
    public void testTerminatePartitionedTopic() {
        String testLocalTopicName = "topic-not-found";
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", testLocalTopicName, 1, true);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, "my-tenant", "my-namespace", testLocalTopicName, "test", true, (MessageIdImpl)MessageId.earliest, false);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.terminatePartitionedTopic(response, "my-tenant", "my-namespace", testLocalTopicName, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(Arrays.asList(new MessageIdImpl(3L, -1L, -1)));
    }

    @Test
    public void testTerminate() {
        String testLocalTopicName = "topic-not-found";
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", testLocalTopicName, true);
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, "my-tenant", "my-namespace", testLocalTopicName, "test", true, (MessageIdImpl)MessageId.earliest, false);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        MessageId messageId = this.persistentTopics.terminate("my-tenant", "my-namespace", testLocalTopicName, true);
        Assert.assertEquals((Object)messageId, (Object)new MessageIdImpl(3L, -1L, -1));
        String nonPersistentTopicName = "non-persistent-topic";
        try {
            this.nonPersistentTopic.terminate("my-tenant", "my-namespace", nonPersistentTopicName, true);
            Assert.fail((String)"Should fail validation on non-persistent topic");
        }
        catch (RestException e) {
            Assert.assertEquals((int)Response.Status.NOT_ACCEPTABLE.getStatusCode(), (int)e.getResponse().getStatus());
        }
    }

    @Test
    public void testNonPartitionedTopics() {
        String nonPartitionTopic = "non-partitioned-topic";
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, "my-tenant", "my-namespace", "non-partitioned-topic", "test", true, (MessageIdImpl)MessageId.latest, false);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.getSubscriptions(response, "my-tenant", "my-namespace", "non-partitioned-topic-partition-0", true);
        ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(RestException.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Throwable)errorCaptor.capture());
        Assert.assertTrue((boolean)((RestException)((Object)errorCaptor.getValue())).getMessage().contains("zero partitions"));
        String nonPartitionTopic2 = "secondary-non-partitioned-topic";
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", "secondary-non-partitioned-topic", true);
        Assert.assertEquals((int)this.persistentTopics.getPartitionedMetadata((String)"my-tenant", (String)"my-namespace", (String)"non-partitioned-topic", (boolean)true, (boolean)false).partitions, (int)0);
        Assert.assertEquals((int)this.persistentTopics.getPartitionedMetadata((String)"my-tenant", (String)"my-namespace", (String)"non-partitioned-topic", (boolean)true, (boolean)true).partitions, (int)0);
    }

    @Test
    public void testCreateNonPartitionedTopic() {
        String topicName = "standard-topic-partition-a";
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", "standard-topic-partition-a", true);
        PartitionedTopicMetadata pMetadata = this.persistentTopics.getPartitionedMetadata("my-tenant", "my-namespace", "standard-topic-partition-a", true, false);
        Assert.assertEquals((int)pMetadata.partitions, (int)0);
        PartitionedTopicMetadata metadata = this.persistentTopics.getPartitionedMetadata("my-tenant", "my-namespace", "standard-topic-partition-a", true, true);
        Assert.assertEquals((int)metadata.partitions, (int)0);
    }

    @Test
    public void testCreateTopicWithReplicationCluster() {
        String topicName = "test-topic-ownership";
        NamespaceName namespaceName = NamespaceName.get((String)"my-tenant", (String)"my-namespace");
        CompletableFuture<Optional<Policies>> policyFuture = new CompletableFuture<Optional<Policies>>();
        Policies policies = new Policies();
        policyFuture.complete(Optional.of(policies));
        Mockito.when((Object)this.pulsar.getPulsarResources().getNamespaceResources()).thenReturn((Object)this.namespaceResources);
        ((NamespaceResources)Mockito.doReturn(policyFuture).when((Object)this.namespaceResources)).getPoliciesAsync(namespaceName);
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        ArgumentCaptor errCaptor = ArgumentCaptor.forClass(RestException.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "test-topic-ownership", 2, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Throwable)errCaptor.capture());
        Assert.assertEquals((int)((RestException)((Object)errCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.PRECONDITION_FAILED.getStatusCode());
        Assert.assertTrue((boolean)((RestException)((Object)errCaptor.getValue())).getMessage().contains("Namespace does not have any clusters configured"));
        CompletableFuture policyFuture2 = new CompletableFuture();
        policyFuture2.complete(Optional.empty());
        ((NamespaceResources)Mockito.doReturn(policyFuture2).when((Object)this.namespaceResources)).getPoliciesAsync(namespaceName);
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        errCaptor = ArgumentCaptor.forClass(RestException.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "test-topic-ownership", 2, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Throwable)errCaptor.capture());
        Assert.assertEquals((int)((RestException)((Object)errCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.NOT_FOUND.getStatusCode());
        Assert.assertTrue((boolean)((RestException)((Object)errCaptor.getValue())).getMessage().contains("Namespace not found"));
    }

    @Test(expectedExceptions={RestException.class})
    public void testCreateNonPartitionedTopicWithInvalidName() {
        String topicName = "standard-topic-partition-10";
        ((PersistentTopics)Mockito.doAnswer(invocation -> {
            TopicName partitionedTopicname = (TopicName)invocation.getArgument(0, TopicName.class);
            assert (partitionedTopicname.getLocalName().equals("standard-topic"));
            return new PartitionedTopicMetadata(10);
        }).when((Object)this.persistentTopics)).getPartitionedTopicMetadata((TopicName)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", "standard-topic-partition-10", true);
    }

    @Test
    public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws KeeperException, InterruptedException {
        String nonPartitionTopicName1 = "standard-topic";
        String nonPartitionTopicName2 = "special-topic-partition-123";
        String partitionedTopicName = "special-topic";
        Mockito.when((Object)this.pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(NamespaceName.get((String)"my-tenant/my-namespace"))).thenReturn(CompletableFuture.completedFuture(Lists.newArrayList((Object[])new String[]{"persistent://my-tenant/my-namespace/standard-topic", "persistent://my-tenant/my-namespace/special-topic-partition-123"})));
        ((PersistentTopics)Mockito.doReturn((Object)new Policies()).when((Object)this.persistentTopics)).getNamespacePolicies((NamespaceName)ArgumentMatchers.any());
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        ArgumentCaptor errCaptor = ArgumentCaptor.forClass(RestException.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "special-topic", 5, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Throwable)errCaptor.capture());
        Assert.assertEquals((int)((RestException)((Object)errCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.CONFLICT.getStatusCode());
    }

    @Test(expectedExceptions={RestException.class})
    public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws Exception {
        String nonPartitionTopicName2 = "special-topic-partition-10";
        String partitionedTopicName = "special-topic";
        this.pulsar.getBrokerService().getManagedLedgerFactory().open(TopicName.get((String)"special-topic-partition-10").getPersistenceNamingEncoding());
        ((PersistentTopics)Mockito.doAnswer(invocation -> {
            this.persistentTopics.namespaceName = NamespaceName.get((String)"tenant", (String)"namespace");
            this.persistentTopics.topicName = TopicName.get((String)"persistent", (String)"tenant", (String)"cluster", (String)"namespace", (String)"topicname");
            return null;
        }).when((Object)this.persistentTopics)).validatePartitionedTopicName((String)ArgumentMatchers.any(), (String)ArgumentMatchers.any(), (String)ArgumentMatchers.any());
        ((PersistentTopics)Mockito.doNothing().when((Object)this.persistentTopics)).validateAdminAccessForTenant(ArgumentMatchers.anyString());
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "special-topic", 5, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        this.persistentTopics.updatePartitionedTopic("my-tenant", "my-namespace", "special-topic", false, false, false, 10);
    }

    @Test(timeOut=10000L)
    public void testUnloadTopic() {
        String topicName = "standard-topic-to-be-unload";
        String partitionTopicName = "partition-topic-to-be-unload";
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.unloadTopic(response, "my-tenant", "my-namespace", "topic-not-exist", true);
        ArgumentCaptor errCaptor = ArgumentCaptor.forClass(RestException.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)45000L).times(1))).resume((Throwable)errCaptor.capture());
        Assert.assertEquals((int)((RestException)((Object)errCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.NOT_FOUND.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", "standard-topic-to-be-unload", true);
        this.persistentTopics.unloadTopic(response, "my-tenant", "my-namespace", "standard-topic-to-be-unload", true);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "partition-topic-to-be-unload", 6, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.unloadTopic(response, "my-tenant", "my-namespace", "partition-topic-to-be-unload", true);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deletePartitionedTopic(response, "my-tenant", "my-namespace", "partition-topic-to-be-unload", true, true, false);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
    }

    @Test(timeOut=10000L)
    public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() {
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.unloadTopic(response, "my-tenant", "my-namespace", "non-existent-topic", true);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(RestException.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)45000L).times(1))).resume((Throwable)responseCaptor.capture());
        Assert.assertEquals((int)((RestException)((Object)responseCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.NOT_FOUND.getStatusCode());
    }

    @Test
    public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "test-topic1", 3, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.nonPersistentTopic.createPartitionedTopic(response, "my-tenant", "my-namespace", "test-topic2", 3, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        List persistentPartitionedTopics = this.persistentTopics.getPartitionedTopicList("my-tenant", "my-namespace");
        Assert.assertEquals((int)persistentPartitionedTopics.size(), (int)1);
        Assert.assertEquals((String)TopicName.get((String)((String)persistentPartitionedTopics.get(0))).getDomain().value(), (String)TopicDomain.persistent.value());
        List nonPersistentPartitionedTopics = this.nonPersistentTopic.getPartitionedTopicList("my-tenant", "my-namespace");
        Assert.assertEquals((int)nonPersistentPartitionedTopics.size(), (int)1);
        Assert.assertEquals((String)TopicName.get((String)((String)nonPersistentPartitionedTopics.get(0))).getDomain().value(), (String)TopicDomain.non_persistent.value());
    }

    @Test
    public void testGrantNonPartitionedTopic() {
        String topicName = "non-partitioned-topic";
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", "non-partitioned-topic", true);
        String role = "role";
        HashSet<AuthAction> expectActions = new HashSet<AuthAction>();
        expectActions.add(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic("my-tenant", "my-namespace", "non-partitioned-topic", role, expectActions);
        Map permissions = this.persistentTopics.getPermissionsOnTopic("my-tenant", "my-namespace", "non-partitioned-topic");
        Assert.assertEquals((Set)((Set)permissions.get(role)), expectActions);
    }

    @Test
    public void testCreateExistedPartition() {
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        String topicName = "test-create-existed-partition";
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "test-create-existed-partition", 3, true);
        String partitionName = TopicName.get((String)"test-create-existed-partition").getPartition(0).getLocalName();
        try {
            this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", partitionName, false);
            Assert.fail();
        }
        catch (RestException e) {
            log.error("Failed to create {}: {}", (Object)partitionName, (Object)e.getMessage());
            Assert.assertEquals((int)e.getResponse().getStatus(), (int)409);
            Assert.assertEquals((String)e.getMessage(), (String)"This topic already exists");
        }
    }

    @Test
    public void testGrantPartitionedTopic() {
        String partitionedTopicName = "partitioned-topic";
        int numPartitions = 5;
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "partitioned-topic", 5, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        String role = "role";
        HashSet<AuthAction> expectActions = new HashSet<AuthAction>();
        expectActions.add(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic("my-tenant", "my-namespace", "partitioned-topic", role, expectActions);
        Map permissions = this.persistentTopics.getPermissionsOnTopic("my-tenant", "my-namespace", "partitioned-topic");
        Assert.assertEquals((Set)((Set)permissions.get(role)), expectActions);
        TopicName topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)"my-tenant", (String)"my-namespace", (String)"partitioned-topic");
        for (int i = 0; i < 5; ++i) {
            TopicName partition = topicName.getPartition(i);
            Map partitionPermissions = this.persistentTopics.getPermissionsOnTopic("my-tenant", "my-namespace", partition.getEncodedLocalName());
            Assert.assertEquals((Set)((Set)partitionPermissions.get(role)), expectActions);
        }
    }

    @Test
    public void testRevokeNonPartitionedTopic() {
        String topicName = "non-partitioned-topic";
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", "non-partitioned-topic", true);
        String role = "role";
        HashSet<AuthAction> expectActions = new HashSet<AuthAction>();
        expectActions.add(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic("my-tenant", "my-namespace", "non-partitioned-topic", role, expectActions);
        this.persistentTopics.revokePermissionsOnTopic("my-tenant", "my-namespace", "non-partitioned-topic", role);
        Map permissions = this.persistentTopics.getPermissionsOnTopic("my-tenant", "my-namespace", "non-partitioned-topic");
        Assert.assertEquals((Set)((Set)permissions.get(role)), null);
    }

    @Test
    public void testRevokePartitionedTopic() {
        String partitionedTopicName = "partitioned-topic";
        int numPartitions = 5;
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "partitioned-topic", 5, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        String role = "role";
        HashSet<AuthAction> expectActions = new HashSet<AuthAction>();
        expectActions.add(AuthAction.produce);
        this.persistentTopics.grantPermissionsOnTopic("my-tenant", "my-namespace", "partitioned-topic", role, expectActions);
        this.persistentTopics.revokePermissionsOnTopic("my-tenant", "my-namespace", "partitioned-topic", role);
        Map permissions = this.persistentTopics.getPermissionsOnTopic("my-tenant", "my-namespace", "partitioned-topic");
        Assert.assertEquals((Set)((Set)permissions.get(role)), null);
        TopicName topicName = TopicName.get((String)TopicDomain.persistent.value(), (String)"my-tenant", (String)"my-namespace", (String)"partitioned-topic");
        for (int i = 0; i < 5; ++i) {
            TopicName partition = topicName.getPartition(i);
            Map partitionPermissions = this.persistentTopics.getPermissionsOnTopic("my-tenant", "my-namespace", partition.getEncodedLocalName());
            Assert.assertEquals((Set)((Set)partitionPermissions.get(role)), null);
        }
    }

    @Test
    public void testTriggerCompactionTopic() {
        String partitionTopicName = "test-part";
        String nonPartitionTopicName = "test-non-part";
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.compact(response, "my-tenant", "my-namespace", "non-existing-topic", true);
        ArgumentCaptor errCaptor = ArgumentCaptor.forClass(RestException.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume((Throwable)errCaptor.capture());
        Assert.assertEquals((int)((RestException)((Object)errCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.NOT_FOUND.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", "test-non-part", true);
        this.persistentTopics.compact(response, "my-tenant", "my-namespace", "test-non-part", true);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "test-part", 2, true);
        this.persistentTopics.compact(response, "my-tenant", "my-namespace", "test-part", true);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)5000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
    }

    @Test
    public void testPeekWithSubscriptionNameNotExist() throws Exception {
        String topicName = "testTopic";
        String topic = TopicName.get((String)TopicDomain.persistent.value(), (String)"my-tenant", (String)"my-namespace", (String)"testTopic").toString();
        String subscriptionName = "sub";
        ((TopicsImpl)this.admin.topics()).createPartitionedTopicAsync(topic, 3, true).get();
        String partitionedTopic = topic + "-partition-0";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).create();
        for (int i = 0; i < 100; ++i) {
            producer.send((Object)("test" + i));
        }
        List messages = this.admin.topics().peekMessages(partitionedTopic, "sub", 5);
        Assert.assertEquals((int)messages.size(), (int)5);
        producer.close();
    }

    @Test
    public void testGetBacklogSizeByMessageId() throws Exception {
        TenantInfoImpl tenantInfo = new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"role1", "role2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.tenants().createTenant("prop-xyz", (TenantInfo)tenantInfo);
        this.admin.namespaces().createNamespace("prop-xyz/ns1", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        String topicName = "persistent://prop-xyz/ns1/testGetBacklogSize";
        this.admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/testGetBacklogSize", 1);
        Producer batchProducer = this.pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/testGetBacklogSize").enableBatching(false).create();
        CompletableFuture completableFuture = new CompletableFuture();
        for (int i = 0; i < 10; ++i) {
            completableFuture = batchProducer.sendAsync((Object)"a".getBytes());
        }
        completableFuture.get();
        Assert.assertEquals(Optional.ofNullable(this.admin.topics().getBacklogSizeByMessageId("persistent://prop-xyz/ns1/testGetBacklogSize-partition-0", MessageId.earliest)), Optional.of(320L));
    }

    @Test
    public void testGetLastMessageId() throws Exception {
        TenantInfoImpl tenantInfo = new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"role1", "role2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.tenants().createTenant("prop-xyz", (TenantInfo)tenantInfo);
        this.admin.namespaces().createNamespace("prop-xyz/ns1", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        String topicName = "persistent://prop-xyz/ns1/testGetLastMessageId";
        this.admin.topics().createNonPartitionedTopic("persistent://prop-xyz/ns1/testGetLastMessageId");
        Producer batchProducer = this.pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/testGetLastMessageId").enableBatching(true).batchingMaxMessages(100).batchingMaxPublishDelay(2L, TimeUnit.SECONDS).create();
        this.admin.topics().createSubscription("persistent://prop-xyz/ns1/testGetLastMessageId", "test", MessageId.earliest);
        CompletableFuture completableFuture = new CompletableFuture();
        for (int i = 0; i < 10; ++i) {
            completableFuture = batchProducer.sendAsync((Object)"test".getBytes());
        }
        completableFuture.get();
        Assert.assertEquals((int)((BatchMessageIdImpl)this.admin.topics().getLastMessageId("persistent://prop-xyz/ns1/testGetLastMessageId")).getBatchIndex(), (int)9);
        Producer producer = this.pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/testGetLastMessageId").enableBatching(false).create();
        producer.send((Object)"test".getBytes());
        Assert.assertTrue((boolean)(this.admin.topics().getLastMessageId("persistent://prop-xyz/ns1/testGetLastMessageId") instanceof MessageIdImpl));
    }

    @Test
    public void testExamineMessage() throws Exception {
        TenantInfoImpl tenantInfo = new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"role1", "role2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.tenants().createTenant("tenant-xyz", (TenantInfo)tenantInfo);
        this.admin.namespaces().createNamespace("tenant-xyz/ns-abc", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        String topicName = "persistent://tenant-xyz/ns-abc/topic-123";
        this.admin.topics().createPartitionedTopic("persistent://tenant-xyz/ns-abc/topic-123", 2);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://tenant-xyz/ns-abc/topic-123-partition-0").create();
        try {
            this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123", "earliest", 1L);
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((String)e.getMessage(), (String)"Examine messages on a partitioned topic is not allowed, please try examine message on specific topic partition");
        }
        producer.send((Object)"message1");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "earliest", 1L).getData()), (String)"message1");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "latest", 1L).getData()), (String)"message1");
        producer.send((Object)"message2");
        producer.send((Object)"message3");
        producer.send((Object)"message4");
        producer.send((Object)"message5");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "earliest", 1L).getData()), (String)"message1");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "earliest", 2L).getData()), (String)"message2");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "earliest", 3L).getData()), (String)"message3");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "earliest", 4L).getData()), (String)"message4");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "earliest", 5L).getData()), (String)"message5");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "latest", 1L).getData()), (String)"message5");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "latest", 2L).getData()), (String)"message4");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "latest", 3L).getData()), (String)"message3");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "latest", 4L).getData()), (String)"message2");
        Assert.assertEquals((String)new String(this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-123-partition-0", "latest", 5L).getData()), (String)"message1");
    }

    @Test
    public void testExamineMessageMetadata() throws Exception {
        TenantInfoImpl tenantInfo = new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"role1", "role2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.tenants().createTenant("tenant-xyz", (TenantInfo)tenantInfo);
        this.admin.namespaces().createNamespace("tenant-xyz/ns-abc", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        String topicName = "persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata";
        this.admin.topics().createPartitionedTopic("persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata", 2);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).producerName("testExamineMessageMetadataProducer").compressionType(CompressionType.LZ4).topic("persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata-partition-0").create();
        producer.newMessage().keyBytes("partition123".getBytes()).orderingKey(new byte[]{0}).replicationClusters((List)Lists.newArrayList((Object[])new String[]{"a", "b"})).sequenceId(112233L).value((Object)"data").send();
        MessageImpl message = (MessageImpl)this.admin.topics().examineMessage("persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata-partition-0", "earliest", 1L);
        Assert.assertEquals((long)112233L, (long)message.getSequenceId());
        Assert.assertEquals((byte[])new byte[]{0}, (byte[])message.getOrderingKey());
        Assert.assertEquals((byte[])"partition123".getBytes(), (byte[])message.getKeyBytes());
        Assert.assertTrue((boolean)message.hasBase64EncodedKey());
        Assert.assertEquals((Collection)Lists.newArrayList((Object[])new String[]{"a", "b"}), (Collection)message.getReplicateTo());
        Assert.assertEquals((String)producer.getProducerName(), (String)message.getProducerName());
        Assert.assertEquals((int)CompressionType.LZ4.ordinal(), (int)message.getMessageBuilder().getCompression().ordinal());
        Assert.assertEquals((String)"data", (String)new String(message.getData()));
    }

    @Test
    public void testOffloadWithNullMessageId() {
        String topicName = "topic-123";
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", "topic-123", true);
        try {
            this.persistentTopics.triggerOffload("my-tenant", "my-namespace", "topic-123", true, null);
            Assert.fail((String)"should have failed");
        }
        catch (RestException e) {
            Assert.assertEquals((int)e.getResponse().getStatus(), (int)Response.Status.BAD_REQUEST.getStatusCode());
        }
    }

    @Test
    public void testSetReplicatedSubscriptionStatus() {
        String topicName = "topic-with-repl-sub";
        String partitionName = "topic-with-repl-sub-partition-0";
        String subName = "sub";
        AsyncResponse response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(response, "my-tenant", "my-namespace", "topic-with-repl-sub", "sub", true, true);
        ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(RestException.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)10000L).times(1))).resume((Throwable)errorCaptor.capture());
        Assert.assertEquals((int)((RestException)((Object)errorCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.NOT_FOUND.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(response, "my-tenant", "my-namespace", "topic-with-repl-sub-partition-0", "sub", true, true);
        errorCaptor = ArgumentCaptor.forClass(RestException.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)10000L).times(1))).resume((Throwable)errorCaptor.capture());
        Assert.assertEquals((int)((RestException)((Object)errorCaptor.getValue())).getResponse().getStatus(), (int)Response.Status.NOT_FOUND.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class);
        this.persistentTopics.createPartitionedTopic(response, "my-tenant", "my-namespace", "topic-with-repl-sub", 2, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)10000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.createSubscription(response, "my-tenant", "my-namespace", "topic-with-repl-sub", "sub", true, (MessageIdImpl)MessageId.latest, false);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)10000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(response, "my-tenant", "my-namespace", "topic-with-repl-sub", "sub", true, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)10000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(response, "my-tenant", "my-namespace", "topic-with-repl-sub", "sub", true, false);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)10000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(response, "my-tenant", "my-namespace", "topic-with-repl-sub-partition-0", "sub", true, true);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)10000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.setReplicatedSubscriptionStatus(response, "my-tenant", "my-namespace", "topic-with-repl-sub-partition-0", "sub", true, false);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)10000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deleteSubscription(response, "my-tenant", "my-namespace", "topic-with-repl-sub", "sub", false, true);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)10000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
        response = (AsyncResponse)Mockito.mock(AsyncResponse.class);
        this.persistentTopics.deletePartitionedTopic(response, "my-tenant", "my-namespace", "topic-with-repl-sub", true, true, false);
        responseCaptor = ArgumentCaptor.forClass(Response.class);
        ((AsyncResponse)Mockito.verify((Object)response, (VerificationMode)Mockito.timeout((long)10000L).times(1))).resume(responseCaptor.capture());
        Assert.assertEquals((int)((Response)responseCaptor.getValue()).getStatus(), (int)Response.Status.NO_CONTENT.getStatusCode());
    }

    @Test
    public void testGetMessageById() throws Exception {
        TenantInfoImpl tenantInfo = new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"role1", "role2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.tenants().createTenant("tenant-xyz", (TenantInfo)tenantInfo);
        this.admin.namespaces().createNamespace("tenant-xyz/ns-abc", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
        String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2";
        this.admin.topics().createNonPartitionedTopic("persistent://tenant-xyz/ns-abc/testGetMessageById1");
        this.admin.topics().createNonPartitionedTopic("persistent://tenant-xyz/ns-abc/testGetMessageById2");
        ProducerBase producer1 = (ProducerBase)this.pulsarClient.newProducer().topic("persistent://tenant-xyz/ns-abc/testGetMessageById1").enableBatching(false).create();
        String data1 = "test1";
        MessageIdImpl id1 = (MessageIdImpl)producer1.send((Object)data1.getBytes());
        ProducerBase producer2 = (ProducerBase)this.pulsarClient.newProducer().topic("persistent://tenant-xyz/ns-abc/testGetMessageById2").enableBatching(false).create();
        String data2 = "test2";
        MessageIdImpl id2 = (MessageIdImpl)producer2.send((Object)data2.getBytes());
        Message message1 = this.admin.topics().getMessageById("persistent://tenant-xyz/ns-abc/testGetMessageById1", id1.getLedgerId(), id1.getEntryId());
        Assert.assertEquals((byte[])message1.getData(), (byte[])data1.getBytes());
        Message message2 = this.admin.topics().getMessageById("persistent://tenant-xyz/ns-abc/testGetMessageById2", id2.getLedgerId(), id2.getEntryId());
        Assert.assertEquals((byte[])message2.getData(), (byte[])data2.getBytes());
        Message message3 = null;
        try {
            message3 = this.admin.topics().getMessageById("persistent://tenant-xyz/ns-abc/testGetMessageById2", id1.getLedgerId(), id1.getEntryId());
            Assert.fail();
        }
        catch (Exception e) {
            Assert.assertNull(message3);
        }
        Message message4 = null;
        try {
            message4 = this.admin.topics().getMessageById("persistent://tenant-xyz/ns-abc/testGetMessageById1", id2.getLedgerId(), id2.getEntryId());
            Assert.fail();
        }
        catch (Exception e) {
            Assert.assertNull(message4);
        }
    }

    @Test
    public void testGetMessageIdByTimestamp() throws Exception {
        TenantInfoImpl tenantInfo = new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"role1", "role2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.tenants().createTenant("tenant-xyz", (TenantInfo)tenantInfo);
        this.admin.namespaces().createNamespace("tenant-xyz/ns-abc", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp";
        this.admin.topics().createNonPartitionedTopic("persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp");
        final AtomicLong publishTime = new AtomicLong(0L);
        ProducerBase producer = (ProducerBase)this.pulsarClient.newProducer().topic("persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp").enableBatching(false).intercept(new ProducerInterceptor[]{new ProducerInterceptor(){

            public void close() {
            }

            public boolean eligible(Message message) {
                return true;
            }

            public Message beforeSend(Producer producer, Message message) {
                return message;
            }

            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
                publishTime.set(message.getPublishTime());
            }
        }}).create();
        MessageId id1 = producer.send((Object)"test1".getBytes());
        long publish1 = publishTime.get();
        Thread.sleep(10L);
        MessageId id2 = producer.send((Object)"test2".getBytes());
        long publish2 = publishTime.get();
        Assert.assertTrue((publish1 < publish2 ? 1 : 0) != 0);
        Assert.assertEquals((Object)this.admin.topics().getMessageIdByTimestamp("persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp", publish1 - 1L), (Object)id1);
        Assert.assertEquals((Object)this.admin.topics().getMessageIdByTimestamp("persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp", publish1), (Object)id1);
        Assert.assertEquals((Object)this.admin.topics().getMessageIdByTimestamp("persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp", publish1 + 1L), (Object)id2);
        Assert.assertEquals((Object)this.admin.topics().getMessageIdByTimestamp("persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp", publish2), (Object)id2);
        Assert.assertTrue((this.admin.topics().getMessageIdByTimestamp("persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp", publish2 + 1L).compareTo((Object)id2) > 0 ? 1 : 0) != 0);
    }

    @Test
    public void testGetBatchMessageIdByTimestamp() throws Exception {
        TenantInfoImpl tenantInfo = new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"role1", "role2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.admin.tenants().createTenant("tenant-xyz", (TenantInfo)tenantInfo);
        this.admin.namespaces().createNamespace("tenant-xyz/ns-abc", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        String topicName = "persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp";
        this.admin.topics().createNonPartitionedTopic("persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp");
        final ConcurrentHashMap publishTimeMap = new ConcurrentHashMap();
        ProducerBase producer = (ProducerBase)this.pulsarClient.newProducer().topic("persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp").enableBatching(true).batchingMaxPublishDelay(1L, TimeUnit.MINUTES).batchingMaxMessages(2).intercept(new ProducerInterceptor[]{new ProducerInterceptor(){

            public void close() {
            }

            public boolean eligible(Message message) {
                return true;
            }

            public Message beforeSend(Producer producer, Message message) {
                return message;
            }

            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
                log.info("onSendAcknowledgement, message={}, msgId={},publish_time={},exception={}", new Object[]{message, msgId, message.getPublishTime(), exception});
                publishTimeMap.put(msgId, message.getPublishTime());
            }
        }}).create();
        ArrayList<CompletableFuture> idFutureList = new ArrayList<CompletableFuture>();
        for (int i = 0; i < 4; ++i) {
            idFutureList.add(producer.sendAsync((Object)new byte[]{(byte)i}));
            Thread.sleep(5L);
        }
        ArrayList<MessageIdImpl> ids = new ArrayList<MessageIdImpl>();
        for (CompletableFuture future : idFutureList) {
            MessageId id = (MessageId)future.get();
            ids.add((MessageIdImpl)id);
        }
        for (MessageIdImpl messageId : ids) {
            Assert.assertTrue((boolean)publishTimeMap.containsKey(messageId));
            log.info("MessageId={},PublishTime={}", (Object)messageId, publishTimeMap.get(messageId));
        }
        Assert.assertEquals((long)((MessageIdImpl)ids.get(0)).getLedgerId(), (long)((MessageIdImpl)ids.get(1)).getLedgerId());
        MessageIdImpl id1 = new MessageIdImpl(((MessageIdImpl)ids.get(0)).getLedgerId(), ((MessageIdImpl)ids.get(0)).getEntryId(), ((MessageIdImpl)ids.get(0)).getPartitionIndex());
        long publish1 = (Long)publishTimeMap.get(ids.get(0));
        Assert.assertEquals((long)((MessageIdImpl)ids.get(2)).getLedgerId(), (long)((MessageIdImpl)ids.get(3)).getLedgerId());
        MessageIdImpl id2 = new MessageIdImpl(((MessageIdImpl)ids.get(2)).getLedgerId(), ((MessageIdImpl)ids.get(2)).getEntryId(), ((MessageIdImpl)ids.get(2)).getPartitionIndex());
        long publish2 = (Long)publishTimeMap.get(ids.get(2));
        Assert.assertTrue((publish1 < publish2 ? 1 : 0) != 0);
        Assert.assertEquals((Object)this.admin.topics().getMessageIdByTimestamp("persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp", publish1 - 1L), (Object)id1);
        Assert.assertEquals((Object)this.admin.topics().getMessageIdByTimestamp("persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp", publish1), (Object)id1);
        Assert.assertEquals((Object)this.admin.topics().getMessageIdByTimestamp("persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp", publish1 + 1L), (Object)id2);
        Assert.assertEquals((Object)this.admin.topics().getMessageIdByTimestamp("persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp", publish2), (Object)id2);
        Assert.assertTrue((this.admin.topics().getMessageIdByTimestamp("persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp", publish2 + 1L).compareTo((Object)id2) > 0 ? 1 : 0) != 0);
    }

    @Test
    public void testDeleteTopic() throws Exception {
        String topicName = "topic-1";
        BrokerService brokerService = (BrokerService)Mockito.spy((Object)this.pulsar.getBrokerService());
        ((PulsarService)Mockito.doReturn((Object)brokerService).when((Object)this.pulsar)).getBrokerService();
        this.persistentTopics.createNonPartitionedTopic("my-tenant", "my-namespace", "topic-1", false);
        CompletableFuture deleteTopicFuture = new CompletableFuture();
        deleteTopicFuture.completeExceptionally((Throwable)new MetadataStoreException.NotFoundException());
        ((BrokerService)Mockito.doReturn(deleteTopicFuture).when((Object)brokerService)).deleteTopic(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        this.persistentTopics.deleteTopic("my-tenant", "my-namespace", "topic-1", true, true, true);
        CompletableFuture deleteTopicFuture2 = new CompletableFuture();
        deleteTopicFuture2.completeExceptionally((Throwable)new MetadataStoreException("test exception"));
        ((BrokerService)Mockito.doReturn(deleteTopicFuture2).when((Object)brokerService)).deleteTopic(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        try {
            this.persistentTopics.deleteTopic("my-tenant", "my-namespace", "topic-1", true, true, true);
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof RestException));
        }
        CompletableFuture deleteTopicFuture3 = new CompletableFuture();
        deleteTopicFuture3.completeExceptionally((Throwable)new MetadataStoreException.NotFoundException());
        ((BrokerService)Mockito.doReturn(deleteTopicFuture3).when((Object)brokerService)).deleteTopic(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        try {
            this.persistentTopics.deleteTopic("my-tenant", "my-namespace", "topic-1", false, true, true);
        }
        catch (RestException e) {
            Assert.assertEquals((int)e.getResponse().getStatus(), (int)404);
        }
    }

    @Test
    public void testResetCursorReturnTimeoutWhenZKTimeout() {
        String topic = "persistent://my-tenant/my-namespace/topic-2";
        BrokerService brokerService = (BrokerService)Mockito.spy((Object)this.pulsar.getBrokerService());
        ((PulsarService)Mockito.doReturn((Object)brokerService).when((Object)this.pulsar)).getBrokerService();
        CompletableFuture completableFuture = new CompletableFuture();
        ((BrokerService)Mockito.doReturn(completableFuture).when((Object)brokerService)).getTopicIfExists(topic);
        try {
            this.admin.topics().resetCursor(topic, "my-sub", System.currentTimeMillis());
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            String errorMsg = (String)((InternalServerErrorException)e.getCause()).getResponse().readEntity(String.class);
            Assert.assertTrue((boolean)errorMsg.contains("TimeoutException"));
        }
    }
}

