/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Broker tests covering termination of a topic through the admin API and the behaviour that
 * producers, consumers and readers observe on a terminated topic.
 *
 * <p>All tests operate on the single topic named by {@code topicName}. The broker fixture is
 * created by {@link #setup()} before each test method and torn down by {@link #cleanup()} after it.
 */
@Test(groups={"broker"})
public class TopicTerminationTest extends BrokerTestBase {
    /** Topic every test in this class publishes to, subscribes to and terminates. */
    private final String topicName = "persistent://prop/ns-abc/topic0";

    /** Delegates to the base-class setup before every test method. */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /** Delegates to the base-class cleanup after every test method, even when the test failed. */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Creates a producer on {@code topicName} with batching disabled and single-partition routing.
     *
     * @return the newly created producer
     * @throws PulsarClientException if the producer cannot be created
     */
    private Producer<byte[]> newProducer() throws PulsarClientException {
        return pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
    }

    /**
     * Terminates {@code topicName} through the admin API and waits for the call to complete.
     *
     * @return the message id reported by the terminate call
     * @throws InterruptedException if interrupted while waiting for the result
     * @throws ExecutionException if the terminate call completed exceptionally
     */
    private MessageId terminateTopic() throws InterruptedException, ExecutionException {
        return admin.topics().terminateTopicAsync(topicName).get();
    }

    /**
     * Terminating a topic reports the id of the last published message, and an existing producer
     * can no longer publish afterwards.
     */
    @Test
    public void testSimpleTermination() throws Exception {
        Producer<byte[]> producer = newProducer();
        producer.send("test-msg-1".getBytes());
        producer.send("test-msg-2".getBytes());
        MessageId msgId3 = producer.send("test-msg-3".getBytes());

        MessageId lastMessageId = terminateTopic();
        Assert.assertEquals(lastMessageId, msgId3);

        try {
            producer.send("test-msg-4".getBytes());
            Assert.fail("Should have thrown exception");
        } catch (PulsarClientException.TopicTerminatedException expected) {
            // Expected: publishing on a terminated topic must be rejected.
        }
    }

    /** Creating a new producer on an already terminated topic fails. */
    @Test(groups={"broker"})
    public void testCreateProducerOnTerminatedTopic() throws Exception {
        Producer<byte[]> producer = newProducer();
        producer.send("test-msg-1".getBytes());
        producer.send("test-msg-2".getBytes());
        MessageId msgId3 = producer.send("test-msg-3".getBytes());

        MessageId lastMessageId = terminateTopic();
        Assert.assertEquals(lastMessageId, msgId3);

        try {
            pulsarClient.newProducer().topic(topicName).create();
            Assert.fail("Should have thrown exception");
        } catch (PulsarClientException.TopicTerminatedException expected) {
            // Expected: a terminated topic does not accept new producers.
        }
    }

    /**
     * After a producer creation is rejected on a terminated topic, the client's timer eventually
     * has no pending timeouts left.
     */
    @Test
    public void testCreatingProducerTasksCleanupWhenOnTerminatedTopic() throws Exception {
        Producer<byte[]> producer = newProducer();
        producer.send("msg-1".getBytes());
        producer.send("msg-2".getBytes());
        MessageId msgId3 = producer.send("msg-3".getBytes());

        MessageId lastMessageId = terminateTopic();
        Assert.assertEquals(lastMessageId, msgId3);
        producer.close();

        try {
            pulsarClient.newProducer().topic(topicName).create();
            Assert.fail("Should have thrown exception");
        } catch (PulsarClientException.TopicTerminatedException expected) {
            // Expected: a terminated topic does not accept new producers.
        }

        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0L));
    }

    /**
     * Terminates the topic while a second thread is publishing asynchronously and checks that all
     * publishes complete and that, once one publish has failed, every later one has failed too.
     */
    @Test(timeOut=20000L)
    public void testTerminateWhilePublishing() throws Exception {
        final int messageCount = 1000;
        Producer<byte[]> producer = newProducer();
        CyclicBarrier barrier = new CyclicBarrier(2);
        // Only the publisher thread appends; the main thread reads after join().
        ArrayList<CompletableFuture<MessageId>> futures = new ArrayList<>();

        Thread publisher = new Thread(() -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException ignored) {
                // Publish anyway; a broken rendezvous also fails the main thread's own await().
            }
            for (int i = 0; i < messageCount; ++i) {
                futures.add(producer.sendAsync("test".getBytes()));
            }
        });
        publisher.start();

        // Rendezvous with the publisher so termination races with the publishing loop.
        barrier.await();
        terminateTopic();
        publisher.join();

        try {
            FutureUtil.waitForAll(futures).get();
        } catch (Exception ignored) {
            // Some publishes may fail because of the termination; each future is checked below.
        }

        Assert.assertEquals(futures.size(), messageCount);
        boolean alreadyFailed = false;
        for (CompletableFuture<MessageId> future : futures) {
            Assert.assertTrue(future.isDone());
            if (alreadyFailed) {
                // The publish right before this one failed, so this one must have failed as well.
                Assert.assertTrue(future.isCompletedExceptionally());
            }
            alreadyFailed = future.isCompletedExceptionally();
        }
    }

    /** Terminating an already terminated topic succeeds and reports the same last message id. */
    @Test(groups={"broker"})
    public void testDoubleTerminate() throws Exception {
        Producer<byte[]> producer = newProducer();
        producer.send("test-msg-1".getBytes());
        producer.send("test-msg-2".getBytes());
        MessageId msgId3 = producer.send("test-msg-3".getBytes());

        MessageId lastMessageId = terminateTopic();
        Assert.assertEquals(lastMessageId, msgId3);

        lastMessageId = terminateTopic();
        Assert.assertEquals(lastMessageId, msgId3);
    }

    /** Terminating a partitioned topic is rejected with a {@code NotAllowedException}. */
    @Test(groups={"broker"})
    public void testTerminatePartitionedTopic() throws Exception {
        admin.topics().createPartitionedTopic(topicName, 4);
        try {
            terminateTopic();
            Assert.fail("Should have failed");
        } catch (ExecutionException ee) {
            Assert.assertEquals(ee.getCause().getClass(), PulsarAdminException.NotAllowedException.class);
        }
    }

    /**
     * A consumer subscribed before termination still receives every message published before the
     * termination and then reports that it reached the end of the topic.
     */
    @Test(timeOut=20000L)
    public void testSimpleTerminationConsumer() throws Exception {
        Producer<byte[]> producer = newProducer();
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscribe();

        MessageId msgId1 = producer.send("test-msg-1".getBytes());
        MessageId msgId2 = producer.send("test-msg-2".getBytes());

        Message<byte[]> msg1 = consumer.receive();
        Assert.assertEquals(msg1.getMessageId(), msgId1);
        consumer.acknowledge(msg1);

        MessageId msgId3 = producer.send("test-msg-3".getBytes());
        Assert.assertFalse(consumer.hasReachedEndOfTopic());

        MessageId lastMessageId = terminateTopic();
        Assert.assertEquals(lastMessageId, msgId3);

        Message<byte[]> msg2 = consumer.receive();
        Assert.assertEquals(msg2.getMessageId(), msgId2);
        consumer.acknowledge(msg2);

        Message<byte[]> msg3 = consumer.receive();
        Assert.assertEquals(msg3.getMessageId(), msgId3);
        consumer.acknowledge(msg3);

        // Nothing is left to deliver after the last message.
        Message<byte[]> msg4 = consumer.receive(100, TimeUnit.MILLISECONDS);
        Assert.assertNull(msg4);

        Awaitility.await().untilAsserted(() -> Assert.assertTrue(consumer.hasReachedEndOfTopic()));
    }

    /**
     * A consumer with a message listener gets its {@code reachedEndOfTopic} callback invoked
     * after the topic is terminated.
     */
    @Test(timeOut=20000L)
    public void testSimpleTerminationMessageListener() throws Exception {
        Producer<byte[]> producer = newProducer();
        final CountDownLatch latch = new CountDownLatch(1);

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-sub")
                .messageListener(new MessageListener<byte[]>() {
                    @Override
                    public void received(Consumer<byte[]> c, Message<byte[]> msg) {
                        // Message content is irrelevant for this test.
                    }

                    @Override
                    public void reachedEndOfTopic(Consumer<byte[]> c) {
                        latch.countDown();
                        Assert.assertTrue(c.hasReachedEndOfTopic());
                    }
                })
                .subscribe();

        producer.send("test-msg-1".getBytes());
        producer.send("test-msg-2".getBytes());
        MessageId msgId3 = producer.send("test-msg-3".getBytes());

        consumer.acknowledgeCumulative(msgId3);
        Awaitility.await().untilAsserted(() -> Assert.assertFalse(consumer.hasReachedEndOfTopic()));

        MessageId lastMessageId = terminateTopic();
        Assert.assertEquals(lastMessageId, msgId3);

        Assert.assertTrue(latch.await(3L, TimeUnit.SECONDS));
        Assert.assertTrue(consumer.hasReachedEndOfTopic());
    }

    /**
     * A reader created on a terminated topic reads every message from the earliest position and
     * then reports that it reached the end of the topic.
     */
    @Test(timeOut=20000L)
    public void testSimpleTerminationReader() throws Exception {
        Producer<byte[]> producer = newProducer();
        MessageId msgId1 = producer.send("test-msg-1".getBytes());
        MessageId msgId2 = producer.send("test-msg-2".getBytes());
        MessageId msgId3 = producer.send("test-msg-3".getBytes());

        MessageId lastMessageId = terminateTopic();
        Assert.assertEquals(lastMessageId, msgId3);

        Reader<byte[]> reader = pulsarClient.newReader()
                .topic(topicName)
                .startMessageId(MessageId.earliest)
                .create();

        Message<byte[]> msg1 = reader.readNext();
        Assert.assertEquals(msg1.getMessageId(), msgId1);

        Message<byte[]> msg2 = reader.readNext();
        Assert.assertEquals(msg2.getMessageId(), msgId2);

        Message<byte[]> msg3 = reader.readNext();
        Assert.assertEquals(msg3.getMessageId(), msgId3);

        // Nothing is left to read after the last message.
        Message<byte[]> msg4 = reader.readNext(100, TimeUnit.MILLISECONDS);
        Assert.assertNull(msg4);

        Awaitility.await().untilAsserted(() -> Assert.assertTrue(reader.hasReachedEndOfTopic()));
    }

    /**
     * A reader with a reader listener gets its {@code reachedEndOfTopic} callback invoked after
     * the topic is terminated.
     */
    @Test(timeOut=20000L)
    public void testSimpleTerminationReaderListener() throws Exception {
        Producer<byte[]> producer = newProducer();
        final CountDownLatch latch = new CountDownLatch(1);

        Reader<byte[]> reader = pulsarClient.newReader()
                .topic(topicName)
                .startMessageId(MessageId.latest)
                .readerListener(new ReaderListener<byte[]>() {
                    @Override
                    public void received(Reader<byte[]> r, Message<byte[]> msg) {
                        // Message content is irrelevant for this test.
                    }

                    @Override
                    public void reachedEndOfTopic(Reader<byte[]> r) {
                        latch.countDown();
                        Assert.assertTrue(r.hasReachedEndOfTopic());
                    }
                })
                .create();

        producer.send("test-msg-1".getBytes());
        producer.send("test-msg-2".getBytes());
        MessageId msgId3 = producer.send("test-msg-3".getBytes());

        Awaitility.await().untilAsserted(() -> Assert.assertFalse(reader.hasReachedEndOfTopic()));

        MessageId lastMessageId = terminateTopic();
        Assert.assertEquals(lastMessageId, msgId3);

        Assert.assertTrue(latch.await(3L, TimeUnit.SECONDS));
        Assert.assertTrue(reader.hasReachedEndOfTopic());
    }

    /**
     * A consumer that subscribes after the topic was terminated eventually reports that it
     * reached the end of the topic.
     */
    @Test(timeOut=20000L)
    public void testSubscribeOnTerminatedTopic() throws Exception {
        Producer<byte[]> producer = newProducer();
        producer.send("test-msg-1".getBytes());
        MessageId msgId2 = producer.send("test-msg-2".getBytes());

        MessageId lastMessageId = terminateTopic();
        Assert.assertEquals(lastMessageId, msgId2);

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscribe();

        Awaitility.await().untilAsserted(() -> Assert.assertTrue(consumer.hasReachedEndOfTopic()));
    }

    /**
     * Same as {@link #testSubscribeOnTerminatedTopic()} but the topic is terminated without any
     * message having been published to it.
     */
    @Test(timeOut=20000L)
    public void testSubscribeOnTerminatedTopicWithNoMessages() throws Exception {
        // A producer is created but never used to publish before the topic is terminated.
        newProducer();
        terminateTopic();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscribe();

        Awaitility.await().untilAsserted(() -> Assert.assertTrue(consumer.hasReachedEndOfTopic()));
    }
}

