/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.resourcegroup;

import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resourcegroup.ResourceQuotaCalculator;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageConsumer;
import org.apache.pulsar.broker.resourcegroup.ResourceUsagePublisher;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Verifies that produce/consume traffic on a topic is reflected both in the broker's per-topic
 * statistics and in the cumulative usage counters of the resource group that the topic's tenant
 * and namespace are registered with.
 *
 * <p>The test builds its own {@link ResourceGroupService} (with a stub quota calculator) on top of
 * the broker started by {@link ProducerConsumerBase}, and triggers usage aggregation manually via
 * {@code aggregateResourceGroupLocalUsages()}.
 */
public class ResourceGroupUsageAggregationTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ResourceGroupUsageAggregationTest.class);
    ResourceGroupService rgs;
    // Resource group currently under test; set once the group has been created in the service.
    ResourceGroup activeRG;
    final org.apache.pulsar.common.policies.data.ResourceGroup rgConfig = new org.apache.pulsar.common.policies.data.ResourceGroup();
    final String activeRgName = "runProduceConsume";
    // Callback counters, incremented by the usage consumer/publisher defined in the test body.
    int numRgUsageListenerCallbacks = 0;
    int numRgFillUsageCallbacks = 0;
    final String TenantName = "pulsar-test";
    final String NsName = "test";
    final String TenantAndNsName = "pulsar-test/test";
    final String TestProduceConsumeTopicName = "/test/prod-cons-topic";
    final String PRODUCE_CONSUME_PERSISTENT_TOPIC = "persistent://pulsar-test/test/test/prod-cons-topic";
    final String PRODUCE_CONSUME_NON_PERSISTENT_TOPIC = "non-persistent://pulsar-test/test/test/prod-cons-topic";
    private static final int PUBLISH_INTERVAL_SECS = 300;

    /**
     * Starts the test broker, creates the cluster/tenant/namespace used by the test, and builds the
     * {@link ResourceGroupService} under test.
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        this.prepareData();

        // Stub calculator: never asks for a usage report and always computes a quota of zero.
        ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() {
            @Override
            public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes, long currentMessagesUsed, long lastReportedMessages, long lastReportTimeMSecsSinceEpoch) {
                return false;
            }

            @Override
            public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
                return 0L;
            }
        };

        ResourceUsageTopicTransportManager transportMgr = new ResourceUsageTopicTransportManager(this.pulsar);
        this.rgs = new ResourceGroupService(this.pulsar, TimeUnit.MILLISECONDS, transportMgr, dummyQuotaCalc);
    }

    /** Tears down the broker and clients created by {@link #setup()}. */
    @Override
    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /** Runs the produce/consume usage check against a persistent and then a non-persistent topic. */
    @Test
    public void testProduceConsumeUsageOnRG() throws Exception {
        this.testProduceConsumeUsageOnRG(this.PRODUCE_CONSUME_PERSISTENT_TOPIC);
        this.testProduceConsumeUsageOnRG(this.PRODUCE_CONSUME_NON_PERSISTENT_TOPIC);
    }

    /**
     * Creates the resource group, registers the topic's tenant and namespace with it, sends a fixed
     * number of messages, receives them back, and verifies the usage counters after each phase.
     * The resource group is unregistered and deleted again at the end of a successful run.
     *
     * @param topicString fully qualified name of the topic to produce to and consume from
     * @throws Exception if any broker, client, or resource-group operation fails
     */
    private void testProduceConsumeUsageOnRG(String topicString) throws Exception {
        ResourceUsagePublisher ruP = new ResourceUsagePublisher() {
            @Override
            public String getID() {
                return activeRG.getID();
            }

            @Override
            public void fillResourceUsage(ResourceUsage resourceUsage) {
                activeRG.rgFillResourceUsage(resourceUsage);
                numRgFillUsageCallbacks++;
            }
        };
        ResourceUsageConsumer ruC = new ResourceUsageConsumer() {
            @Override
            public String getID() {
                return activeRG.getID();
            }

            @Override
            public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
                activeRG.rgResourceUsageListener(broker, resourceUsage);
                numRgUsageListenerCallbacks++;
            }
        };

        this.rgConfig.setPublishRateInBytes(1500L);
        this.rgConfig.setPublishRateInMsgs(100);
        this.rgConfig.setDispatchRateInBytes(4000L);
        // Previously this line called setPublishRateInMsgs a second time, which overwrote the
        // publish rate above and left the dispatch message rate unset.
        this.rgConfig.setDispatchRateInMsgs(500);

        this.rgs.resourceGroupCreate(this.activeRgName, this.rgConfig, ruP, ruC);
        this.activeRG = this.rgs.resourceGroupGet(this.activeRgName);
        Assert.assertNotNull(this.activeRG);

        TopicName myTopic = TopicName.get(topicString);
        String tenantString = myTopic.getTenant();
        String nsString = myTopic.getNamespace();

        final int numMessagesToSend = 10;
        int sentNumBytes = 0;
        int sentNumMsgs = 0;
        int recvdNumBytes = 0;
        int recvdNumMsgs = 0;

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicString).create();
        Consumer<byte[]> consumer = null;
        try {
            try {
                consumer = this.pulsarClient.newConsumer()
                        .topic(topicString)
                        .subscriptionName("my-subscription")
                        .subscriptionType(SubscriptionType.Shared)
                        .subscribe();
            } catch (PulsarClientException p) {
                Assert.fail(String.format("Got exception while building consumer: ex=%s", p.getMessage()), p);
            }

            this.rgs.registerTenant(this.activeRgName, tenantString);
            this.rgs.registerNameSpace(this.activeRgName, NamespaceName.get(nsString));

            for (int ix = 0; ix < numMessagesToSend; ix++) {
                try {
                    byte[] mesg = String.format("Hi, ix=%s", ix).getBytes();
                    producer.send(mesg);
                    sentNumBytes += mesg.length;
                    sentNumMsgs++;
                } catch (PulsarClientException p) {
                    Assert.fail(String.format("Got exception while sending %s-th time: ex=%s", ix, p.getMessage()), p);
                }
            }
            producer.close();

            // Nothing has been consumed yet, so only the publish side is checked here.
            this.verifyStats(topicString, this.activeRgName, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, false);

            while (recvdNumMsgs < sentNumMsgs) {
                try {
                    Message<byte[]> message = consumer.receive();
                    recvdNumBytes += message.getValue().length;
                } catch (PulsarClientException p) {
                    Assert.fail(String.format("Got exception in while receiving %s-th mesg at consumer: ex=%s", recvdNumMsgs, p.getMessage()), p);
                }
                recvdNumMsgs++;
            }

            this.verifyStats(topicString, this.activeRgName, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true);
        } finally {
            // Release client resources even when an assertion above fails. On the success path the
            // producer has already been closed explicitly after the send loop.
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                producer.close();
            }
        }

        this.rgs.unRegisterTenant(this.activeRgName, tenantString);
        this.rgs.unRegisterNameSpace(this.activeRgName, NamespaceName.get(nsString));
        this.rgs.resourceGroupDelete(this.activeRgName);
    }

    /**
     * Checks the broker's statistics for {@code topicString} and, when any traffic is expected, the
     * cumulative publish/dispatch usage recorded for resource group {@code rgName}.
     *
     * <p>Byte counters are checked with {@code >=}; message counters are checked for equality. If the
     * topic is not present in the broker's statistics map, nothing is asserted and a warning is
     * logged.
     *
     * @param topicString   topic whose statistics are inspected
     * @param rgName        resource group whose cumulative usage is inspected
     * @param sentNumBytes  lower bound for bytes published
     * @param sentNumMsgs   exact number of messages published
     * @param recvdNumBytes lower bound for bytes dispatched
     * @param recvdNumMsgs  exact number of messages dispatched
     * @param checkProduce  whether to assert on the publish-side counters
     * @param checkConsume  whether to assert on the dispatch-side counters
     */
    private void verifyStats(String topicString, String rgName, int sentNumBytes, int sentNumMsgs, int recvdNumBytes, int recvdNumMsgs, boolean checkProduce, boolean checkConsume) throws InterruptedException, PulsarAdminException {
        BrokerService bs = this.pulsar.getBrokerService();
        Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
        TopicStatsImpl stats = topicStatsMap.get(topicString);
        if (stats == null) {
            // NOTE(review): a missing topic means no assertion runs at all; consider failing here
            // once the key format of the broker's stats map has been confirmed.
            log.warn("No broker stats found for topic {}; usage verification skipped", topicString);
            return;
        }

        if (checkProduce) {
            Assert.assertTrue(stats.bytesInCounter >= sentNumBytes);
            Assert.assertEquals(stats.msgInCounter, (long) sentNumMsgs);
        }
        if (checkConsume) {
            Assert.assertTrue(stats.bytesOutCounter >= recvdNumBytes);
            Assert.assertEquals(stats.msgOutCounter, (long) recvdNumMsgs);
        }

        if (sentNumMsgs <= 0 && recvdNumMsgs <= 0) {
            return;
        }

        this.rgs.aggregateResourceGroupLocalUsages();

        ResourceGroup.BytesAndMessagesCount prodCounts = this.rgs.getRGUsage(rgName, ResourceGroup.ResourceGroupMonitoringClass.Publish, ResourceGroupService.ResourceGroupUsageStatsType.Cumulative);
        ResourceGroup.BytesAndMessagesCount consCounts = this.rgs.getRGUsage(rgName, ResourceGroup.ResourceGroupMonitoringClass.Dispatch, ResourceGroupService.ResourceGroupUsageStatsType.Cumulative);

        // Read the cumulative usage a second time: the two reads must return the same values.
        ResourceGroup.BytesAndMessagesCount prodCounts1 = this.rgs.getRGUsage(rgName, ResourceGroup.ResourceGroupMonitoringClass.Publish, ResourceGroupService.ResourceGroupUsageStatsType.Cumulative);
        ResourceGroup.BytesAndMessagesCount consCounts1 = this.rgs.getRGUsage(rgName, ResourceGroup.ResourceGroupMonitoringClass.Dispatch, ResourceGroupService.ResourceGroupUsageStatsType.Cumulative);
        Assert.assertEquals(prodCounts.bytes, prodCounts1.bytes);
        Assert.assertEquals(prodCounts.messages, prodCounts1.messages);
        Assert.assertEquals(consCounts.bytes, consCounts1.bytes);
        Assert.assertEquals(consCounts.messages, consCounts1.messages);

        if (checkProduce) {
            Assert.assertTrue(prodCounts.bytes >= sentNumBytes);
            Assert.assertEquals(prodCounts.messages, (long) sentNumMsgs);
        }
        if (checkConsume) {
            Assert.assertTrue(consCounts.bytes >= recvdNumBytes);
            Assert.assertEquals(consCounts.messages, (long) recvdNumMsgs);
        }
    }

    /**
     * Applies the test's broker configuration overrides and creates the cluster, tenant and
     * namespace that the test topics live in.
     */
    private void prepareData() throws PulsarAdminException {
        this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
        this.conf.setAllowAutoTopicCreation(true);

        final String clusterName = "test";
        this.admin.clusters().createCluster(clusterName, ClusterData.builder().serviceUrl(this.brokerUrl.toString()).build());
        this.admin.tenants().createTenant(this.TenantName, new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), Sets.newHashSet(clusterName)));
        this.admin.namespaces().createNamespace(this.TenantAndNsName);
        this.admin.namespaces().setNamespaceReplicationClusters(this.TenantAndNsName, Sets.newHashSet(clusterName));
    }
}

