/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class NonDurableSubscriptionTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(NonDurableSubscriptionTest.class);
    private final AtomicInteger numFlow = new AtomicInteger(0);

    /**
     * Sets the subscription expiration time to one minute on the shared configuration and then
     * runs the base-class setup steps.
     *
     * @throws Exception if either base-class setup step fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        // Applied before internalSetup() so the value is already present when the base class
        // consumes the configuration.
        conf.setSubscriptionExpirationTimeMinutes(1);
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Delegates teardown to the base class. {@code alwaysRun = true} keeps this running even
     * when the preceding test method failed.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Creates a {@link PulsarService} whose server connections increment {@code numFlow} each
     * time they handle a flow command.
     *
     * <p>The counting is wired in through three nested anonymous subclasses: the service builds
     * a {@link BrokerService} with a custom channel-initializer factory, the initializer creates
     * a custom {@link ServerCnx}, and that connection counts flow commands after delegating to
     * the stock handler.
     *
     * @param conf configuration passed to the {@link PulsarService} constructor
     * @return the instrumented service
     * @throws Exception declared by the overridden factory method
     */
    @Override
    protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
        return new PulsarService(conf) {
            @Override
            protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
                // "this" is the anonymous PulsarService being constructed, not the parameter.
                BrokerService brokerService = new BrokerService(this, ioEventLoopGroup);
                brokerService.setPulsarChannelInitializerFactory((service, options) -> {
                    return new PulsarChannelInitializer(service, options) {
                        @Override
                        protected ServerCnx newServerCnx(PulsarService pulsar, String listenerName)
                                throws Exception {
                            return new ServerCnx(pulsar) {
                                @Override
                                protected void handleFlow(CommandFlow flow) {
                                    // Delegate first, then record that a flow command was handled.
                                    super.handleFlow(flow);
                                    numFlow.incrementAndGet();
                                }
                            };
                        }
                    };
                });
                return brokerService;
            }
        };
    }

    /**
     * Verifies that a non-durable subscription keeps delivering messages in order after the
     * consumer's client connection is closed.
     *
     * <p>Ten messages are published. The first five are received and acknowledged, the
     * consumer's client connection is closed, and the remaining five must then be received with
     * the expected payloads.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testNonDurableSubscription() throws Exception {
        final String topicName = "persistent://my-property/my-ns/nonDurable-topic1";
        final int messageNum = 10;
        final int ackedBeforeDisconnect = 5;

        // try-with-resources closes the consumer first and the producer second (reverse
        // declaration order) and records a close() failure as a suppressed exception instead of
        // letting it replace an assertion failure raised in the body.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .topic(topicName)
                     .create();
             Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .readCompacted(true)
                     .subscriptionMode(SubscriptionMode.NonDurable)
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("my-nonDurable-subscriber")
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe()) {
            for (int i = 0; i < messageNum; i++) {
                producer.send("message" + i);
            }

            // Consume and acknowledge the first part of the backlog.
            for (int i = 0; i < ackedBeforeDisconnect; i++) {
                Message<String> message = consumer.receive();
                Assert.assertNotNull(message);
                Assert.assertEquals(message.getValue(), "message" + i);
                consumer.acknowledge(message);
            }

            // Drop the underlying connection while the consumer itself stays open.
            ((ConsumerImpl<String>) consumer).getClientCnx().close();

            // The rest of the backlog must still arrive, in order, on the same consumer.
            for (int i = ackedBeforeDisconnect; i < messageNum; i++) {
                Message<String> message = consumer.receive();
                Assert.assertNotNull(message);
                Assert.assertEquals(message.getValue(), "message" + i);
            }
        }
    }

    /**
     * Exercises durable and non-durable subscriptions that share a subscription name on one
     * topic.
     *
     * <p>Scenario 1: a durable subscription is created first and its consumer closed;
     * subscribing to the same name in non-durable mode must then fail with
     * {@link PulsarClientException.NotAllowedException}.
     *
     * <p>Scenario 2: a non-durable subscription is created first and kept open; a durable
     * subscribe to the same name is then attempted. Both a successful subscribe and a
     * {@code NotAllowedException} are accepted here.
     *
     * @throws Exception if any client operation fails unexpectedly
     */
    @Test
    public void testSameSubscriptionNameForDurableAndNonDurableSubscription() throws Exception {
        final String topicName = "persistent://my-property/my-ns/same-sub-name-topic";

        // Scenario 1, step 1: create the durable subscription, then disconnect its consumer.
        Consumer<String> durableFirst = pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .readCompacted(true)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionName("mix-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        durableFirst.close();

        // Scenario 1, step 2: the same name in non-durable mode must be rejected.
        try (Consumer<String> unexpectedConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .readCompacted(true)
                .subscriptionMode(SubscriptionMode.NonDurable)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionName("mix-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            Assert.fail("should fail since durable subscription already exist.");
        } catch (PulsarClientException.NotAllowedException expected) {
            // Expected outcome: the subscribe call was rejected.
        }

        // Scenario 2, step 1: open a producer and a non-durable subscription that stay alive
        // while the durable subscribe below is attempted.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .topic(topicName)
                     .create();
             Consumer<String> noDurableConsumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .subscriptionMode(SubscriptionMode.NonDurable)
                     .subscriptionType(SubscriptionType.Shared)
                     .subscriptionName("mix-subscription-01")
                     .receiverQueueSize(1)
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe()) {
            // Scenario 2, step 2: attempt a durable subscription with the same name.
            try (Consumer<String> durableConsumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topicName)
                    .subscriptionMode(SubscriptionMode.Durable)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionName("mix-subscription-01")
                    .receiverQueueSize(1)
                    .startMessageIdInclusive()
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                // Nothing to assert: a successful subscribe is accepted and the consumer is
                // closed immediately.
            } catch (PulsarClientException.NotAllowedException ignored) {
                // A rejection is accepted as well; this scenario does not pin the outcome.
            }
        }
    }

    /**
     * Verifies when {@code checkInactiveSubscriptions()} removes a subscription from a
     * non-persistent topic.
     *
     * <p>While a consumer is connected the subscription must survive the check. After the
     * consumer is closed and the subscription's {@code lastActive} field is moved five minutes
     * into the past via reflection, the next check must remove it.
     *
     * @throws Exception if an admin, client or reflection operation fails
     */
    @Test(timeOut = 10000L)
    public void testDeleteInactiveNonPersistentSubscription() throws Exception {
        final String topic = "non-persistent://my-property/my-ns/topic-" + UUID.randomUUID();
        final String subName = "my-subscriber";
        admin.topics().createNonPartitionedTopic(topic);

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName(subName)
                .subscribe();

        // Fail with a descriptive error instead of a bare NoSuchElementException if the broker
        // does not know the topic.
        NonPersistentTopic nonPersistentTopic = (NonPersistentTopic) pulsar.getBrokerService()
                .getTopicIfExists(topic)
                .get()
                .orElseThrow(() -> new AssertionError("topic not found on broker: " + topic));

        NonPersistentSubscription nonPersistentSubscription =
                (NonPersistentSubscription) nonPersistentTopic.getSubscription(subName);
        Assert.assertNotNull(nonPersistentSubscription);
        Assert.assertNotNull(nonPersistentSubscription.getDispatcher());
        AssertJUnit.assertTrue(nonPersistentSubscription.getDispatcher().isConsumerConnected());
        AssertJUnit.assertFalse(nonPersistentSubscription.isReplicated());

        // With a consumer still connected, the inactivity check must keep the subscription.
        nonPersistentTopic.checkInactiveSubscriptions();
        Thread.sleep(500L);
        nonPersistentSubscription =
                (NonPersistentSubscription) nonPersistentTopic.getSubscription(subName);
        Assert.assertNotNull(nonPersistentSubscription);

        // Disconnect the consumer, then backdate the private lastActive field by five minutes,
        // which is longer than the one-minute expiration configured in setup().
        consumer.close();
        Thread.sleep(500L);
        Field f = NonPersistentSubscription.class.getDeclaredField("lastActive");
        f.setAccessible(true);
        f.set(nonPersistentTopic.getSubscription(subName),
                System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L));

        // Now the check must remove the subscription.
        nonPersistentTopic.checkInactiveSubscriptions();
        Thread.sleep(500L);
        nonPersistentSubscription =
                (NonPersistentSubscription) nonPersistentTopic.getSubscription(subName);
        AssertJUnit.assertNull(nonPersistentSubscription);
    }

    /**
     * Data provider that yields one single-element row per {@link SubscriptionType} constant,
     * in declaration order.
     *
     * @return a matrix with {@code SubscriptionType.values().length} rows of one element each
     */
    @DataProvider(name = "subscriptionTypes")
    public static Object[][] subscriptionTypes() {
        SubscriptionType[] allTypes = SubscriptionType.values();
        Object[][] rows = new Object[allTypes.length][];
        for (int idx = 0; idx < allTypes.length; idx++) {
            rows[idx] = new Object[]{allTypes[idx]};
        }
        return rows;
    }

    /**
     * Verifies, for every subscription type, that a non-durable subscription keeps delivering
     * messages in order across a closed client connection and a call to
     * {@code restartBroker()}.
     *
     * <p>Fifteen messages are published. Messages 0-4 are received and acknowledged, the
     * consumer's client connection is closed, messages 5-9 are received, {@code restartBroker()}
     * is called, and messages 10-14 must then be received with the expected payloads.
     *
     * @param subscriptionType subscription type under test, supplied by the
     *     {@code subscriptionTypes} data provider
     * @throws Exception if any client or broker operation fails
     */
    @Test(dataProvider = "subscriptionTypes")
    public void testNonDurableSubscriptionRecovery(SubscriptionType subscriptionType) throws Exception {
        log.info("testing {}", subscriptionType);
        final String topicName = "persistent://my-property/my-ns/nonDurable-sub-recorvery-" + subscriptionType;
        final int messageNum = 15;
        final int ackedBeforeDisconnect = 5;
        final int receivedBeforeRestart = 10;

        // try-with-resources closes the consumer first and the producer second (reverse
        // declaration order) and records a close() failure as a suppressed exception instead of
        // letting it replace an assertion failure raised in the body.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .topic(topicName)
                     .create();
             Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .subscriptionMode(SubscriptionMode.NonDurable)
                     .subscriptionType(subscriptionType)
                     .subscriptionName("my-nonDurable-subscriber")
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe()) {
            for (int i = 0; i < messageNum; i++) {
                producer.send("message" + i);
            }

            // Phase 1: consume and acknowledge the first messages.
            for (int i = 0; i < ackedBeforeDisconnect; i++) {
                Message<String> message = consumer.receive();
                Assert.assertNotNull(message);
                Assert.assertEquals(message.getValue(), "message" + i);
                consumer.acknowledge(message);
            }

            // Phase 2: drop the connection; delivery must continue on the same consumer.
            ((ConsumerImpl<String>) consumer).getClientCnx().close();
            for (int i = ackedBeforeDisconnect; i < receivedBeforeRestart; i++) {
                Message<String> message = consumer.receive();
                Assert.assertNotNull(message);
                Assert.assertEquals(message.getValue(), "message" + i);
            }

            // Phase 3: after restartBroker(), the remaining messages must still arrive.
            restartBroker();
            for (int i = receivedBeforeRestart; i < messageNum; i++) {
                Message<String> message = consumer.receive();
                Assert.assertNotNull(message);
                Assert.assertEquals(message.getValue(), "message" + i);
            }
        }
    }

    /**
     * Verifies that a non-durable consumer on a five-partition topic causes exactly one flow
     * command per partition to be handled by the broker.
     *
     * <p>The count comes from {@code numFlow}, which the instrumented {@code ServerCnx} created
     * in {@code newPulsarService} increments on every handled flow command.
     *
     * @throws Exception if an admin or client operation fails
     */
    @Test
    public void testFlowCountForMultiTopics() throws Exception {
        final String topicName = "persistent://my-property/my-ns/test-flow-count";
        final int numPartitions = 5;
        admin.topics().createPartitionedTopic(topicName, numPartitions);
        // Start counting from zero so only flow commands from this consumer are included.
        numFlow.set(0);

        // try-with-resources closes the consumer even if receive() throws; the assertion below
        // still runs only after the consumer has been closed, as before.
        try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName("my-nonDurable-subscriber")
                .subscriptionMode(SubscriptionMode.NonDurable)
                .subscribe()) {
            // No messages are published; this waits up to one second and discards the result.
            consumer.receive(1, TimeUnit.SECONDS);
        }

        Assert.assertEquals(numFlow.get(), numPartitions);
    }
}

