/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Tests for producer sequence-id tracking and message deduplication, run with
 * deduplication enabled at the namespace level on {@code my-property/my-ns}.
 *
 * <p>The broker and client are created once per class (see {@link #setup()}), so every test
 * uses its own topic and releases the producers/consumers it opens.
 */
@Test(groups={"flaky"})
public class ClientDeduplicationTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ClientDeduplicationTest.class);

    /** Namespace on which every test toggles the deduplication policy. */
    private static final String NAMESPACE = "my-property/my-ns";

    /**
     * Supplies the batcher implementations used by the data-driven batching test.
     *
     * @return one row per batcher: {@link BatcherBuilder#DEFAULT} and {@link BatcherBuilder#KEY_BASED}
     */
    @DataProvider
    public static Object[][] batchingTypes() {
        return new Object[][]{{BatcherBuilder.DEFAULT}, {BatcherBuilder.KEY_BASED}};
    }

    /** Runs the base-class internal setup and producer setup once for the whole class. */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /** Runs the base-class internal cleanup; executed even when a test has failed. */
    @Override
    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Exercises the namespace deduplication admin API: the status starts as {@code null},
     * can be set to {@code true} and {@code false}, and returns to {@code null} once removed.
     * Runs with priority -1, i.e. before the other tests set the policy on the same namespace.
     */
    @Test(priority=-1)
    public void testNamespaceDeduplicationApi() throws Exception {
        Assert.assertNull(admin.namespaces().getDeduplicationStatus(NAMESPACE));

        admin.namespaces().setDeduplicationStatus(NAMESPACE, true);
        Awaitility.await().untilAsserted(
                () -> Assert.assertTrue(admin.namespaces().getDeduplicationStatus(NAMESPACE)));

        admin.namespaces().setDeduplicationStatus(NAMESPACE, false);
        Awaitility.await().untilAsserted(
                () -> Assert.assertFalse(admin.namespaces().getDeduplicationStatus(NAMESPACE)));

        admin.namespaces().removeDeduplicationStatus(NAMESPACE);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.namespaces().getDeduplicationStatus(NAMESPACE)));
    }

    /**
     * Checks that a producer re-created with the same name reports the last sequence id
     * published by its predecessor (9) and continues counting from there.
     */
    @Test
    public void testProducerSequenceAfterReconnect() throws Exception {
        String topic = "persistent://my-property/my-ns/testProducerSequenceAfterReconnect";
        admin.namespaces().setDeduplicationStatus(NAMESPACE, true);
        ProducerBuilder<byte[]> producerBuilder =
                pulsarClient.newProducer().topic(topic).producerName("my-producer-name");

        try (Producer<byte[]> producer = producerBuilder.create()) {
            Assert.assertEquals(producer.getLastSequenceId(), -1L);
            publishAndCheckSequenceIds(producer, 0, 10);
        }

        try (Producer<byte[]> producer = producerBuilder.create()) {
            Assert.assertEquals(producer.getLastSequenceId(), 9L);
            publishAndCheckSequenceIds(producer, 10, 20);
        }
    }

    /**
     * Same scenario as {@link #testProducerSequenceAfterReconnect()}, but the broker is
     * restarted between the two producer sessions.
     */
    @Test
    public void testProducerSequenceAfterRestart() throws Exception {
        String topic = "persistent://my-property/my-ns/testProducerSequenceAfterRestart";
        admin.namespaces().setDeduplicationStatus(NAMESPACE, true);
        ProducerBuilder<byte[]> producerBuilder =
                pulsarClient.newProducer().topic(topic).producerName("my-producer-name");

        try (Producer<byte[]> producer = producerBuilder.create()) {
            Assert.assertEquals(producer.getLastSequenceId(), -1L);
            publishAndCheckSequenceIds(producer, 0, 10);
        }

        restartBroker();

        try (Producer<byte[]> producer = producerBuilder.create()) {
            Assert.assertEquals(producer.getLastSequenceId(), 9L);
            publishAndCheckSequenceIds(producer, 10, 20);
        }
    }

    /**
     * Publishes sequence ids 0, 1, 2 and then 1, 2 again, and asserts that the consumer
     * receives only {@code my-message-0..2}. After a broker restart the re-created producer
     * must report last sequence id 2, and re-sending ids 1 and 2 must deliver nothing.
     */
    @Test(timeOut=30000L)
    public void testProducerDeduplication() throws Exception {
        String topic = "persistent://my-property/my-ns/testProducerDeduplication";
        admin.namespaces().setDeduplicationStatus(NAMESPACE, true);
        // sendTimeout(0): per the Pulsar client docs a zero timeout means "never time out".
        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic)
                .producerName("my-producer-name").sendTimeout(0, TimeUnit.SECONDS);
        Producer<byte[]> producer = producerBuilder.create();
        Assert.assertEquals(producer.getLastSequenceId(), -1L);

        // The consumer is closed automatically, also when an assertion below fails.
        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                .subscriptionName("my-subscription").subscribe()) {
            producer.newMessage().value("my-message-0".getBytes()).sequenceId(0L).send();
            producer.newMessage().value("my-message-1".getBytes()).sequenceId(1L).send();
            producer.newMessage().value("my-message-2".getBytes()).sequenceId(2L).send();
            // Repeat sequence ids that were already published.
            producer.newMessage().value("my-message-1".getBytes()).sequenceId(1L).send();
            producer.newMessage().value("my-message-2".getBytes()).sequenceId(2L).send();
            producer.close();

            for (int i = 0; i < 3; i++) {
                Message<byte[]> msg = consumer.receive();
                Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
                consumer.acknowledge(msg);
            }
            assertNothingMoreDelivered(consumer);

            restartBroker();

            producer = producerBuilder.create();
            Assert.assertEquals(producer.getLastSequenceId(), 2L);
            producer.newMessage().value("my-message-1".getBytes()).sequenceId(1L).send();
            producer.newMessage().value("my-message-2".getBytes()).sequenceId(2L).send();
            assertNothingMoreDelivered(consumer);
            producer.close();
        }
    }

    /**
     * Batched variant with gaps in the sequence ids. The first flush sends ids 2, 3, 5 and the
     * second sends ids 2, 4, 6; the test asserts that exactly {@code my-message-0..3} are
     * delivered. After a broker restart the last sequence id must be 6, and re-sending
     * ids 2 and 4 must deliver nothing.
     *
     * @param batcherBuilder batcher implementation under test, supplied by {@link #batchingTypes()}
     */
    @Test(timeOut=30000L, dataProvider="batchingTypes")
    public void testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder batcherBuilder) throws Exception {
        // The timestamp suffix gives each data-provider invocation its own topic.
        String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-" + System.currentTimeMillis();
        admin.namespaces().setDeduplicationStatus(NAMESPACE, true);
        // One-hour publish delay: batches only leave the client on an explicit flush()/close().
        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic)
                .producerName("my-producer-name")
                .enableBatching(true)
                .batcherBuilder(batcherBuilder)
                .batchingMaxMessages(10)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .sendTimeout(0, TimeUnit.SECONDS);
        Producer<byte[]> producer = producerBuilder.create();
        Assert.assertEquals(producer.getLastSequenceId(), -1L);

        // The consumer is closed automatically, also when an assertion below fails.
        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                .subscriptionName("my-subscription").subscribe()) {
            producer.newMessage().value("my-message-0".getBytes()).sequenceId(2L).sendAsync();
            producer.newMessage().value("my-message-1".getBytes()).sequenceId(3L).sendAsync();
            producer.newMessage().value("my-message-2".getBytes()).sequenceId(5L).sendAsync();
            producer.flush();

            producer.newMessage().value("my-message-0".getBytes()).sequenceId(2L).sendAsync();
            producer.newMessage().value("my-message-1".getBytes()).sequenceId(4L).sendAsync();
            producer.newMessage().value("my-message-3".getBytes()).sequenceId(6L).sendAsync();
            producer.flush();

            for (int i = 0; i < 4; i++) {
                Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
                Assert.assertNotNull(msg);
                Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
                consumer.acknowledge(msg);
            }
            assertNothingMoreDelivered(consumer);
            producer.close();

            restartBroker();

            producer = producerBuilder.create();
            Assert.assertEquals(producer.getLastSequenceId(), 6L);
            producer.newMessage().value("my-message-1".getBytes()).sequenceId(2L).sendAsync();
            producer.newMessage().value("my-message-2".getBytes()).sequenceId(4L).sendAsync();
            producer.flush();
            assertNothingMoreDelivered(consumer);
            producer.close();
        }
    }

    /**
     * Non-batched asynchronous variant. Sends ids 2, 3, 5 followed by 2 and 4, and asserts that
     * only {@code my-message-0..2} are delivered. After a broker restart the last sequence id
     * must be 5, and re-sending ids 2 and 4 must deliver nothing.
     */
    @Test(timeOut=30000L)
    public void testProducerDeduplicationNonBatchAsync() throws Exception {
        String topic = "persistent://my-property/my-ns/testProducerDeduplicationNonBatchAsync";
        admin.namespaces().setDeduplicationStatus(NAMESPACE, true);
        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic)
                .producerName("my-producer-name")
                .enableBatching(false)
                .sendTimeout(0, TimeUnit.SECONDS);
        Producer<byte[]> producer = producerBuilder.create();
        Assert.assertEquals(producer.getLastSequenceId(), -1L);

        // The consumer is closed automatically, also when an assertion below fails.
        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                .subscriptionName("my-subscription").subscribe()) {
            producer.newMessage().value("my-message-0".getBytes()).sequenceId(2L).sendAsync();
            producer.newMessage().value("my-message-1".getBytes()).sequenceId(3L).sendAsync();
            producer.newMessage().value("my-message-2".getBytes()).sequenceId(5L).sendAsync();
            // Sequence ids not above the last published one (5).
            producer.newMessage().value("my-message-1".getBytes()).sequenceId(2L).sendAsync();
            producer.newMessage().value("my-message-2".getBytes()).sequenceId(4L).sendAsync();
            // No send future is awaited; the producer is closed before the consumer reads.
            producer.close();

            for (int i = 0; i < 3; i++) {
                Message<byte[]> msg = consumer.receive();
                Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
                consumer.acknowledge(msg);
            }
            assertNothingMoreDelivered(consumer);

            restartBroker();

            producer = producerBuilder.create();
            Assert.assertEquals(producer.getLastSequenceId(), 5L);
            producer.newMessage().value("my-message-1".getBytes()).sequenceId(2L).sendAsync();
            producer.newMessage().value("my-message-2".getBytes()).sequenceId(4L).sendAsync();
            assertNothingMoreDelivered(consumer);
            producer.close();
        }
    }

    /**
     * Sends five keyed messages (keys A, B, B, A, A with sequence ids 0..4) through a key-based
     * batcher and asserts that the consumer observes the sequence ids in the order 1, 2, 0, 3, 4.
     * It then re-sends sequence ids 0..4 and asserts that each returned id is a
     * {@link BatchMessageIdImpl} whose ledger id and entry id are both -1.
     */
    @Test(timeOut=30000L)
    public void testKeyBasedBatchingOrder() throws Exception {
        String topic = "persistent://my-property/my-ns/test-key-based-batching-order";
        admin.namespaces().setDeduplicationStatus(NAMESPACE, true);

        // Subscribe before producing, as the original test does.
        try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
                     .subscriptionName("sub").subscribe();
             Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic)
                     .batcherBuilder(BatcherBuilder.KEY_BASED)
                     .batchingMaxMessages(100)
                     .batchingMaxBytes(0x500000)
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .create()) {

            List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
            sendFutures.add(producer.newMessage().key("A").value("msg-0").sequenceId(0L).sendAsync());
            sendFutures.add(producer.newMessage().key("B").value("msg-1").sequenceId(1L).sendAsync());
            sendFutures.add(producer.newMessage().key("B").value("msg-2").sequenceId(2L).sendAsync());
            sendFutures.add(producer.newMessage().key("A").value("msg-3").sequenceId(3L).sendAsync());
            sendFutures.add(producer.newMessage().key("A").value("msg-4").sequenceId(4L).sendAsync());
            producer.flush();

            // Join the aggregate future; calling waitForAll without joining does not wait.
            FutureUtil.waitForAll(sendFutures).join();
            List<MessageId> sendMessageIds =
                    sendFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
            for (int i = 0; i < sendMessageIds.size(); i++) {
                log.info("Send msg-{} to {}", i, sendMessageIds.get(i));
            }

            List<Long> sequenceIdList = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
                if (msg == null) {
                    // Stop early; the list comparison below reports what is missing.
                    break;
                }
                log.info("Received {}, key: {}, seq id: {}, msg id: {}",
                        msg.getValue(), msg.getKey(), msg.getSequenceId(), msg.getMessageId());
                sequenceIdList.add(msg.getSequenceId());
            }
            Assert.assertEquals(sequenceIdList, Arrays.asList(1L, 2L, 0L, 3L, 4L));

            // NOTE(review): presumably ledgerId/entryId == -1 is how the client reports a message
            // the broker dropped as a duplicate - confirm against the client implementation.
            for (int i = 0; i < 5; i++) {
                MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send();
                Assert.assertTrue(messageId instanceof BatchMessageIdImpl);
                BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId;
                Assert.assertEquals(messageIdImpl.getLedgerId(), -1L);
                Assert.assertEquals(messageIdImpl.getEntryId(), -1L);
            }
        }
    }

    /**
     * Synchronously sends {@code my-message-<i>} for each {@code i} in
     * {@code [fromInclusive, toExclusive)} and asserts after every send that the producer's
     * last sequence id equals {@code i}.
     *
     * @param producer      producer to publish with
     * @param fromInclusive first message index / expected sequence id
     * @param toExclusive   index after the last message to send
     */
    private static void publishAndCheckSequenceIds(Producer<byte[]> producer, int fromInclusive, int toExclusive)
            throws Exception {
        for (int i = fromInclusive; i < toExclusive; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
            Assert.assertEquals(producer.getLastSequenceId(), (long) i);
        }
    }

    /**
     * Asserts that a receive with a one-second timeout on the consumer returns {@code null}.
     *
     * @param consumer consumer expected to have nothing pending
     */
    private static void assertNothingMoreDelivered(Consumer<byte[]> consumer) throws Exception {
        Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNull(msg);
    }
}

