/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Lists;
import java.util.Collection;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Broker tests for creating subscriptions through the admin API
 * ({@code admin.topics().createSubscription(...)}) on non-partitioned and
 * partitioned topics, plus a check that closing consumers leaves no waiting
 * cursors on the managed ledger.
 */
@Test(groups = {"broker"})
public class CreateSubscriptionTest extends ProducerConsumerBase {

    /** Number of partitions used by the partitioned-topic tests. */
    private static final int PARTITION_COUNT = 10;

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Creates subscriptions at {@code latest}, {@code earliest} and at a specific
     * message id on a single topic and checks the backlog each one reports.
     * Also checks that re-creating an existing subscription is rejected with
     * an HTTP 409 (Conflict) response.
     */
    @Test
    public void createSubscriptionSingleTopic() throws Exception {
        String topic = "persistent://my-property/my-ns/my-topic";
        admin.topics().createSubscription(topic, "sub-1", MessageId.latest);

        // Creating the same subscription a second time must be rejected as a conflict.
        try {
            admin.topics().createSubscription(topic, "sub-1", MessageId.latest);
            Assert.fail("Should have failed");
        } catch (PulsarAdminException.ConflictException e) {
            int status = ((ClientErrorException) e.getCause()).getResponse().getStatus();
            Assert.assertEquals(status, Response.Status.CONFLICT.getStatusCode());
        }

        Assert.assertEquals(admin.topics().getSubscriptions(topic), Lists.newArrayList("sub-1"));

        // try-with-resources so the producer is released even when an assertion fails.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
            producer.send("test-1".getBytes());
            producer.send("test-2".getBytes());
            MessageId thirdMessageId = producer.send("test-3".getBytes());

            // sub-1 existed before the three messages were published.
            Assert.assertEquals(backlogOf(topic, "sub-1"), 3L);

            // A subscription created at "latest" starts with an empty backlog.
            admin.topics().createSubscription(topic, "sub-2", MessageId.latest);
            Assert.assertEquals(backlogOf(topic, "sub-2"), 0L);

            // A subscription created at "earliest" sees every published message.
            admin.topics().createSubscription(topic, "sub-3", MessageId.earliest);
            Assert.assertEquals(backlogOf(topic, "sub-3"), 3L);

            // A subscription created at the id of the third message reports a backlog of one.
            admin.topics().createSubscription(topic, "sub-5", thirdMessageId);
            Assert.assertEquals(backlogOf(topic, "sub-5"), 1L);
        }
    }

    /**
     * Creates a subscription on a partitioned topic and checks that every
     * partition ends up with exactly that subscription, and that creating it
     * again fails.
     */
    @Test
    public void createSubscriptionOnPartitionedTopic() throws Exception {
        String topic = "persistent://my-property/my-ns/my-partitioned-topic";
        admin.topics().createPartitionedTopic(topic, PARTITION_COUNT);
        admin.topics().createSubscription(topic, "sub-1", MessageId.latest);

        try {
            admin.topics().createSubscription(topic, "sub-1", MessageId.latest);
            Assert.fail("Should have failed");
        } catch (Exception expected) {
            // Expected: the subscription already exists. The concrete exception type is
            // not asserted here. Assert.fail raises an AssertionError, which this catch
            // does not intercept, so a missing failure still fails the test.
        }

        assertEveryPartitionHasOnly(topic, "sub-1");
    }

    /**
     * Same as {@link #createSubscriptionOnPartitionedTopic()}, except that the
     * subscription is first created directly on partition 0 before it is created
     * on the partitioned topic as a whole. The topic-level creation is expected
     * to go through, and a further attempt is expected to fail.
     */
    @Test
    public void createSubscriptionOnPartitionedTopicWithPartialFailure() throws Exception {
        String topic = "persistent://my-property/my-ns/my-partitioned-topic";
        admin.topics().createPartitionedTopic(topic, PARTITION_COUNT);

        // Pre-create the subscription on a single partition only.
        String partitionedTopic0 = topic + "-partition-0";
        admin.topics().createSubscription(partitionedTopic0, "sub-1", MessageId.latest);

        // No exception is expected here even though partition 0 already has the subscription.
        admin.topics().createSubscription(topic, "sub-1", MessageId.latest);

        try {
            admin.topics().createSubscription(topic, "sub-1", MessageId.latest);
            Assert.fail("Should have failed");
        } catch (Exception expected) {
            // Expected: the subscription now exists on the partitions. See the note in
            // createSubscriptionOnPartitionedTopic about why a broad catch is safe here.
        }

        assertEveryPartitionHasOnly(topic, "sub-1");
    }

    /**
     * Subscribes and closes ten Failover consumers, each on its own subscription,
     * and then checks that the topic's managed ledger reports zero waiting cursors.
     */
    @Test
    public void testWaitingCurosrCausedMemoryLeak() throws Exception {
        String topic = "persistent://my-property/my-ns/my-topic";
        for (int i = 0; i < 10; ++i) {
            // try-with-resources closes the consumer even if the connection wait times out.
            try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionType(SubscriptionType.Failover)
                    .subscriptionName("test" + i)
                    .subscribe()) {
                Awaitility.await().untilAsserted(() -> Assert.assertTrue(consumer.isConnected()));
            }
        }

        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
                .getTopicReference(topic)
                .orElseThrow(() -> new AssertionError("No topic reference found for " + topic));
        ManagedLedgerImpl ml = (ManagedLedgerImpl) topicRef.getManagedLedger();
        Assert.assertEquals(ml.getWaitingCursorsCount(), 0);
    }

    /** Returns the message backlog the topic stats report for the given subscription. */
    private long backlogOf(String topic, String subscription) throws Exception {
        return admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog();
    }

    /** Asserts that each partition of {@code topic} lists {@code subscription} as its only subscription. */
    private void assertEveryPartitionHasOnly(String topic, String subscription) throws Exception {
        for (int i = 0; i < PARTITION_COUNT; ++i) {
            String partition = TopicName.get(topic).getPartition(i).toString();
            Assert.assertEquals(admin.topics().getSubscriptions(partition), Lists.newArrayList(subscription));
        }
    }
}

