/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BatchMessageTest;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Extends {@link BatchMessageTest} with the broker setting
 * {@code acknowledgmentAtBatchIndexLevelEnabled} switched on before the base setup runs,
 * and adds tests that check the broker-side unacked-message counters when individual
 * messages inside a batch are acknowledged or negatively acknowledged.
 */
@Test(groups={"broker"})
public class BatchMessageWithBatchIndexLevelTest
extends BatchMessageTest {
    /** Redelivery delay, in milliseconds, configured on every consumer created by these tests. */
    private static final long NEGATIVE_ACK_REDELIVERY_DELAY_MS = 100L;

    /** Maximum number of messages the producers pack into a single batch. */
    private static final int BATCH_SIZE = 20;

    /**
     * Enables batch-index-level acknowledgment in the broker configuration, then delegates
     * to the base class setup.
     *
     * @throws Exception if the base setup fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
        super.baseSetup();
    }

    /**
     * Publishes 40 messages in batches of at most {@value #BATCH_SIZE}, then acknowledges and
     * negatively acknowledges single messages while checking the unacked-message count
     * that the broker keeps for the first consumer of the subscription's dispatcher.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testBatchMessageAck() throws Exception {
        final int numMsgs = 40;
        final String topicName = "persistent://prop/ns-abc/batchMessageAck-" + UUID.randomUUID();
        final String subscriptionName = "sub-batch-1";

        // Resources are closed in reverse order: the producer first, then the consumer.
        try (org.apache.pulsar.client.api.Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(10)
                .subscriptionType(SubscriptionType.Shared)
                .enableBatchIndexAcknowledgment(true)
                .negativeAckRedeliveryDelay(NEGATIVE_ACK_REDELIVERY_DELAY_MS, TimeUnit.MILLISECONDS)
                .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .batchingMaxMessages(BATCH_SIZE)
                // A one-hour publish delay means a batch is only flushed once it is full.
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .enableBatching(true)
                .create()) {

            List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(numMsgs);
            for (int i = 0; i < numMsgs; i++) {
                byte[] payload = ("batch-message-" + i).getBytes(StandardCharsets.UTF_8);
                sendFutures.add(producer.newMessage().value(payload).sendAsync());
            }
            FutureUtil.waitForAll(sendFutures).get();

            PersistentTopic topic =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
            PersistentDispatcherMultipleConsumers dispatcher =
                    (PersistentDispatcherMultipleConsumers) topic.getSubscription(subscriptionName).getDispatcher();

            // Two individual acks: the count is expected to drop to 18
            // (presumably one 20-message batch minus the two acked entries).
            consumer.acknowledge(consumer.receive());
            consumer.acknowledge(consumer.receive());
            awaitUnackedMessages(dispatcher, 18);

            // Two more individual acks bring the expected count down to 16.
            consumer.acknowledge(consumer.receive());
            consumer.acknowledge(consumer.receive());
            awaitUnackedMessages(dispatcher, 16);

            // After a negative ack the count is expected to hit 0. The 1 ms poll interval is
            // presumably there to observe that state before redelivery raises the count again.
            consumer.negativeAcknowledge(consumer.receive());
            Awaitility.await().pollInterval(1L, TimeUnit.MILLISECONDS).untilAsserted(() ->
                    Assert.assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0));

            // Receiving again without acking: the count is expected to return to 16.
            consumer.receive();
            awaitUnackedMessages(dispatcher, 16);
        }
    }

    /**
     * Publishes one batch of {@value #BATCH_SIZE} messages, acknowledges every even-indexed
     * message and negatively acknowledges every odd-indexed one, and expects the subscription
     * stats to report 10 unacked messages. The scenario is run twice on independent
     * topics: once back-to-back, and once with a pause after each negative ack.
     *
     * @throws Exception if any client, admin or broker call fails
     */
    @Test
    public void testBatchMessageMultiNegtiveAck() throws Exception {
        final String topicName = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID();
        final String subscriptionName = "sub-negtive-1";

        try (org.apache.pulsar.client.api.Consumer<String> consumer = newSharedConsumer(topicName, subscriptionName);
             Producer<String> producer = newBatchingProducer(topicName)) {

            publishBatch(producer, BATCH_SIZE);
            ackEvenNackOdd(consumer, BATCH_SIZE, 0L);
            awaitSubscriptionUnackedMessages(topicName, subscriptionName, 10L);

            // Same scenario on a second topic, this time pausing after each negative ack.
            final String topicName2 = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck2-" + UUID.randomUUID();
            final String subscriptionName2 = "sub-negtive-2";

            try (org.apache.pulsar.client.api.Consumer<String> consumer2 = newSharedConsumer(topicName2, subscriptionName2);
                 Producer<String> producer2 = newBatchingProducer(topicName2)) {

                publishBatch(producer2, BATCH_SIZE);
                // Acks and nacks must go through consumer2, the consumer that actually received
                // the messages, and the stats must be read from the second topic/subscription.
                ackEvenNackOdd(consumer2, BATCH_SIZE, 100L);
                awaitSubscriptionUnackedMessages(topicName2, subscriptionName2, 10L);
            }
        }
    }

    /** Creates a Shared-subscription String consumer with batch-index acknowledgment enabled. */
    private org.apache.pulsar.client.api.Consumer<String> newSharedConsumer(String topic, String subscription)
            throws Exception {
        return this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName(subscription)
                .subscriptionType(SubscriptionType.Shared)
                .receiverQueueSize(10)
                .enableBatchIndexAcknowledgment(true)
                .negativeAckRedeliveryDelay(NEGATIVE_ACK_REDELIVERY_DELAY_MS, TimeUnit.MILLISECONDS)
                .subscribe();
    }

    /** Creates a String producer that batches up to {@value #BATCH_SIZE} messages with a one-hour publish delay. */
    private Producer<String> newBatchingProducer(String topic) throws Exception {
        return this.pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .batchingMaxMessages(BATCH_SIZE)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .enableBatching(true)
                .create();
    }

    /** Sends {@code count} messages "test-0" .. "test-(count-1)" asynchronously, then flushes the producer. */
    private static void publishBatch(Producer<String> producer, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            producer.sendAsync("test-" + i);
        }
        producer.flush();
    }

    /**
     * Receives {@code count} messages, acknowledging even-indexed ones and negatively
     * acknowledging odd-indexed ones, sleeping {@code pauseAfterNackMillis} after each
     * negative ack when that value is positive.
     */
    private static void ackEvenNackOdd(org.apache.pulsar.client.api.Consumer<String> consumer, int count,
                                       long pauseAfterNackMillis) throws Exception {
        for (int i = 0; i < count; i++) {
            Message<String> msg = consumer.receive();
            if (i % 2 == 0) {
                consumer.acknowledgeAsync(msg);
            } else {
                consumer.negativeAcknowledge(msg);
                if (pauseAfterNackMillis > 0L) {
                    Thread.sleep(pauseAfterNackMillis);
                }
            }
        }
    }

    /** Waits until the dispatcher's first consumer reports {@code expected} unacked messages. */
    private static void awaitUnackedMessages(PersistentDispatcherMultipleConsumers dispatcher, int expected) {
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), expected));
    }

    /** Waits until the admin stats of the given subscription report {@code expected} unacked messages. */
    private void awaitSubscriptionUnackedMessages(String topic, String subscription, long expected) {
        Awaitility.await().untilAsserted(() -> {
            SubscriptionStats stats = this.admin.topics().getStats(topic).getSubscriptions().get(subscription);
            Assert.assertEquals(stats.getUnackedMessages(), expected);
        });
    }
}

