/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class TopicDuplicationTest
extends ProducerConsumerBase {
    private final String testTenant = "my-property";
    private final String testNamespace = "my-ns";
    private final String myNamespace = "my-property/my-ns";
    private final String testTopic = "persistent://my-property/my-ns/max-unacked-";

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.resetConfig();
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setBrokerDeduplicationEnabled(true);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(timeOut=10000L)
    public void testDuplicationApi() throws Exception {
        String topicName = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(topicName, 3);
        this.waitCacheInit(topicName);
        Boolean enabled = this.admin.topics().getDeduplicationEnabled(topicName);
        Assert.assertNull((Object)enabled);
        this.admin.topics().enableDeduplication(topicName, true);
        Awaitility.await().until(() -> this.admin.topics().getDeduplicationEnabled(topicName) != null);
        Assert.assertTrue((boolean)this.admin.topics().getDeduplicationEnabled(topicName));
        this.admin.topics().disableDeduplication(topicName);
        Awaitility.await().until(() -> this.admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null);
        Assert.assertNull((Object)this.admin.topics().getDeduplicationEnabled(topicName));
    }

    @Test(timeOut=10000L)
    public void testTopicDuplicationApi2() throws Exception {
        String topicName = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(topicName, 3);
        this.waitCacheInit(topicName);
        Boolean enabled = this.admin.topics().getDeduplicationStatus(topicName);
        Assert.assertNull((Object)enabled);
        this.admin.topics().setDeduplicationStatus(topicName, true);
        Awaitility.await().until(() -> this.admin.topics().getDeduplicationStatus(topicName) != null);
        Assert.assertTrue((boolean)this.admin.topics().getDeduplicationStatus(topicName));
        this.admin.topics().removeDeduplicationStatus(topicName);
        Awaitility.await().until(() -> this.admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null);
        Assert.assertNull((Object)this.admin.topics().getDeduplicationStatus(topicName));
    }

    @Test(timeOut=10000L)
    public void testTopicDuplicationAppliedApi() throws Exception {
        String topicName = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        this.waitCacheInit(topicName);
        Assert.assertNull((Object)this.admin.namespaces().getDeduplicationStatus("my-property/my-ns"));
        Assert.assertNull((Object)this.admin.topics().getDeduplicationStatus(topicName));
        Assert.assertEquals((boolean)this.admin.topics().getDeduplicationStatus(topicName, true), (boolean)this.conf.isBrokerDeduplicationEnabled());
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", false);
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getDeduplicationStatus(topicName, true)));
        this.admin.topics().setDeduplicationStatus(topicName, true);
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)this.admin.topics().getDeduplicationStatus(topicName, true)));
        this.admin.topics().removeDeduplicationStatus(topicName);
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.topics().getDeduplicationStatus(topicName, true)));
        this.admin.namespaces().removeDeduplicationStatus("my-property/my-ns");
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((boolean)this.admin.topics().getDeduplicationStatus(topicName, true), (boolean)this.conf.isBrokerDeduplicationEnabled()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testDeduplicationPriority() throws Exception {
        String topicName = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        String producerName = "my-producer";
        int maxMsgNum = 5;
        this.waitCacheInit(topicName);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).producerName("my-producer").create();
        try {
            long maxSeq = this.sendMessageAndGetMaxSeq(5, producer);
            PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topicName).get()).get();
            MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
            this.checkDeduplicationEnabled("my-producer", messageDeduplication, maxSeq);
            this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", false);
            Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.namespaces().getDeduplicationStatus("my-property/my-ns")));
            this.sendMessageAndGetMaxSeq(5, producer);
            this.checkDeduplicationDisabled("my-producer", messageDeduplication);
            this.admin.topics().setDeduplicationStatus(topicName, true);
            Awaitility.await().untilAsserted(() -> Assert.assertNotNull((Object)this.admin.topics().getDeduplicationStatus(topicName)));
            Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)messageDeduplication.isEnabled()));
            long maxSeq2 = this.sendMessageAndGetMaxSeq(5, producer);
            this.checkDeduplicationEnabled("my-producer", messageDeduplication, maxSeq2);
            this.admin.topics().removeDeduplicationStatus(topicName);
            Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.topics().getDeduplicationStatus(topicName)));
            Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)messageDeduplication.isEnabled()));
            producer.newMessage().value((Object)"msg").sequenceId(1L).send();
            this.checkDeduplicationDisabled("my-producer", messageDeduplication);
            this.admin.namespaces().removeDeduplicationStatus("my-property/my-ns");
            Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getDeduplicationStatus("my-property/my-ns")));
            Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)messageDeduplication.isEnabled()));
            long maxSeq3 = this.sendMessageAndGetMaxSeq(5, producer);
            this.checkDeduplicationEnabled("my-producer", messageDeduplication, maxSeq3);
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    private long sendMessageAndGetMaxSeq(int maxMsgNum, Producer producer) throws Exception {
        long seq = System.nanoTime();
        for (int i = 0; i <= maxMsgNum; ++i) {
            producer.newMessage().value((Object)("msg-" + i)).sequenceId(seq + (long)i).send();
        }
        return seq + (long)maxMsgNum;
    }

    private void checkDeduplicationDisabled(String producerName, MessageDeduplication messageDeduplication) throws Exception {
        ((CompletableFuture)messageDeduplication.checkStatus().whenComplete((res, ex) -> {
            if (ex != null) {
                Assert.fail((String)"should not fail");
            }
            Assert.assertEquals((long)messageDeduplication.getLastPublishedSequenceId(producerName), (long)-1L);
            Assert.assertEquals((long)messageDeduplication.highestSequencedPersisted.size(), (long)0L);
            Assert.assertEquals((long)messageDeduplication.highestSequencedPushed.size(), (long)0L);
        })).get();
    }

    private void checkDeduplicationEnabled(String producerName, MessageDeduplication messageDeduplication, long maxSeq) throws Exception {
        ((CompletableFuture)messageDeduplication.checkStatus().whenComplete((res, ex) -> {
            if (ex != null) {
                Assert.fail((String)"should not fail");
            }
            Assert.assertNotNull((Object)messageDeduplication.highestSequencedPersisted);
            Assert.assertNotNull((Object)messageDeduplication.highestSequencedPushed);
            long seqId = messageDeduplication.getLastPublishedSequenceId(producerName);
            Assert.assertEquals((long)seqId, (long)maxSeq);
            Assert.assertEquals((long)((Long)messageDeduplication.highestSequencedPersisted.get((Object)producerName)), (long)maxSeq);
            Assert.assertEquals((long)((Long)messageDeduplication.highestSequencedPushed.get((Object)producerName)), (long)maxSeq);
        })).get();
    }

    @Test(timeOut=10000L)
    public void testDuplicationSnapshotApi() throws Exception {
        String topicName = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(topicName, 3);
        this.waitCacheInit(topicName);
        Integer interval = this.admin.topics().getDeduplicationSnapshotInterval(topicName);
        Assert.assertNull((Object)interval);
        this.admin.topics().setDeduplicationSnapshotInterval(topicName, 1024);
        Awaitility.await().until(() -> this.admin.topics().getDeduplicationSnapshotInterval(topicName) != null);
        Assert.assertEquals((int)this.admin.topics().getDeduplicationSnapshotInterval(topicName), (int)1024);
        this.admin.topics().removeDeduplicationSnapshotInterval(topicName);
        Awaitility.await().until(() -> this.admin.topics().getDeduplicationSnapshotInterval(topicName) == null);
        Assert.assertNull((Object)this.admin.topics().getDeduplicationSnapshotInterval(topicName));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testTopicPolicyTakeSnapshot() throws Exception {
        super.internalCleanup();
        this.resetConfig();
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setBrokerDeduplicationEnabled(true);
        this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
        this.conf.setBrokerDeduplicationSnapshotIntervalSeconds(Integer.valueOf(7));
        this.conf.setBrokerDeduplicationEntriesInterval(20000);
        super.internalSetup();
        super.producerBaseSetup();
        String topicName = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        String producerName = "my-producer";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("my-producer").create();
        try {
            this.waitCacheInit(topicName);
            this.admin.topics().setDeduplicationSnapshotInterval(topicName, 3);
            this.admin.namespaces().setDeduplicationSnapshotInterval("my-property/my-ns", Integer.valueOf(5));
            int msgNum = 10;
            CountDownLatch countDownLatch = new CountDownLatch(msgNum);
            for (int i = 0; i < msgNum; ++i) {
                producer.newMessage().value((Object)("msg" + i)).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
            }
            countDownLatch.await();
            PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topicName).get()).get();
            long seqId = (Long)persistentTopic.getMessageDeduplication().highestSequencedPersisted.get((Object)"my-producer");
            PositionImpl position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            Assert.assertEquals((long)seqId, (long)(msgNum - 1));
            Assert.assertEquals((long)position.getEntryId(), (long)(msgNum - 1));
            Awaitility.await().until(() -> ((PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition()).getEntryId() == (long)(msgNum - 1));
            ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
            PositionImpl markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            Assert.assertEquals((Object)position, (Object)markDeletedPosition);
            this.admin.topics().removeDeduplicationSnapshotInterval(topicName);
            producer.newMessage().value((Object)"msg").send();
            Awaitility.await().until(() -> ((PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition()).getEntryId() == (long)msgNum);
            markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            Assert.assertEquals((long)msgNum, (long)markDeletedPosition.getEntryId());
            Assert.assertEquals((Object)position, (Object)markDeletedPosition);
            this.admin.namespaces().removeDeduplicationSnapshotInterval("my-property/my-ns");
            Awaitility.await().until(() -> this.admin.namespaces().getDeduplicationSnapshotInterval("my-property/my-ns") == null);
            producer.newMessage().value((Object)"msg").send();
            Thread.sleep(3000L);
            markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            Assert.assertNotEquals((Object)(msgNum + 1), (Object)markDeletedPosition.getEntryId());
            Assert.assertNotEquals((Object)position, (Object)markDeletedPosition);
            Awaitility.await().until(() -> ((PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition()).getEntryId() == (long)(msgNum + 1));
            markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            Assert.assertEquals((long)(msgNum + 1), (long)markDeletedPosition.getEntryId());
            Assert.assertEquals((Object)position, (Object)markDeletedPosition);
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=20000L)
    public void testDuplicationMethod() throws Exception {
        String topicName = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        String producerName = "my-producer";
        int maxMsgNum = 100;
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/max-unacked-", 3);
        this.waitCacheInit(topicName);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).producerName("my-producer").create();
        try {
            long maxSeq = this.sendMessageAndGetMaxSeq(100, producer);
            CompletableFuture completableFuture = (CompletableFuture)this.pulsar.getBrokerService().getTopics().get((Object)topicName);
            Topic topic = (Topic)((Optional)completableFuture.get(1L, TimeUnit.SECONDS)).get();
            PersistentTopic persistentTopic = (PersistentTopic)topic;
            MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
            this.checkDeduplicationEnabled("my-producer", messageDeduplication, maxSeq);
            this.admin.topics().enableDeduplication(topicName, false);
            Awaitility.await().until(() -> this.admin.topics().getDeduplicationEnabled(topicName) != null);
            for (int i = 0; i < 100; ++i) {
                producer.newMessage().value((Object)("msg-" + i)).sequenceId(maxSeq + (long)i).send();
            }
            this.checkDeduplicationDisabled("my-producer", messageDeduplication);
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Test(timeOut=40000L)
    public void testDuplicationSnapshot() throws Exception {
        this.testTakeSnapshot(true);
        this.testTakeSnapshot(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testTakeSnapshot(boolean enabledSnapshot) throws Exception {
        super.internalCleanup();
        this.resetConfig();
        this.conf.setBrokerDeduplicationEnabled(true);
        this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(enabledSnapshot ? 1 : 0);
        this.conf.setBrokerDeduplicationSnapshotIntervalSeconds(Integer.valueOf(1));
        this.conf.setBrokerDeduplicationEntriesInterval(20000);
        super.internalSetup();
        super.producerBaseSetup();
        String topicName = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        String producerName = "my-producer";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("my-producer").create();
        try {
            int msgNum = 50;
            CountDownLatch countDownLatch = new CountDownLatch(msgNum);
            for (int i = 0; i < msgNum; ++i) {
                producer.newMessage().value((Object)("msg" + i)).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
            }
            countDownLatch.await();
            PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topicName).get()).get();
            long seqId = (Long)persistentTopic.getMessageDeduplication().highestSequencedPersisted.get((Object)"my-producer");
            PositionImpl position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            Assert.assertEquals((long)seqId, (long)(msgNum - 1));
            Assert.assertEquals((long)position.getEntryId(), (long)(msgNum - 1));
            Thread.sleep(2000L);
            ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
            PositionImpl markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            if (enabledSnapshot) {
                Assert.assertEquals((Object)position, (Object)markDeletedPosition);
            } else {
                Assert.assertNotEquals((Object)position, (Object)markDeletedPosition);
                Assert.assertNotEquals((Object)markDeletedPosition.getEntryId(), (Object)-1);
            }
            producer.newMessage().value((Object)"msg").send();
            markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            Assert.assertNotEquals((Object)msgNum, (Object)markDeletedPosition.getEntryId());
            Assert.assertNotNull((Object)position);
            Thread.sleep(2000L);
            markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            if (enabledSnapshot) {
                Assert.assertEquals((long)msgNum, (long)markDeletedPosition.getEntryId());
                Assert.assertEquals((Object)position, (Object)markDeletedPosition);
            } else {
                Assert.assertNotEquals((Object)msgNum, (Object)markDeletedPosition.getEntryId());
                Assert.assertNotEquals((Object)position, (Object)markDeletedPosition);
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Test(timeOut=30000L)
    public void testNamespacePolicyApi() throws Exception {
        Integer interval = this.admin.namespaces().getDeduplicationSnapshotInterval("my-property/my-ns");
        Assert.assertNull((Object)interval);
        this.admin.namespaces().setDeduplicationSnapshotInterval("my-property/my-ns", Integer.valueOf(100));
        interval = this.admin.namespaces().getDeduplicationSnapshotInterval("my-property/my-ns");
        Assert.assertEquals((int)interval, (int)100);
        this.admin.namespaces().removeDeduplicationSnapshotInterval("my-property/my-ns");
        interval = this.admin.namespaces().getDeduplicationSnapshotInterval("my-property/my-ns");
        Assert.assertNull((Object)interval);
        this.admin.namespaces().setDeduplicationSnapshotIntervalAsync("my-property/my-ns", Integer.valueOf(200)).get();
        interval = (Integer)this.admin.namespaces().getDeduplicationSnapshotIntervalAsync("my-property/my-ns").get();
        Assert.assertEquals((int)interval, (int)200);
        this.admin.namespaces().removeDeduplicationSnapshotIntervalAsync("my-property/my-ns").get();
        interval = (Integer)this.admin.namespaces().getDeduplicationSnapshotIntervalAsync("my-property/my-ns").get();
        Assert.assertNull((Object)interval);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testNamespacePolicyTakeSnapshot() throws Exception {
        super.internalCleanup();
        this.resetConfig();
        this.conf.setBrokerDeduplicationEnabled(true);
        this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
        this.conf.setBrokerDeduplicationSnapshotIntervalSeconds(Integer.valueOf(3));
        this.conf.setBrokerDeduplicationEntriesInterval(20000);
        super.internalSetup();
        super.producerBaseSetup();
        String topicName = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        String producerName = "my-producer";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("my-producer").create();
        try {
            this.admin.namespaces().setDeduplicationSnapshotInterval("my-property/my-ns", Integer.valueOf(1));
            int msgNum = 50;
            CountDownLatch countDownLatch = new CountDownLatch(msgNum);
            for (int i = 0; i < msgNum; ++i) {
                producer.newMessage().value((Object)("msg" + i)).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
            }
            countDownLatch.await();
            PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topicName).get()).get();
            long seqId = (Long)persistentTopic.getMessageDeduplication().highestSequencedPersisted.get((Object)"my-producer");
            PositionImpl position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            Assert.assertEquals((long)seqId, (long)(msgNum - 1));
            Assert.assertEquals((long)position.getEntryId(), (long)(msgNum - 1));
            Awaitility.await().until(() -> ((PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition()).getEntryId() == (long)(msgNum - 1));
            ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
            PositionImpl markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            Assert.assertEquals((Object)position, (Object)markDeletedPosition);
            this.admin.namespaces().removeDeduplicationSnapshotInterval("my-property/my-ns");
            Thread.sleep(2000L);
            markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            Assert.assertNotEquals((Object)(msgNum - 1), (Object)markDeletedPosition.getEntryId());
            Assert.assertNotEquals((Object)position, (Object)markDeletedPosition.getEntryId());
            Thread.sleep(1000L);
            markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            Assert.assertEquals((long)(msgNum - 1), (long)markDeletedPosition.getEntryId());
            Assert.assertEquals((Object)position, (Object)markDeletedPosition);
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testDisableNamespacePolicyTakeSnapshot() throws Exception {
        super.internalCleanup();
        this.resetConfig();
        this.conf.setBrokerDeduplicationEnabled(true);
        this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
        this.conf.setBrokerDeduplicationSnapshotIntervalSeconds(Integer.valueOf(1));
        this.conf.setBrokerDeduplicationEntriesInterval(20000);
        super.internalSetup();
        super.producerBaseSetup();
        String topicName = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        String producerName = "my-producer";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName("my-producer").create();
        try {
            this.admin.namespaces().setDeduplicationSnapshotInterval("my-property/my-ns", Integer.valueOf(0));
            int msgNum = 50;
            CountDownLatch countDownLatch = new CountDownLatch(msgNum);
            for (int i = 0; i < msgNum; ++i) {
                producer.newMessage().value((Object)("msg" + i)).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
            }
            countDownLatch.await();
            PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topicName).get()).get();
            ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
            PositionImpl markDeletedPosition = (PositionImpl)managedCursor.getMarkDeletedPosition();
            long seqId = (Long)persistentTopic.getMessageDeduplication().highestSequencedPersisted.get((Object)"my-producer");
            PositionImpl position = (PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            Assert.assertEquals((long)seqId, (long)(msgNum - 1));
            Assert.assertEquals((long)position.getEntryId(), (long)(msgNum - 1));
            Awaitility.await().until(() -> ((PositionImpl)persistentTopic.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition()).getEntryId() == -1L);
            Assert.assertEquals((Object)markDeletedPosition, (Object)managedCursor.getMarkDeletedPosition());
            Assert.assertEquals((long)markDeletedPosition.getEntryId(), (long)-1L);
            Assert.assertNotEquals((Object)position, (Object)markDeletedPosition);
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    private void waitCacheInit(String topicName) throws Exception {
        this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("my-sub").subscribe().close();
        TopicName topic = TopicName.get((String)topicName);
    }
}

