/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.util.Timeout;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiMessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker-impl"})
public class TopicsConsumerImplTest
extends ProducerConsumerBase {
    /**
     * Per-test timeout in milliseconds.
     * NOTE(review): the {@code @Test(timeOut=...)} annotations visible in this file carry the literal
     * {@code 90000L} rather than referencing this constant - presumably inlined at compile time; confirm.
     */
    private static final long testTimeout = 90000L;
    /** Logger used by the tests in this class. */
    private static final Logger log = LoggerFactory.getLogger(TopicsConsumerImplTest.class);
    /** Ack timeout handed to the consumer builders in these tests: two seconds, expressed in milliseconds. */
    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2L);

    /**
     * Per-test fixture setup: delegates to the base class's {@code internalSetup()} and then
     * {@code producerBaseSetup()}, in that order.
     *
     * @throws Exception if either base-class step fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Per-test fixture teardown: delegates to the base class's {@code internalCleanup()}.
     * Declared with {@code alwaysRun=true} on the {@code @AfterMethod} annotation.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Subscribes a single consumer to three topics that each live in a different namespace
     * ({@code ns-abc1}, {@code ns-abc2}, {@code ns-abc3}).
     *
     * <p>Two outcomes are accepted: the subscribe call succeeds and yields a
     * {@link MultiTopicsConsumerImpl}, or it is rejected with an {@link IllegalArgumentException}.
     * Any consumer that was created is closed before the test returns so it does not outlive the test.
     *
     * @throws Exception on any admin or client failure other than the tolerated rejection
     */
    @Test(timeOut=90000L)
    public void testDifferentTopicsNameSubscribe() throws Exception {
        String key = "TopicsFromDifferentNamespace";
        String subscriptionName = "my-ex-subscription-" + key;
        String topicName1 = "persistent://prop/use/ns-abc1/topic-1-" + key;
        String topicName2 = "persistent://prop/use/ns-abc2/topic-2-" + key;
        String topicName3 = "persistent://prop/use/ns-abc3/topic-3-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        Consumer<byte[]> consumer = null;
        try {
            consumer = this.pulsarClient.newConsumer()
                    .topics(topicNames)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        }
        catch (IllegalArgumentException ignored) {
            // Deliberately tolerated: a client may refuse topics spread over several namespaces.
        }
        finally {
            // Release the consumer when the subscribe call did succeed.
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /**
     * Subscribes to one non-partitioned topic plus two partitioned topics (2 and 3 partitions) and
     * checks that the multi-topics consumer exposes six partitions, that the i-th internal consumer
     * is bound to the i-th partition name, and that two partitioned topics are tracked.
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut=90000L)
    public void testGetConsumersAndGetTopics() throws Exception {
        final String key = "TopicsConsumerGet";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        final List<String> topicNames = Lists.newArrayList(topicName1, topicName2);

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        // topics(...) and topic(...) are combined: the third topic is added on top of the list.
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .topic(topicName3)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        Assert.assertTrue(consumer.getTopic().startsWith("MultiTopicsConsumer-"));

        MultiTopicsConsumerImpl<byte[]> multiConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
        List<String> partitionNames = multiConsumer.getPartitions();
        List<ConsumerImpl<byte[]>> internalConsumers = multiConsumer.getConsumers();
        for (String partitionName : partitionNames) {
            log.info("topic: {}", partitionName);
        }
        for (ConsumerImpl<byte[]> internal : internalConsumers) {
            log.info("consumer: {}", internal.getTopic());
        }

        // 1 (non-partitioned) + 2 + 3 partitions = 6 entries, compared position by position.
        for (int i = 0; i < 6; i++) {
            Assert.assertEquals(internalConsumers.get(i).getTopic(), partitionNames.get(i));
        }
        Assert.assertEquals(multiConsumer.getPartitionedTopics().size(), 2);

        consumer.unsubscribe();
        consumer.close();
    }

    /**
     * Publishes 10 messages synchronously to each of three topics (one non-partitioned, two
     * partitioned) and verifies that a single multi-topics consumer receives and acknowledges
     * all 30 of them.
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut=90000L)
    public void testSyncProducerAndConsumer() throws Exception {
        final String key = "TopicsConsumerSyncTest";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 30;
        final int messagesPerProducer = totalMessages / 3;
        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        final List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = this.pulsarClient.newProducer()
                .topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);

        for (int round = 0; round < messagesPerProducer; round++) {
            producer1.send((messagePredicate + "producer1-" + round).getBytes());
            producer2.send((messagePredicate + "producer2-" + round).getBytes());
            producer3.send((messagePredicate + "producer3-" + round).getBytes());
        }

        // Block for the first message, then keep polling until a 500 ms poll comes back empty.
        int ackedCount = 0;
        Message<byte[]> current = consumer.receive();
        do {
            Assert.assertTrue(current instanceof TopicMessageImpl);
            ackedCount++;
            consumer.acknowledge(current);
            log.debug("Consumer acknowledged : " + new String(current.getData()));
            current = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (current != null);
        Assert.assertEquals(ackedCount, totalMessages);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Publishes 30 messages asynchronously (10 per producer) and consumes them through
     * {@code receiveAsync()}, acknowledging each one and counting down a latch per message.
     *
     * <p>The 30 receive requests are issued from a single-threaded executor; the test thread waits
     * on the latch. A failure inside a receive callback is logged by the {@code exceptionally}
     * stage and does not count the latch down, so in that case the test can only end through the
     * {@code timeOut} on the annotation.
     *
     * @throws Exception on any admin or client failure, or if a publish future fails
     */
    @Test(timeOut=90000L)
    public void testAsyncConsumer() throws Exception {
        final String key = "TopicsConsumerAsyncTest";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 30;
        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        final List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = this.pulsarClient.newProducer()
                .topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        final Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);

        // Fire all publishes first, then wait for every one of them to complete.
        List<Future<?>> futures = Lists.newArrayList();
        for (int i = 0; i < totalMessages / 3; ++i) {
            futures.add(producer1.sendAsync((messagePredicate + "producer1-" + i).getBytes()));
            futures.add(producer2.sendAsync((messagePredicate + "producer2-" + i).getBytes()));
            futures.add(producer3.sendAsync((messagePredicate + "producer3-" + i).getBytes()));
        }
        log.info("Waiting for async publish to complete : {}", futures.size());
        for (Future<?> future : futures) {
            future.get();
        }

        log.info("start async consume");
        final CountDownLatch latch = new CountDownLatch(totalMessages);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            executor.execute(() -> IntStream.range(0, totalMessages).forEach(index ->
                    consumer.receiveAsync().thenAccept(msg -> {
                        Assert.assertTrue(msg instanceof TopicMessageImpl);
                        try {
                            consumer.acknowledge(msg);
                        }
                        catch (PulsarClientException e1) {
                            Assert.fail("message acknowledge failed", e1);
                        }
                        latch.countDown();
                        log.info("receive index: {}, latch countDown: {}", index, latch.getCount());
                    }).exceptionally(ex -> {
                        // Trailing throwable argument makes the logger emit the stack trace.
                        log.warn("receive index: {}, failed receive message {}", index, ex.getMessage(), ex);
                        return null;
                    })));
            latch.await();
            log.info("success latch wait");
            consumer.unsubscribe();
            consumer.close();
            producer1.close();
            producer2.close();
            producer3.close();
        }
        finally {
            // The executor is always non-null here; stop it whether or not the test body succeeded.
            executor.shutdownNow();
        }
    }

    /**
     * Exercises the unacked-message tracker of a multi-topics consumer over three publish rounds
     * of 30 messages each (10 per producer):
     * <ol>
     *   <li>round 1 is received without acks (tracker size 30), then received again and acked
     *       (tracker size 0, 30 distinct payloads);</li>
     *   <li>round 2 is received and acked immediately (tracker size 0);</li>
     *   <li>round 3 is received without acks (tracker size 30), and after waiting one ack timeout
     *       is received again and acked (30 messages, tracker size 0).</li>
     * </ol>
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut=90000L)
    public void testConsumerUnackedRedelivery() throws Exception {
        final String key = "TopicsConsumerRedeliveryTest";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 30;
        final int perProducer = totalMessages / 3;
        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        final List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = this.pulsarClient.newProducer()
                .topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        MultiTopicsConsumerImpl<byte[]> multiConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

        // ---- Round 1: publish, receive everything WITHOUT acknowledging.
        for (int i = 0; i < perProducer; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
        }
        for (Message<byte[]> msg = consumer.receive(); msg != null;
                msg = consumer.receive(500, TimeUnit.MILLISECONDS)) {
            Assert.assertTrue(msg instanceof TopicMessageImpl);
            log.debug("Consumer received : " + new String(msg.getData()));
        }
        long unacked = multiConsumer.getUnAckedMessageTracker().size();
        log.debug(key + " Unacked Message Tracker size is " + unacked);
        Assert.assertEquals(unacked, (long) totalMessages);

        // ---- Round 1 again: the same messages come back; this time each one is acknowledged.
        Message<byte[]> redeliveredMsg = consumer.receive();
        HashSet<String> distinctPayloads = new HashSet<String>();
        do {
            Assert.assertTrue(redeliveredMsg instanceof TopicMessageImpl);
            distinctPayloads.add(new String(redeliveredMsg.getData()));
            consumer.acknowledge(redeliveredMsg);
            log.debug("Consumer acknowledged : " + new String(redeliveredMsg.getData()));
            redeliveredMsg = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (redeliveredMsg != null);
        unacked = multiConsumer.getUnAckedMessageTracker().size();
        log.debug(key + " Unacked Message Tracker size is " + unacked);
        Assert.assertEquals(unacked, 0L);
        Assert.assertEquals(distinctPayloads.size(), totalMessages);

        // ---- Round 2: publish, receive and acknowledge right away.
        for (int i = 0; i < perProducer; i++) {
            producer1.send((messagePredicate + "producer1-round2" + i).getBytes());
            producer2.send((messagePredicate + "producer2-round2" + i).getBytes());
            producer3.send((messagePredicate + "producer3-round2" + i).getBytes());
        }
        int received = 0;
        for (Message<byte[]> msg = consumer.receive(); msg != null;
                msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            Assert.assertTrue(msg instanceof TopicMessageImpl);
            received++;
            log.debug("Consumer received : " + new String(msg.getData()));
            consumer.acknowledge(msg);
        }
        unacked = multiConsumer.getUnAckedMessageTracker().size();
        log.debug(key + " Unacked Message Tracker size is " + unacked);
        Assert.assertEquals(unacked, 0L);
        Assert.assertEquals(received, totalMessages);

        // ---- Round 3: wait one ack timeout, publish, receive WITHOUT acknowledging.
        Thread.sleep(this.ackTimeOutMillis);
        for (int i = 0; i < perProducer; i++) {
            producer1.send((messagePredicate + "producer1-round3" + i).getBytes());
            producer2.send((messagePredicate + "producer2-round3" + i).getBytes());
            producer3.send((messagePredicate + "producer3-round3" + i).getBytes());
        }
        for (Message<byte[]> msg = consumer.receive(); msg != null;
                msg = consumer.receive(100, TimeUnit.MILLISECONDS)) {
            log.debug("Consumer received : " + new String(msg.getData()));
        }
        unacked = multiConsumer.getUnAckedMessageTracker().size();
        log.debug(key + " Unacked Message Tracker size is " + unacked);
        Assert.assertEquals(unacked, (long) totalMessages);

        // ---- Round 3 again: after another ack timeout the messages are received and acked.
        Thread.sleep(this.ackTimeOutMillis);
        int redelivered = 0;
        for (Message<byte[]> msg = consumer.receive(); msg != null;
                msg = consumer.receive(2000, TimeUnit.MILLISECONDS)) {
            Assert.assertTrue(msg instanceof TopicMessageImpl);
            redelivered++;
            log.debug("Consumer received : " + new String(msg.getData()));
            consumer.acknowledge(msg);
        }
        Assert.assertEquals(redelivered, totalMessages);
        unacked = multiConsumer.getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + unacked);
        Assert.assertEquals(unacked, 0L);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Verifies the two rejection paths of {@code MultiTopicsConsumerImpl.subscribeAsync(String, int)}
     * on a consumer already subscribed to a 3-partition topic:
     * <ul>
     *   <li>the name {@code "ns-abc/testTopicNameValid"} fails with "Topic name not valid";</li>
     *   <li>the already-subscribed topic fails with "Already subscribed to ...".</li>
     * </ul>
     * Both failures are expected to surface as {@link PulsarClientException.AlreadyClosedException}.
     * The consumer is closed in a {@code finally} block so it is released even when a check fails.
     *
     * @throws Exception on any admin or client failure, or when one of the checks fails
     */
    @Test
    public void testTopicNameValid() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testTopicNameValid";
        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName, 3);

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("subscriptionName")
                .subscribe();
        try {
            MultiTopicsConsumerImpl<byte[]> multiConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

            // Assertions run inside handle(); a failing assertion fails the returned future,
            // which get() then rethrows to the test thread.
            multiConsumer.subscribeAsync("ns-abc/testTopicNameValid", 5).handle((res, exception) -> {
                Assert.assertTrue(exception instanceof PulsarClientException.AlreadyClosedException);
                Assert.assertEquals(exception.getMessage(), "Topic name not valid");
                return null;
            }).get();

            multiConsumer.subscribeAsync(topicName, 3).handle((res, exception) -> {
                Assert.assertTrue(exception instanceof PulsarClientException.AlreadyClosedException);
                Assert.assertEquals(exception.getMessage(), "Already subscribed to " + topicName);
                return null;
            }).get();
        }
        finally {
            consumer.close();
        }
    }

    /**
     * Checks that a single topic can be removed from and re-added to a live multi-topics consumer:
     * <ol>
     *   <li>with all three topics subscribed, 30 published messages are received;</li>
     *   <li>after {@code unsubscribeAsync(topicName3)}, only 20 of the next 30 are received and the
     *       consumer reports 3 partitions, 3 internal consumers and 1 partitioned topic;</li>
     *   <li>after {@code subscribeAsync(topicName3, true)}, 30 messages are received again and the
     *       consumer reports 6 partitions, 6 internal consumers and 2 partitioned topics.</li>
     * </ol>
     *
     * @throws Exception on any admin or client failure
     */
    @Test
    public void testSubscribeUnsubscribeSingleTopic() throws Exception {
        final String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 30;
        final int perProducer = totalMessages / 3;
        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        final List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = this.pulsarClient.newProducer()
                .topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        MultiTopicsConsumerImpl<byte[]> multiConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

        // ---- Phase 1: all three topics subscribed.
        for (int i = 0; i < perProducer; i++) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
            producer2.send((messagePredicate + "producer2-" + i).getBytes());
            producer3.send((messagePredicate + "producer3-" + i).getBytes());
        }
        int ackedCount = 0;
        Message<byte[]> current = consumer.receive();
        do {
            Assert.assertTrue(current instanceof TopicMessageImpl);
            ackedCount++;
            consumer.acknowledge(current);
            log.debug("Consumer acknowledged : " + new String(current.getData()));
            current = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (current != null);
        Assert.assertEquals(ackedCount, totalMessages);

        // ---- Phase 2: drop topic 3; its 10 messages must no longer arrive.
        multiConsumer.unsubscribeAsync(topicName3).get();
        for (int i = 0; i < perProducer; i++) {
            producer1.send((messagePredicate + "producer1-round2" + i).getBytes());
            producer2.send((messagePredicate + "producer2-round2" + i).getBytes());
            producer3.send((messagePredicate + "producer3-round2" + i).getBytes());
        }
        ackedCount = 0;
        current = consumer.receive();
        do {
            Assert.assertTrue(current instanceof TopicMessageImpl);
            ackedCount++;
            consumer.acknowledge(current);
            log.debug("Consumer acknowledged : " + new String(current.getData()));
            current = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (current != null);
        Assert.assertEquals(ackedCount, 20);

        // topic 1 (1 partition) + topic 2 (2 partitions) remain.
        List<String> partitionNames = multiConsumer.getPartitions();
        List<?> internalConsumers = multiConsumer.getConsumers();
        Assert.assertEquals(partitionNames.size(), 3);
        Assert.assertEquals(internalConsumers.size(), 3);
        Assert.assertEquals(multiConsumer.getPartitionedTopics().size(), 1);

        // ---- Phase 3: re-add topic 3; all 30 messages arrive again.
        multiConsumer.subscribeAsync(topicName3, true).get();
        for (int i = 0; i < perProducer; i++) {
            producer1.send((messagePredicate + "producer1-round3" + i).getBytes());
            producer2.send((messagePredicate + "producer2-round3" + i).getBytes());
            producer3.send((messagePredicate + "producer3-round3" + i).getBytes());
        }
        ackedCount = 0;
        current = consumer.receive();
        do {
            Assert.assertTrue(current instanceof TopicMessageImpl);
            ackedCount++;
            consumer.acknowledge(current);
            log.debug("Consumer acknowledged : " + new String(current.getData()));
            current = consumer.receive(500, TimeUnit.MILLISECONDS);
        } while (current != null);
        Assert.assertEquals(ackedCount, totalMessages);

        partitionNames = multiConsumer.getPartitions();
        internalConsumers = multiConsumer.getConsumers();
        Assert.assertEquals(partitionNames.size(), 6);
        Assert.assertEquals(internalConsumers.size(), 6);
        Assert.assertEquals(multiConsumer.getPartitionedTopics().size(), 2);

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Subscribes to a 2-partition topic by its short name and then verifies that subscribing
     * again is rejected for three spellings of a name already covered by the consumer: the
     * namespace-qualified name, the fully-qualified name with domain, and the name of partition 0.
     * Each attempt must fail with {@link PulsarClientException.AlreadyClosedException} carrying
     * the message {@code "Already subscribed to <name as given>"}.
     *
     * @throws Exception on any admin or client failure, or when one of the checks fails
     */
    @Test
    public void testResubscribeSameTopic() throws Exception {
        final String localTopicName = "TopicsConsumerResubscribeSameTopicTest";
        final String localPartitionName = localTopicName + "-partition-0";
        final String topicNameWithNamespace = "public/default/" + localTopicName;
        final String topicNameWithDomain = "persistent://" + topicNameWithNamespace;

        this.admin.topics().createPartitionedTopic(localTopicName, 2);
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(localTopicName)
                .subscriptionName("SubscriptionName")
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

        // Same check for every spelling, in the original order.
        String[] alreadyCovered = {topicNameWithNamespace, topicNameWithDomain, localPartitionName};
        for (String duplicateName : alreadyCovered) {
            multiTopicsConsumer.subscribeAsync(duplicateName, false).handle((res, exception) -> {
                Assert.assertTrue(exception instanceof PulsarClientException.AlreadyClosedException);
                Assert.assertEquals(exception.getMessage(), "Already subscribed to " + duplicateName);
                return null;
            }).get();
        }

        consumer.unsubscribe();
        consumer.close();
    }

    /**
     * Verifies that building a consumer without any topic fails in each of four ways of
     * "not giving a topic": no topic call at all (expects {@link PulsarClientException}),
     * an empty varargs {@code topic()}, {@code topics(null)}, and an empty topic list
     * (the last three expect {@link IllegalArgumentException}).
     *
     * @throws Exception on any admin failure or on an unexpected exception type
     */
    @Test(timeOut=90000L)
    public void testTopicsNameSubscribeWithBuilderFail() throws Exception {
        final String key = "TopicsNameSubscribeWithBuilder";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        // Case 1: no topic configured at all.
        try {
            this.pulsarClient.newConsumer()
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe1 with no topicName should fail.");
        }
        catch (PulsarClientException expected) {
            // expected: nothing to subscribe to
        }

        // Case 2: empty varargs.
        try {
            this.pulsarClient.newConsumer()
                    .topic()
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe2 with no topicName should fail.");
        }
        catch (IllegalArgumentException expected) {
            // expected: empty topic array is rejected
        }

        // Case 3: null list.
        try {
            this.pulsarClient.newConsumer()
                    .topics(null)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe3 with no topicName should fail.");
        }
        catch (IllegalArgumentException expected) {
            // expected: null topic list is rejected
        }

        // Case 4: empty list.
        try {
            List<String> noTopics = Lists.newArrayList();
            this.pulsarClient.newConsumer()
                    .topics(noTopics)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
            Assert.fail("subscribe4 with no topicName should fail.");
        }
        catch (IllegalArgumentException expected) {
            // expected: empty topic list is rejected
        }
    }

    /**
     * Verifies that a {@code MessageListener} attached to a multi-topics consumer keeps being
     * invoked for messages it never acknowledges.
     *
     * <p>Only {@code totalMessages} (6) messages are published, but the latch starts at three
     * times that number and the listener never acks. The latch can therefore only reach zero
     * if the same messages are handed to the listener more than once - presumably through
     * redelivery after the 1000 ms ack timeout configured on the consumer.
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut=30000L)
    public void testMultiTopicsMessageListener() throws Exception {
        final String key = "MultiTopicsMessageListenerTest";
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 6;
        // Deliberately larger than the number of published messages (see method comment).
        final CountDownLatch latch = new CountDownLatch(totalMessages * 3);
        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        final List<String> topicNames = Lists.newArrayList(topicName1);

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName1, 2);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1000L, TimeUnit.MILLISECONDS)
                .receiverQueueSize(100)
                .messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData());
                    latch.countDown();
                    log.info("Received message [{}] in the listener, latch: {}", receivedMessage, latch.getCount());
                })
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);

        for (int i = 0; i < totalMessages; ++i) {
            producer1.send((messagePredicate + "producer1-" + i).getBytes());
        }

        // Bounded by the timeOut on the @Test annotation.
        latch.await();

        consumer.close();
        // Release the producer as the other tests in this class do.
        producer1.close();
    }

    /**
     * Checks that a multi-topics consumer created with {@code autoUpdatePartitions(true)} starts
     * consuming from a partition added after subscription.
     *
     * <p>Both topics are grown from 2 to 3 partitions, the consumer's partition auto-update timer
     * task is run by hand, and messages are then published directly to {@code -partition-2} of
     * each topic. The consumer must receive all of them.
     *
     * @throws Exception if topic setup, publishing or consuming fails
     */
    @Test(timeOut=30000L)
    public void testTopicAutoUpdatePartitions() throws Exception {
        String key = "TestTopicAutoUpdatePartitions";
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 6;
        String topicName1 = "persistent://my-property/my-ns/topic-1-" + key;
        String topicName2 = "persistent://my-property/my-ns/topic-2-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2);

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName1, 2);
        this.admin.topics().createPartitionedTopic(topicName2, 2);

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .autoUpdatePartitions(true)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
        MultiTopicsConsumerImpl<byte[]> topicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;

        this.admin.topics().updatePartitionedTopic(topicName1, 3);
        this.admin.topics().updatePartitionedTopic(topicName2, 3);

        // Run the timer task directly instead of waiting for its scheduled period.
        log.info("trigger partitionsAutoUpdateTimerTask");
        Timeout timeout = topicsConsumer.getPartitionsAutoUpdateTimeout();
        timeout.task().run(timeout);
        // Short pause, presumably to let the update triggered above complete.
        Thread.sleep(200L);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1 + "-partition-2")
                .enableBatching(false)
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName2 + "-partition-2")
                .enableBatching(false)
                .create();
        for (int i = 0; i < totalMessages; ++i) {
            producer1.send((messagePredicate + "topic1-partition-2 index:" + i).getBytes());
            producer2.send((messagePredicate + "topic2-partition-2 index:" + i).getBytes());
            log.info("produce message to partition-2 again. messageindex: {}", i);
        }

        // The first receive blocks; later ones give up after 200 ms of silence.
        int messageSet = 0;
        Message<byte[]> message = consumer.receive();
        do {
            ++messageSet;
            consumer.acknowledge(message);
            log.info("4 Consumer acknowledged : " + new String(message.getData()));
        } while ((message = consumer.receive(200, TimeUnit.MILLISECONDS)) != null);
        Assert.assertEquals(messageSet, 2 * totalMessages);

        consumer.close();
        // Release the producers as well, matching the cleanup done by the sibling tests.
        producer1.close();
        producer2.close();
    }

    /**
     * Checks how a Failover subscription distributes partitions between consumers when the
     * partition count of a topic grows.
     *
     * <p>Flow: a single consumer drains a 2-partition topic; the topic is grown to 4 partitions and
     * the consumer's partition auto-update task is run by hand; the same consumer drains again; a
     * second consumer then joins, after which the per-partition stats must show each consumer as
     * the active one on exactly two partitions and each must receive half of the next batch.
     *
     * @throws Exception if topic setup, publishing, consuming or stats lookup fails
     */
    @Test(timeOut=90000L)
    public void testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions() throws Exception {
        String topicName = "persistent://my-property/my-ns/testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions";
        String subName = "failover-test";
        int messages = 20;

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName, 2);
        Assert.assertEquals(this.admin.topics().getPartitionedTopicMetadata(topicName).partitions, 2);

        Consumer<String> firstConsumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionType(SubscriptionType.Failover)
                .subscriptionName(subName)
                .subscribe();
        Assert.assertTrue(firstConsumer instanceof MultiTopicsConsumerImpl);
        MultiTopicsConsumerImpl<String> firstImpl = (MultiTopicsConsumerImpl<String>) firstConsumer;
        Assert.assertEquals(firstImpl.allTopicPartitionsNumber.get(), 2);

        // Route by numeric key so consecutive keys are spread over all partitions.
        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic(topicName)
                .messageRouter(new MessageRouter(){

                    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                        return Integer.parseInt(msg.getKey()) % metadata.numPartitions();
                    }
                })
                .create();

        // Phase 1: one consumer, two partitions.
        publishKeyedMessages(producer, messages);
        receiveAndAckCumulatively(firstConsumer, messages);

        // Phase 2: grow to four partitions and force the consumer to pick them up.
        this.admin.topics().updatePartitionedTopic(topicName, 4);
        log.info("trigger partitionsAutoUpdateTimerTask");
        Timeout timeout = firstImpl.getPartitionsAutoUpdateTimeout();
        timeout.task().run(timeout);
        // Short pause, presumably to let the update triggered above complete.
        Thread.sleep(200L);
        Assert.assertEquals(firstImpl.allTopicPartitionsNumber.get(), 4);

        publishKeyedMessages(producer, messages);
        receiveAndAckCumulatively(firstConsumer, messages);

        // Phase 3: a second consumer joins the same Failover subscription.
        Consumer<String> secondConsumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionType(SubscriptionType.Failover)
                .subscriptionName(subName)
                .subscribe();
        Assert.assertTrue(secondConsumer instanceof MultiTopicsConsumerImpl);
        Assert.assertEquals(firstImpl.allTopicPartitionsNumber.get(), 4);
        // The original only re-checked the first consumer here; the newly created consumer is the
        // one whose partition count had not been verified yet.
        Assert.assertEquals(((MultiTopicsConsumerImpl<String>) secondConsumer).allTopicPartitionsNumber.get(), 4);

        publishKeyedMessages(producer, messages);

        // Count, per consumer name, the partitions on which it is the active consumer.
        Map<String, AtomicInteger> activePartitionsByConsumer = new HashMap<>();
        PartitionedTopicStats stats = this.admin.topics().getPartitionedStats(topicName, true);
        for (TopicStats partitionStats : stats.getPartitions().values()) {
            for (SubscriptionStats subscriptionStats : partitionStats.getSubscriptions().values()) {
                String activeName = subscriptionStats.getActiveConsumerName();
                Assert.assertTrue(activeName.equals(firstConsumer.getConsumerName())
                        || activeName.equals(secondConsumer.getConsumerName()));
                activePartitionsByConsumer.computeIfAbsent(activeName, name -> new AtomicInteger()).incrementAndGet();
            }
        }
        // getOrDefault turns "consumer owns no partition" into an assertion failure, not an NPE.
        Assert.assertEquals(
                activePartitionsByConsumer.getOrDefault(firstConsumer.getConsumerName(), new AtomicInteger()).get(), 2);
        Assert.assertEquals(
                activePartitionsByConsumer.getOrDefault(secondConsumer.getConsumerName(), new AtomicInteger()).get(), 2);

        // With two of four partitions each, every consumer gets half of the last batch.
        receiveAndAckCumulatively(firstConsumer, messages / 2);
        receiveAndAckCumulatively(secondConsumer, messages / 2);
    }

    /**
     * Publishes {@code count} messages whose keys are {@code "0"} to {@code count - 1} and whose
     * values are {@code "message - <key>"}.
     */
    private static void publishKeyedMessages(Producer<String> producer, int count) throws PulsarClientException {
        for (int i = 0; i < count; ++i) {
            producer.newMessage().key(String.valueOf(i)).value("message - " + i).send();
        }
    }

    /**
     * Blocks until {@code count} messages have been received, then cumulatively acknowledges the
     * last one received.
     */
    private static void receiveAndAckCumulatively(Consumer<String> consumer, int count) throws PulsarClientException {
        Message<String> last = null;
        for (int i = 0; i < count; ++i) {
            last = consumer.receive();
        }
        if (last != null) {
            consumer.acknowledgeCumulative(last);
        }
    }

    /**
     * Checks that the broker-level default message TTL expires a subscription's backlog.
     *
     * <p>With a default TTL of 5 seconds, an expiry check run about 4 seconds after publishing must
     * leave all 10 messages in the backlog, while a check run about 3 seconds later must clear it.
     *
     * @throws Exception if cluster/tenant/namespace setup, publishing or topic lookup fails
     */
    @Test(timeOut=90000L)
    public void testDefaultBacklogTTL() throws Exception {
        int defaultTTLSec = 5;
        int totalMessages = 10;
        String namespace = "prop/use/expiry";
        String topicName = "persistent://prop/use/expiry/expiry";
        String subName = "expiredSub";

        this.conf.setTtlDurationDefaultInSeconds(defaultTTLSec);
        this.admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(this.brokerUrl.toString()).build());
        this.admin.tenants().createTenant("prop", new TenantInfoImpl(null, Sets.newHashSet("use")));
        this.admin.namespaces().createNamespace(namespace);

        // Subscribe and close straight away: the subscription exists but nothing consumes from it.
        this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscribe()
                .close();

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
        for (int i = 0; i < totalMessages; ++i) {
            producer.send(("" + i).getBytes());
        }

        Optional<Topic> topicRef = this.pulsar.getBrokerService().getTopic(topicName, false).get();
        Assert.assertTrue(topicRef.isPresent());
        Topic brokerTopic = topicRef.get();
        PersistentSubscription subscription = (PersistentSubscription) brokerTopic.getSubscription(subName);

        // One second short of the TTL: nothing may have expired yet.
        Thread.sleep((defaultTTLSec - 1) * 1000);
        brokerTopic.checkMessageExpiry();
        Thread.sleep(1000L);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), (long) totalMessages);

        // Well past the TTL: the expiry check must eventually empty the backlog.
        Thread.sleep(2000L);
        brokerTopic.checkMessageExpiry();
        TopicsConsumerImplTest.retryStrategically(test -> subscription.getNumberOfEntriesInBacklog(false) == 0L, 5, 200L);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0L);
    }

    /**
     * Checks {@code getLastMessageId()} on a multi-topics consumer spanning one topic that is not
     * explicitly partitioned, one 2-partition topic and one 3-partition topic.
     *
     * <p>Two rounds of 30 messages per topic are published. After each round the returned id must
     * be a {@link MultiMessageIdImpl} with six entries, and each entry's {@code entryId} must match
     * the number of messages that landed on that partition so far.
     *
     * @throws Exception if topic setup, publishing or the last-message-id lookup fails
     */
    @Test(timeOut=90000L)
    public void testGetLastMessageId() throws Exception {
        String key = "TopicGetLastMessageId";
        String subscriptionName = "my-ex-subscription-" + key;
        String messagePredicate = "my-message-" + key + "-";
        int totalMessages = 30;
        String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
        String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
        String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

        TenantInfoImpl tenantInfo = this.createDefaultTenantInfo();
        this.admin.tenants().createTenant("prop", tenantInfo);
        this.admin.topics().createPartitionedTopic(topicName2, 2);
        this.admin.topics().createPartitionedTopic(topicName3, 3);

        Producer<byte[]> producer1 = this.pulsarClient.newProducer()
                .topic(topicName1)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer()
                .topic(topicName2)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Producer<byte[]> producer3 = this.pulsarClient.newProducer()
                .topic(topicName3)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topicNames)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .receiverQueueSize(4)
                .subscribe();
        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);

        for (int round = 1; round <= 2; ++round) {
            for (int i = 0; i < totalMessages; ++i) {
                producer1.send((messagePredicate + "producer1-" + i).getBytes());
                producer2.send((messagePredicate + "producer2-" + i).getBytes());
                producer3.send((messagePredicate + "producer3-" + i).getBytes());
            }

            MessageId lastId = consumer.getLastMessageId();
            Assert.assertTrue(lastId instanceof MultiMessageIdImpl);
            Map<String, MessageId> idsByPartition = ((MultiMessageIdImpl) lastId).getMap();
            // 1 + 2 + 3 underlying partitions.
            Assert.assertEquals(idsByPartition.size(), 6);

            // Messages published per topic so far; entry ids are zero-based.
            long sentPerTopic = (long) round * totalMessages;
            idsByPartition.forEach((partition, id) -> {
                log.info("topic: {}, messageId:{} ", partition, id.toString());
                Assert.assertTrue(id instanceof MessageIdImpl);
                long entryId = ((MessageIdImpl) id).entryId;
                if (partition.contains(topicName1)) {
                    // Single partition: it holds every message.
                    Assert.assertEquals(entryId, sentPerTopic - 1L);
                } else if (partition.contains(topicName2)) {
                    // Round-robin over two partitions: half each.
                    Assert.assertEquals(entryId, sentPerTopic / 2L - 1L);
                } else {
                    // Round-robin over three partitions: a third each.
                    Assert.assertEquals(entryId, sentPerTopic / 3L - 1L);
                }
            });
        }

        consumer.unsubscribe();
        consumer.close();
        producer1.close();
        producer2.close();
        producer3.close();
    }

    /**
     * Checks that one consumer can subscribe to topics that live in two different namespaces and
     * receives the single message published to each of the three topics.
     *
     * @throws PulsarAdminException if cluster, tenant or namespace creation fails
     * @throws PulsarClientException if subscribing, publishing or consuming fails
     */
    @Test(timeOut=90000L)
    public void multiTopicsInDifferentNameSpace() throws PulsarAdminException, PulsarClientException {
        String firstTopic = "persistent://prop/use/ns-abc/topic-1";
        String secondTopic = "persistent://prop/use/ns-abc/topic-2";
        String otherNamespaceTopic = "persistent://prop/use/ns-abc1/topic-3";
        List<String> topics = Lists.newArrayList(firstTopic, secondTopic, otherNamespaceTopic);

        this.admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(this.brokerUrl.toString()).build());
        this.admin.tenants().createTenant("prop", new TenantInfoImpl(null, Sets.newHashSet("use")));
        this.admin.namespaces().createNamespace("prop/use/ns-abc");
        this.admin.namespaces().createNamespace("prop/use/ns-abc1");

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topics(topics)
                .subscriptionName("multiTopicSubscription")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic(firstTopic)
                .producerName("producer")
                .create();
        Producer<String> producer1 = this.pulsarClient.newProducer(Schema.STRING)
                .topic(secondTopic)
                .producerName("producer1")
                .create();
        Producer<String> producer2 = this.pulsarClient.newProducer(Schema.STRING)
                .topic(otherNamespaceTopic)
                .producerName("producer2")
                .create();

        producer.send("ns-abc/topic-1-Message1");
        producer1.send("ns-abc/topic-2-Message1");
        producer2.send("ns-abc1/topic-3-Message1");

        // The first receive blocks; later ones give up after 200 ms of silence.
        int messageSet = 0;
        for (Message<byte[]> message = consumer.receive();
                message != null;
                message = consumer.receive(200, TimeUnit.MILLISECONDS)) {
            ++messageSet;
            consumer.acknowledge(message);
            log.info("Consumer acknowledged : " + new String(message.getData()));
        }
        Assert.assertEquals(messageSet, 3);

        consumer.unsubscribe();
        consumer.close();
        producer.close();
        producer1.close();
        producer2.close();
    }

    /**
     * Checks that subscribing to several topics always terminates when the client's operation
     * timeout is tiny (2 ms): each of the ten attempts must either succeed or fail with
     * {@link PulsarClientException.TimeoutException}.
     *
     * <p>A dedicated client is built for this test and closed in {@code finally}.
     *
     * <p>NOTE: this source was produced by a decompiler, which reported that it removed a
     * self-catching {@code try} around this method ("possible behaviour change").
     *
     * @throws PulsarClientException if building or closing the dedicated client fails
     */
    @Test(timeOut=90000L)
    public void testSubscriptionMustCompleteWhenOperationTimeoutOnMultipleTopics() throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.lookupUrl.toString())
                .ioThreads(2)
                .listenerThreads(3)
                .operationTimeout(2, TimeUnit.MILLISECONDS)
                .build();
        try {
            String topic0 = "public/default/topic0";
            String topic1 = "public/default/topic1";
            for (int attempt = 0; attempt < 10; ++attempt) {
                try {
                    client.newConsumer(Schema.STRING)
                            .subscriptionName("subName")
                            .topics(Lists.newArrayList(topic0, topic1))
                            .receiverQueueSize(2)
                            .subscriptionType(SubscriptionType.Shared)
                            .ackTimeout(365L, TimeUnit.DAYS)
                            .ackTimeoutTickTime(36L, TimeUnit.DAYS)
                            .acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS)
                            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                            .subscribe();
                    Thread.sleep(3000L);
                }
                catch (Exception ex) {
                    // Any failure of an attempt has to be the client-side operation timeout.
                    Assert.assertTrue(ex instanceof PulsarClientException.TimeoutException);
                }
            }
        }
        finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Checks that a pattern consumer tracks a partitioned topic across deletion and re-creation
     * with a different partition count (3 partitions, then none, then 7).
     *
     * <p>After each admin change the consumer's partition auto-update timer task is run by hand and
     * the test waits until both the tracked partition count and the number of internal consumers
     * reach the expected value. The topic name suggests this guards issue 9585 (not confirmed
     * from this file).
     *
     * @throws Exception if topic administration, subscribing or the timer task fails
     */
    @Test(timeOut=90000L)
    public void testAutoDiscoverMultiTopicsPartitions() throws Exception {
        String topicName = "persistent://public/default/issue-9585";
        this.admin.topics().createPartitionedTopic(topicName, 3);

        PatternMultiTopicsConsumerImpl<String> consumer =
                (PatternMultiTopicsConsumerImpl<String>) this.pulsarClient.newConsumer(Schema.STRING)
                        .topicsPattern(topicName)
                        .subscriptionName("sub-issue-9585")
                        .subscribe();
        Assert.assertEquals((int) consumer.getPartitionsOfTheTopicMap(), 3);
        Assert.assertEquals(consumer.getConsumers().size(), 3);

        // Force-delete the topic; the consumer must drop every partition.
        this.admin.topics().deletePartitionedTopic(topicName, true);
        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((int) consumer.getPartitionsOfTheTopicMap(), 0);
            Assert.assertEquals(consumer.getConsumers().size(), 0);
        });

        // Re-create it with more partitions; the consumer must pick all of them up.
        this.admin.topics().createPartitionedTopic(topicName, 7);
        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((int) consumer.getPartitionsOfTheTopicMap(), 7);
            Assert.assertEquals(consumer.getConsumers().size(), 7);
        });
    }

    /**
     * Checks that a pattern consumer keeps its partition bookkeeping correct while matching
     * topics are resized and a new matching topic appears.
     *
     * <p>Expected totals: 2 (first topic created), 5 (first topic grown), 8 (second topic with 3
     * partitions discovered via the pattern re-check task), 10 (second topic grown to 5).
     *
     * @throws Exception if topic administration, subscribing or a timer task fails
     */
    @Test(timeOut=90000L)
    public void testPartitionsUpdatesForMultipleTopics() throws Exception {
        String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
        String topicName1 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-1";
        String subName = "my-sub";

        this.admin.topics().createPartitionedTopic(topicName0, 2);
        Assert.assertEquals(this.admin.topics().getPartitionedTopicMetadata(topicName0).partitions, 2);

        PatternMultiTopicsConsumerImpl<String> consumer =
                (PatternMultiTopicsConsumerImpl<String>) this.pulsarClient.newConsumer(Schema.STRING)
                        .topicsPattern("persistent://public/default/test.*")
                        .subscriptionType(SubscriptionType.Failover)
                        .subscriptionName(subName)
                        .subscribe();
        Assert.assertEquals((int) consumer.getPartitionsOfTheTopicMap(), 2);
        Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2);

        // Grow the first topic and run the partition auto-update task by hand.
        this.admin.topics().updatePartitionedTopic(topicName0, 5);
        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((int) consumer.getPartitionsOfTheTopicMap(), 5);
            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 5);
        });

        // A second topic matching the pattern is found by the pattern re-check task.
        this.admin.topics().createPartitionedTopic(topicName1, 3);
        Assert.assertEquals(this.admin.topics().getPartitionedTopicMetadata(topicName1).partitions, 3);
        consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout());
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((int) consumer.getPartitionsOfTheTopicMap(), 8);
            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 8);
        });

        // Grow the second topic as well.
        this.admin.topics().updatePartitionedTopic(topicName1, 5);
        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((int) consumer.getPartitionsOfTheTopicMap(), 10);
            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 10);
        });
    }
}

