/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Checks the flow permits reported for a consumer after it has drained a
 * backlog that was published through a mixture of large and small flushes.
 */
@Test(groups={"broker-impl"})
public class DispatchAccordingPermitsTests
extends ProducerConsumerBase {
    /** Number of messages queued before the first explicit flush. */
    private static final int FIRST_FLUSH_MESSAGES = 100;

    /** Number of messages queued before the second explicit flush. */
    private static final int SECOND_FLUSH_MESSAGES = 350;

    /** Number of messages that are each followed by their own flush. */
    private static final int INDIVIDUALLY_FLUSHED_MESSAGES = 50;

    /** Total number of messages published, and therefore expected by the consumer. */
    private static final int TOTAL_MESSAGES =
            FIRST_FLUSH_MESSAGES + SECOND_FLUSH_MESSAGES + INDIVIDUALLY_FLUSHED_MESSAGES;

    /** Starts the test environment via the base-class setup hooks before each test method. */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /** Tears the test environment down after each test method, even when the test failed. */
    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Publishes {@link #TOTAL_MESSAGES} messages using differently sized flushes,
     * receives and acknowledges all of them on a {@code Shared} subscription, and
     * then asserts that the consumer's receiver queue is empty and that the
     * topic stats report a positive number of available permits for it.
     *
     * @throws PulsarAdminException if creating the subscription or fetching the stats fails
     * @throws PulsarClientException if a producer or consumer operation fails
     */
    @Test
    public void testFlowPermitsWithMultiBatchesDispatch() throws PulsarAdminException, PulsarClientException {
        final String topic = "persistent://public/default/testFlowPermitsWithMultiBatchesDispatch";
        final String subName = "test";

        // Create the subscription up front, positioned at the earliest message,
        // so everything published below is part of its backlog.
        this.admin.topics().createSubscription(topic, subName, MessageId.earliest);

        // The publish delay is set to Long.MAX_VALUE so that, presumably, messages
        // only leave the producer on the explicit flush() calls below
        // (NOTE(review): assumes the default batch size limits are not hit first).
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
                .create()) {
            publishBacklog(producer);

            try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionName(subName)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                for (int i = 0; i < TOTAL_MESSAGES; ++i) {
                    consumer.acknowledge(consumer.receive());
                }

                // Exactly TOTAL_MESSAGES were published, so nothing may be left
                // over in the client-side receiver queue.
                ConsumerImpl<String> consumerImpl = (ConsumerImpl<String>) consumer;
                Assert.assertEquals(consumerImpl.incomingMessages.size(), 0);

                // The stats must show a positive permit count for the only
                // consumer attached to the subscription.
                TopicStats stats = this.admin.topics().getStats(topic);
                int availablePermits = stats.getSubscriptions().get(subName)
                        .getConsumers().get(0).getAvailablePermits();
                Assert.assertTrue(availablePermits > 0);
            }
        }
    }

    /**
     * Publishes the backlog in three phases: two large groups that are each
     * flushed once, followed by messages that are flushed one at a time.
     */
    private static void publishBacklog(Producer<String> producer) throws PulsarClientException {
        for (int i = 0; i < FIRST_FLUSH_MESSAGES; ++i) {
            producer.sendAsync("msg - " + i);
        }
        producer.flush();

        for (int i = 0; i < SECOND_FLUSH_MESSAGES; ++i) {
            producer.sendAsync("msg - " + i);
        }
        producer.flush();

        for (int i = 0; i < INDIVIDUALLY_FLUSHED_MESSAGES; ++i) {
            producer.sendAsync("msg - " + i);
            producer.flush();
        }
    }
}

