/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.resourcegroup;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Broker-level test of publish rate limiting applied through a resource group.
 *
 * <p>A resource group allowing one message / {@code MESSAGE_SIZE} bytes of publish
 * rate is attached to the test namespace; the test then checks that a producer is
 * throttled while the group is attached and is unthrottled after it is removed.
 */
public class ResourceGroupRateLimiterTest extends BrokerTestBase {
    final String rgName = "testRG";
    org.apache.pulsar.common.policies.data.ResourceGroup testAddRg =
            new org.apache.pulsar.common.policies.data.ResourceGroup();
    final String namespaceName = "prop/ns-abc";
    final String persistentTopicString = "persistent://prop/ns-abc/test-topic";
    final String nonPersistentTopicString = "non-persistent://prop/ns-abc/test-topic";
    final int MESSAGE_SIZE = 10;

    /**
     * Starts the test broker and fills in the resource-group limits used by the test.
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        // NOTE(review): 0 presumably disables the per-connection pending-publish throttle so
        // that only the resource-group limiter can throttle the producer — confirm against
        // the broker configuration reference.
        this.conf.setMaxPendingPublishRequestsPerConnection(0);
        super.baseSetup();
        this.prepareData();
    }

    /**
     * Tears down everything created by {@link #setup()}.
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Creates a resource group through the admin API and waits until the broker's
     * resource-group service reports it under the same name.
     *
     * @param rgName name of the resource group to create
     * @param rg     limits to register for the group
     * @throws PulsarAdminException if the admin call fails
     */
    public void createResourceGroup(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rg)
            throws PulsarAdminException {
        this.admin.resourcegroups().createResourceGroup(rgName, rg);

        Awaitility.await().untilAsserted(() -> {
            ResourceGroup registered = this.pulsar.getResourceGroupServiceManager().resourceGroupGet(rgName);
            Assert.assertNotNull(registered);
            Assert.assertEquals(registered.resourceGroupName, rgName);
        });
    }

    /**
     * Deletes a resource group through the admin API and waits (up to one second)
     * until the broker's resource-group service no longer returns it.
     *
     * @param rgName name of the resource group to delete
     * @throws PulsarAdminException if the admin call fails
     */
    public void deleteResourceGroup(String rgName) throws PulsarAdminException {
        this.admin.resourcegroups().deleteResourceGroup(rgName);

        Awaitility.await()
                .atMost(1L, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(
                        this.pulsar.getResourceGroupServiceManager().resourceGroupGet(rgName)));
    }

    /**
     * Runs the rate-limit scenario against the given topic.
     *
     * <p>Steps: attach the resource group to the namespace, publish one message
     * (expected to succeed), publish a second one immediately (expected to time out),
     * wait, publish again (expected to succeed), then detach and delete the group and
     * verify that several back-to-back publishes all succeed.
     *
     * @param topicString fully qualified name of the topic to publish to
     */
    public void testRateLimit(String topicString) throws PulsarAdminException, PulsarClientException,
            InterruptedException, ExecutionException, TimeoutException {
        this.createResourceGroup(this.rgName, this.testAddRg);
        this.admin.namespaces().setNamespaceResourceGroup(this.namespaceName, this.rgName);

        // Wait until the broker has both the namespace binding and the group's publish limiter.
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull(
                this.pulsar.getResourceGroupServiceManager()
                        .getNamespaceResourceGroup(NamespaceName.get(this.namespaceName))));
        Awaitility.await().untilAsserted(() -> Assert.assertNotNull(
                this.pulsar.getResourceGroupServiceManager()
                        .resourceGroupGet(this.rgName)
                        .getResourceGroupPublishLimiter()));

        // try-with-resources so the producer is closed even when an assertion below fails.
        // The topic comes from the caller; previously the persistent topic was hard-coded
        // here, so the non-persistent variant was never actually exercised.
        try (Producer<byte[]> producer = this.createProducer(topicString)) {
            // First message fits within the configured limit.
            this.sendExpectingSuccess(producer);

            // Second message right away is expected to be held back until the get() times out.
            Assert.assertThrows(TimeoutException.class,
                    () -> producer.sendAsync(new byte[this.MESSAGE_SIZE]).get(500L, TimeUnit.MILLISECONDS));

            // NOTE(review): presumably long enough for the limiter to start a new rate period — confirm.
            Thread.sleep(2000L);
            this.sendExpectingSuccess(producer);

            // With the group detached and deleted, publishing must no longer be throttled.
            this.admin.namespaces().removeNamespaceResourceGroup(this.namespaceName);
            this.deleteResourceGroup(this.rgName);
            for (int i = 0; i < 5; ++i) {
                MessageId messageId = producer.sendAsync(new byte[this.MESSAGE_SIZE])
                        .get(100L, TimeUnit.MILLISECONDS);
                Assert.assertNotNull(messageId);
            }
        }
    }

    /**
     * Exercises the publish rate limit on both the persistent and the non-persistent topic.
     */
    @Test
    public void testResourceGroupPublishRateLimit() throws Exception {
        this.testRateLimit(this.persistentTopicString);
        this.testRateLimit(this.nonPersistentTopicString);
    }

    /** Creates a byte[] producer on the topic, turning a client failure into a test failure. */
    private Producer<byte[]> createProducer(String topicString) {
        try {
            return this.pulsarClient.newProducer().topic(topicString).create();
        } catch (PulsarClientException p) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new AssertionError(
                    String.format("Got exception while building producer: ex=%s", p.getMessage()), p);
        }
    }

    /** Sends one MESSAGE_SIZE-byte message and fails the test if it is not acknowledged within 100 ms. */
    private void sendExpectingSuccess(Producer<byte[]> producer)
            throws InterruptedException, ExecutionException {
        try {
            MessageId messageId = producer.sendAsync(new byte[this.MESSAGE_SIZE])
                    .get(100L, TimeUnit.MILLISECONDS);
            Assert.assertNotNull(messageId);
        } catch (TimeoutException e) {
            Assert.fail("should not fail");
        }
    }

    /**
     * Configures the resource group: publish limited to one message and
     * {@code MESSAGE_SIZE} bytes; both dispatch limits set to -1.
     */
    private void prepareData() {
        this.testAddRg.setPublishRateInBytes((long) this.MESSAGE_SIZE);
        this.testAddRg.setPublishRateInMsgs(1);
        this.testAddRg.setDispatchRateInMsgs(-1);
        this.testAddRg.setDispatchRateInBytes(-1L);
    }
}

