/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.resourcegroup;

import com.google.common.collect.Sets;
import io.prometheus.client.Summary;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resourcegroup.ResourceQuotaCalculator;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class RGUsageMTAggrWaitForAllMsgsTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(RGUsageMTAggrWaitForAllMsgsTest.class);

    // Sizing constants for the test. NOTE(review): the decompiled method bodies use the literal values
    // (4, 8, 42, 100, 400) rather than these names — presumably inlined by the compiler; confirm.
    private static final int PER_MESSAGE_METADATA_OHEAD = 42;
    private static final int PUBLISH_INTERVAL_SECS = 10;
    private static final int NUM_PRODUCERS = 4;
    private static final int NUM_CONSUMERS = 4;
    private static final int NUM_MESSAGES_PER_PRODUCER = 100;
    private static final int NUM_TOPICS = 8;
    private static final int NUM_RESOURCE_GROUPS = 4;
    private static final int NUM_TOTAL_MESSAGES = 400;
    private static final int NUM_MESSAGES_PER_CONSUMER = 100;

    // Configuration applied to every resource group created by createRGs(); rates are set in prepareRGs().
    private final org.apache.pulsar.common.policies.data.ResourceGroup rgConfig = new org.apache.pulsar.common.policies.data.ResourceGroup();
    // Service under test; constructed in setup().
    private ResourceGroupService rgservice;
    private ResourceGroup[] resGroups = new ResourceGroup[4];
    private final String clusterName = "test";
    private final String BaseRGName = "rg-";
    private final String BaseTestTopicName = "rgusage-topic-";
    // Resource-group names; the same strings are also used as tenant and namespace names in prepareRGs().
    private String[] RGNames = new String[4];
    // Incremented by the stub quota calculator each time needToReportLocalUsage() is invoked.
    private long numLocalUsageReports;

    // "tenant/namespace" strings: SameOrder pairs rg-i with rg-i, OppositeOrder pairs rg-i with rg-(3-i).
    private String[] TenantAndNsNameSameOrder = new String[4];
    private String[] TenantAndNsNameOppositeOrder = new String[4];
    // Topic names without a domain prefix, and their persistent:// and non-persistent:// variants.
    private String[] TopicNamesSameTenantAndNsRGs = new String[8];
    private String[] TopicNamesDifferentTenantAndNsRGs = new String[8];
    private String[] PersistentTopicNamesSameTenanatAndNsRGs = new String[8];
    private String[] PersistentTopicNamesDifferentTenantAndNsRGs = new String[8];
    private String[] NonPersistentTopicNamesSameTenantAndNsRGs = new String[8];
    private String[] NonPersistentTopicNamesDifferentTenantAndNsRGs = new String[8];
    // The topic sets the test iterates over, in this order. The list holds references to the arrays above,
    // which are filled in later by prepareRGs().
    private List<String[]> AllTopicNames = Arrays.asList(this.PersistentTopicNamesSameTenanatAndNsRGs, this.PersistentTopicNamesDifferentTenantAndNsRGs, this.NonPersistentTopicNamesSameTenantAndNsRGs, this.NonPersistentTopicNamesDifferentTenantAndNsRGs);
    private final ResourceGroupService.ResourceGroupUsageStatsType getCumulativeUsageStats = ResourceGroupService.ResourceGroupUsageStatsType.Cumulative;

    // Bookkeeping so that namespaces/topics are created, and tenants/namespaces registered, at most once.
    HashSet<String> createdNamespaces = new HashSet();
    HashSet<String> createdTopics = new HashSet();
    HashSet<String> registeredTenants = new HashSet();
    HashSet<String> registeredNamespaces = new HashSet();

    // Totals carried over from earlier topic sets; verifyRGMetrics() subtracts them from the metric
    // totals it reads before comparing against the current set's numbers, then updates them.
    long residualTenantRegs;
    long residualNamespaceRegs;
    long residualSentNumBytes;
    long residualSentNumMessages;
    long residualRecvdNumBytes;
    long residualRecvdNumMessages;

    /**
     * Runs the base-class setup, prepares the broker configuration and cluster, builds the
     * {@link ResourceGroupService} under test, and creates the tenants, namespaces and topic names.
     *
     * <p>The quota calculator given to the service is a stub: it counts every call to
     * {@code needToReportLocalUsage}, always answers {@code true}, and always computes a quota of zero.
     *
     * @throws Exception if any of the setup steps fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        prepareForOps();

        final ResourceQuotaCalculator countingQuotaCalc = new ResourceQuotaCalculator() {
            @Override
            public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes,
                                                  long currentMessagesUsed, long lastReportedMessages,
                                                  long lastReportTimeMSecsSinceEpoch) {
                // Count the query so verifyRGMetrics() can compare against the reported-usage metric.
                RGUsageMTAggrWaitForAllMsgsTest.this.numLocalUsageReports++;
                return true;
            }

            @Override
            public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
                return 0L;
            }
        };

        rgservice = new ResourceGroupService(
                this.pulsar,
                TimeUnit.SECONDS,
                new ResourceUsageTopicTransportManager(this.pulsar),
                countingQuotaCalc);
        prepareRGs();
    }

    /**
     * Delegates teardown to the base class's {@code internalCleanup()}.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Runs the multi-threaded produce/consume usage check once for every topic set in
     * {@code AllTopicNames}, in list order.
     *
     * @throws Exception if any topic set fails its produce/consume run or its verification
     */
    @Test
    public void testMTProduceConsumeRGUsage() throws Exception {
        final int numTopicSets = this.AllTopicNames.size();
        for (int setIdx = 0; setIdx < numTopicSets; setIdx++) {
            testProduceConsumeUsageOnRG(this.AllTopicNames.get(setIdx));
            log.info("Done with topic-set {}", setIdx);
        }
        log.info("Done testing with all topics");
    }

    /**
     * Maps a topic to the name of its tenant-level resource group, which in this test is the tenant name.
     *
     * @param topicName the parsed topic
     * @return the topic's tenant
     */
    private String TopicToTenantRGName(TopicName topicName) {
        return topicName.getTenant();
    }

    /**
     * Maps a topic to the name of its namespace-level resource group, which in this test is the
     * namespace portion of the topic name.
     *
     * @param topicName the parsed topic
     * @return the value of {@code topicName.getNamespacePortion()}
     */
    private String TopicToNamespaceRGName(TopicName topicName) {
        return topicName.getNamespacePortion();
    }

    /**
     * Reports whether every topic in the set has the same tenant-RG and namespace-RG name.
     *
     * @param topicStrings the topic names to inspect
     * @return {@code true} if tenant-RG and namespace-RG names are equal for all topics,
     *         {@code false} if they differ for all topics
     * @throws PulsarClientException if the set mixes topics with equal and unequal RG names
     */
    private boolean tenantRGEqualsNamespaceRG(String[] topicStrings) throws PulsarClientException {
        final int totalTopics = topicStrings.length;
        int sameRGCount = 0;
        int differentRGCount = 0;

        for (int i = 0; i < totalTopics; i++) {
            final TopicName parsed = TopicName.get(topicStrings[i]);
            final String tenantRG = TopicToTenantRGName(parsed);
            final String namespaceRG = TopicToNamespaceRGName(parsed);
            if (tenantRG.compareTo(namespaceRG) == 0) {
                sameRGCount++;
            } else {
                differentRGCount++;
            }
        }

        final boolean everyTopicCounted = sameRGCount + differentRGCount == totalTopics;
        final boolean mixedSet = sameRGCount > 0 && differentRGCount > 0;
        if (!everyTopicCounted || mixedSet) {
            throw new PulsarClientException(String.format(
                    "Found %s topics with equal RGs and %s with unequal, on %s topics",
                    sameRGCount, differentRGCount, totalTopics));
        }
        return sameRGCount == totalTopics;
    }

    /**
     * Registers, with the resource-group service, the tenant and namespace of each topic, skipping
     * any tenant-RG or namespace-RG name this test has already registered.
     *
     * @param topicStrings the topic names whose tenants and namespaces should be registered
     * @throws Exception if a registration call fails
     */
    private void registerTenantsAndNamespaces(String[] topicStrings) throws Exception {
        for (int i = 0; i < topicStrings.length; i++) {
            final TopicName parsedTopic = TopicName.get(topicStrings[i]);
            final String tenantRG = TopicToTenantRGName(parsedTopic);
            final String namespaceRG = TopicToNamespaceRGName(parsedTopic);
            final NamespaceName namespace = parsedTopic.getNamespaceObject();

            if (!registeredTenants.contains(tenantRG)) {
                rgservice.registerTenant(tenantRG, tenantRG);
                registeredTenants.add(tenantRG);
            }
            if (!registeredNamespaces.contains(namespaceRG)) {
                rgservice.registerNameSpace(namespaceRG, namespace);
                registeredNamespaces.add(namespaceRG);
            }
        }
    }

    /**
     * Undoes {@code registerTenantsAndNamespaces}: for each topic, unregisters its tenant and namespace
     * from the resource-group service if this test currently has them recorded as registered.
     *
     * @param topicStrings the topic names whose tenants and namespaces should be unregistered
     * @throws Exception if an unregistration call fails
     */
    private void unRegisterTenantsAndNamespaces(String[] topicStrings) throws Exception {
        for (int i = 0; i < topicStrings.length; i++) {
            final TopicName parsedTopic = TopicName.get(topicStrings[i]);
            final String tenantRG = TopicToTenantRGName(parsedTopic);
            final String namespaceRG = TopicToNamespaceRGName(parsedTopic);
            final String fullNamespace = parsedTopic.getNamespace();

            if (registeredTenants.contains(tenantRG)) {
                rgservice.unRegisterTenant(tenantRG, tenantRG);
                registeredTenants.remove(tenantRG);
            }
            if (registeredNamespaces.contains(namespaceRG)) {
                rgservice.unRegisterNameSpace(namespaceRG, NamespaceName.get(fullNamespace));
                registeredNamespaces.remove(namespaceRG);
            }
        }
    }

    /**
     * Runs one produce/consume cycle over the given topic set and verifies the resulting
     * resource-group usage statistics and metrics.
     *
     * <p>Sequence: create the resource groups and register tenants/namespaces; start all consumer
     * threads and wait until each reports ready; start all producer threads and wait for them to
     * finish; poll until the consumers have together received the expected number of messages; tell
     * the consumers to stop, join them, and close them; verify stats and metrics; unregister and
     * destroy the resource groups.
     *
     * <p>Consumer counters are read only after the consumer thread has been joined. The counters
     * are plain (non-volatile) fields written by the consumer thread, so reading them before the
     * join gives no guarantee of seeing the final values.
     *
     * @param topicStrings the topics to produce to and consume from
     * @throws Exception if any step fails or a verification assertion does not hold
     */
    private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception {
        this.createRGs();
        this.registerTenantsAndNamespaces(topicStrings);

        final SubscriptionType consumeSubscriptionType = SubscriptionType.Shared;
        final producerWithThread[] prodThr = new producerWithThread[NUM_PRODUCERS];
        final consumerWithThread[] consThr = new consumerWithThread[NUM_CONSUMERS];

        // Start the consumers first so that they are subscribed before anything is published.
        for (int ix = 0; ix < NUM_CONSUMERS; ++ix) {
            consumeMessages cm = new consumeMessages(ix, NUM_MESSAGES_PER_CONSUMER, NUM_TOTAL_MESSAGES,
                    consumeSubscriptionType, topicStrings);
            Thread thr = new Thread(cm);
            thr.start();
            consThr[ix] = new consumerWithThread();
            consThr[ix].consumer = cm;
            consThr[ix].thread = thr;
        }

        // Wait until every consumer reports that it is ready.
        int numReadyConsumers;
        do {
            Thread.sleep(500L);
            numReadyConsumers = 0;
            for (int ix = 0; ix < NUM_CONSUMERS; ++ix) {
                if (consThr[ix].consumer.isConsumerReady()) {
                    ++numReadyConsumers;
                }
            }
            log.debug("{} consumers are not yet ready", NUM_CONSUMERS - numReadyConsumers);
        } while (numReadyConsumers < NUM_CONSUMERS);

        // Start the producers.
        for (int ix = 0; ix < NUM_PRODUCERS; ++ix) {
            produceMessages pm = new produceMessages(ix, NUM_MESSAGES_PER_PRODUCER, topicStrings);
            Thread thr = new Thread(pm);
            thr.start();
            prodThr[ix] = new producerWithThread();
            prodThr[ix].producer = pm;
            prodThr[ix].thread = thr;
        }

        // Wait for each producer to finish, then collect its totals.
        int sentNumBytes = 0;
        int sentNumMsgs = 0;
        int numProducerExceptions = 0;
        for (int ix = 0; ix < NUM_PRODUCERS; ++ix) {
            prodThr[ix].thread.join();
            int sentBytes = prodThr[ix].producer.getNumBytesSent();
            int sentMsgs = prodThr[ix].producer.getNumMessagesSent();
            numProducerExceptions += prodThr[ix].producer.getNumExceptions();
            log.debug("Producer={} sent {} mesgs and {} bytes", ix, sentMsgs, sentBytes);
            sentNumBytes += sentBytes;
            sentNumMsgs += sentMsgs;
        }
        Assert.assertEquals(sentNumMsgs, NUM_TOTAL_MESSAGES);
        Assert.assertEquals(numProducerExceptions, 0);

        // Poll until the consumers have, between them, received everything that was sent.
        int recvdNumMsgs;
        do {
            Thread.sleep(2000L);
            recvdNumMsgs = 0;
            for (int ix = 0; ix < NUM_CONSUMERS; ++ix) {
                int consNumMesgsRecvd = consThr[ix].consumer.getNumMessagesRecvd();
                recvdNumMsgs += consNumMesgsRecvd;
                log.debug("consumer={} received {} messages (current total {}, expected {})",
                        ix, consNumMesgsRecvd, recvdNumMsgs, NUM_TOTAL_MESSAGES);
            }
        } while (recvdNumMsgs < NUM_TOTAL_MESSAGES);

        for (int ix = 0; ix < NUM_CONSUMERS; ++ix) {
            consThr[ix].consumer.setAllMessagesReceived();
            log.debug("consumer={} told to stop", ix);
        }

        // Join each consumer BEFORE reading its counters, so the values read are final.
        int recvdNumBytes = 0;
        int numConsumerExceptions = 0;
        recvdNumMsgs = 0;
        for (int ix = 0; ix < NUM_CONSUMERS; ++ix) {
            consThr[ix].thread.join();
            log.debug("Joined consumer={}", ix);
            int recvdBytes = consThr[ix].consumer.getNumBytesRecvd();
            int recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd();
            numConsumerExceptions += consThr[ix].consumer.getNumExceptions();
            log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes);
            recvdNumBytes += recvdBytes;
            recvdNumMsgs += recvdMsgs;
        }

        for (int ix = 0; ix < NUM_CONSUMERS; ++ix) {
            consThr[ix].consumer.closeConsumer();
        }

        Assert.assertEquals(recvdNumMsgs, NUM_TOTAL_MESSAGES);
        Assert.assertEquals(numConsumerExceptions, 0);

        this.verfyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true);
        this.verifyRGMetrics(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true);

        this.unRegisterTenantsAndNamespaces(topicStrings);
        this.destroyRGs();
    }

    /**
     * Cross-checks the broker's per-topic counters and the resource-group service's cumulative usage
     * against the byte/message totals observed by the test's producers and consumers.
     *
     * <p>Message counts must match exactly. Byte counts must be at least the payload bytes plus
     * {@code PER_MESSAGE_METADATA_OHEAD} bytes per message.
     *
     * @param topicStrings  the topics that were produced to / consumed from
     * @param sentNumBytes  payload bytes sent by the producers
     * @param sentNumMsgs   messages sent by the producers
     * @param recvdNumBytes payload bytes received by the consumers
     * @param recvdNumMsgs  messages received by the consumers
     * @param checkProduce  whether to assert on the publish side
     * @param checkConsume  whether to assert on the dispatch side
     * @throws Exception if the topic set mixes equal and unequal RG names, or a service call fails
     */
    private void verfyRGProdConsStats(String[] topicStrings, int sentNumBytes, int sentNumMsgs, int recvdNumBytes, int recvdNumMsgs, boolean checkProduce, boolean checkConsume) throws Exception {
        // The result is not used; the call is kept because it throws on a mixed topic set.
        this.tenantRGEqualsNamespaceRG(topicStrings);

        final Map topicStatsMap = this.pulsar.getBrokerService().getTopicStats();
        log.debug("verfyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size());

        final int expectedBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
        final int expectedBytesReceived = recvdNumBytes + PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs;
        final boolean anyTraffic = sentNumMsgs > 0 || recvdNumMsgs > 0;
        final List<String> topicsUnderTest = Arrays.asList(topicStrings);

        long topicMsgsOut = 0L;
        long topicBytesOut = 0L;
        long topicMsgsIn = 0L;
        long topicBytesIn = 0L;
        ResourceGroup.BytesAndMessagesCount tenantPublishTotal = new ResourceGroup.BytesAndMessagesCount();
        ResourceGroup.BytesAndMessagesCount tenantDispatchTotal = new ResourceGroup.BytesAndMessagesCount();
        ResourceGroup.BytesAndMessagesCount nsPublishTotal = new ResourceGroup.BytesAndMessagesCount();
        ResourceGroup.BytesAndMessagesCount nsDispatchTotal = new ResourceGroup.BytesAndMessagesCount();
        // One "seen" set per monitoring class, shared by tenant and namespace RG names, so each
        // resource group's usage is added to the totals only once.
        final Set<String> publishSeen = new HashSet<String>();
        final Set<String> dispatchSeen = new HashSet<String>();

        this.rgservice.aggregateResourceGroupLocalUsages();

        for (Object rawEntry : topicStatsMap.entrySet()) {
            final Map.Entry<?, ?> entry = (Map.Entry<?, ?>) rawEntry;
            final String statsTopic = (String) entry.getKey();
            if (!topicsUnderTest.contains(statsTopic)) {
                continue;
            }

            final TopicStats stats = (TopicStats) entry.getValue();
            topicBytesIn += stats.getBytesInCounter();
            topicMsgsOut += stats.getMsgOutCounter();
            topicBytesOut += stats.getBytesOutCounter();
            topicMsgsIn += stats.getMsgInCounter();
            // Note: this checks the running total across topics seen so far, not this topic alone.
            if (topicMsgsIn == 0L) {
                log.warn("verfyProdConsStats: found no produced mesgs (msgInCounter) on topic {}", statsTopic);
            }

            if (anyTraffic) {
                final TopicName topic = TopicName.get(statsTopic);
                final String tenantRGName = this.TopicToTenantRGName(topic);
                tenantPublishTotal = accumulateRGUsageIfUnseen(tenantRGName,
                        ResourceGroup.ResourceGroupMonitoringClass.Publish, publishSeen, tenantPublishTotal);
                tenantDispatchTotal = accumulateRGUsageIfUnseen(tenantRGName,
                        ResourceGroup.ResourceGroupMonitoringClass.Dispatch, dispatchSeen, tenantDispatchTotal);

                final String nsRGName = this.TopicToNamespaceRGName(topic);
                if (tenantRGName.compareTo(nsRGName) != 0) {
                    nsPublishTotal = accumulateRGUsageIfUnseen(nsRGName,
                            ResourceGroup.ResourceGroupMonitoringClass.Publish, publishSeen, nsPublishTotal);
                    nsDispatchTotal = accumulateRGUsageIfUnseen(nsRGName,
                            ResourceGroup.ResourceGroupMonitoringClass.Dispatch, dispatchSeen, nsDispatchTotal);
                }
            }
        }

        // Topic-level checks.
        if (checkConsume && checkProduce) {
            Assert.assertEquals(topicMsgsOut, topicMsgsIn);
            Assert.assertEquals(topicBytesOut, topicBytesIn);
        }
        if (checkProduce) {
            Assert.assertEquals(topicMsgsIn, (long) sentNumMsgs);
            Assert.assertTrue(topicBytesIn >= (long) expectedBytesSent);
        }
        if (checkConsume) {
            Assert.assertEquals(topicMsgsOut, (long) recvdNumMsgs);
            Assert.assertTrue(topicBytesOut >= (long) expectedBytesReceived);
        }

        // Resource-group-level checks.
        if (checkProduce) {
            final ResourceGroup.BytesAndMessagesCount prodCounts = ResourceGroup.accumulateBMCount(
                    new ResourceGroup.BytesAndMessagesCount[]{tenantPublishTotal, nsPublishTotal});
            Assert.assertEquals(prodCounts.messages, (long) sentNumMsgs);
            Assert.assertTrue(prodCounts.bytes >= (long) expectedBytesSent);
        }
        if (checkConsume) {
            final ResourceGroup.BytesAndMessagesCount consCounts = ResourceGroup.accumulateBMCount(
                    new ResourceGroup.BytesAndMessagesCount[]{tenantDispatchTotal, nsDispatchTotal});
            Assert.assertEquals(consCounts.messages, (long) recvdNumMsgs);
            Assert.assertTrue(consCounts.bytes >= (long) expectedBytesReceived);
        }
    }

    /**
     * Adds the cumulative usage of one resource group to a running total, unless that resource
     * group has already been counted for the given monitoring class.
     *
     * @param rgName          the resource group to query
     * @param monClass        the monitoring class (publish or dispatch) to query
     * @param alreadyGathered names already counted for {@code monClass}; updated on a new gather
     * @param runningTotal    the total accumulated so far
     * @return the new total, or {@code runningTotal} unchanged if {@code rgName} was already counted
     * @throws Exception if the usage query fails
     */
    private ResourceGroup.BytesAndMessagesCount accumulateRGUsageIfUnseen(String rgName,
            ResourceGroup.ResourceGroupMonitoringClass monClass, Set<String> alreadyGathered,
            ResourceGroup.BytesAndMessagesCount runningTotal) throws Exception {
        if (alreadyGathered.contains(rgName)) {
            return runningTotal;
        }
        final ResourceGroup.BytesAndMessagesCount usage =
                this.rgservice.getRGUsage(rgName, monClass, this.getCumulativeUsageStats);
        final ResourceGroup.BytesAndMessagesCount updatedTotal = ResourceGroup.accumulateBMCount(
                new ResourceGroup.BytesAndMessagesCount[]{runningTotal, usage});
        alreadyGathered.add(rgName);
        return updatedTotal;
    }

    /**
     * Verifies the resource-group metrics exposed through the static getters on
     * {@link ResourceGroupService} and {@link ResourceGroup} after one produce/consume cycle.
     *
     * <p>The metric values read here are totals over all resource groups in {@code RGNames}. The
     * {@code residual*} fields hold what earlier topic sets contributed; they are subtracted before
     * comparing against this cycle's numbers and updated afterwards.
     *
     * @param topicStrings  the topics that were produced to / consumed from
     * @param sentNumBytes  payload bytes sent by the producers
     * @param sentNumMsgs   messages sent by the producers
     * @param recvdNumBytes payload bytes received by the consumers
     * @param recvdNumMsgs  messages received by the consumers
     * @param checkProduce  whether to assert on the publish-side usage metrics
     * @param checkConsume  whether to assert on the dispatch-side usage metrics
     * @throws Exception if the topic set mixes equal and unequal RG names, or a service call fails
     */
    private void verifyRGMetrics(String[] topicStrings, int sentNumBytes, int sentNumMsgs, int recvdNumBytes, int recvdNumMsgs, boolean checkProduce, boolean checkConsume) throws Exception {
        // The result is not used; the call is kept because it throws on a mixed topic set.
        this.tenantRGEqualsNamespaceRG(topicStrings);

        final int expectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
        final int expectedNumBytesReceived = recvdNumBytes + PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs;

        final ResourceGroup.ResourceGroupMonitoringClass[] monClasses =
                ResourceGroup.ResourceGroupMonitoringClass.values();
        long totalTenantRegisters = 0L;
        long totalTenantUnRegisters = 0L;
        long totalNamespaceRegisters = 0L;
        long totalNamespaceUnRegisters = 0L;
        // Per-monitoring-class totals, indexed by the enum constant's ordinal.
        long[] totalQuotaBytes = new long[monClasses.length];
        long[] totalQuotaMessages = new long[monClasses.length];
        long[] totalUsedBytes = new long[monClasses.length];
        long[] totalUsedMessages = new long[monClasses.length];
        long[] totalUsageReportCounts = new long[monClasses.length];
        long totalUpdates = 0L;

        this.rgservice.aggregateResourceGroupLocalUsages();

        // Sum every metric over all resource groups. The compound assignments add in double and
        // truncate back to long.
        for (String rgName : this.RGNames) {
            for (ResourceGroup.ResourceGroupMonitoringClass mc : monClasses) {
                final String mcName = mc.name();
                final int mcIndex = mc.ordinal();
                totalQuotaBytes[mcIndex] += ResourceGroupService.getRgQuotaByteCount(rgName, mcName);
                totalQuotaMessages[mcIndex] += ResourceGroupService.getRgQuotaMessageCount(rgName, mcName);
                totalUsedBytes[mcIndex] += ResourceGroupService.getRgLocalUsageByteCount(rgName, mcName);
                totalUsedMessages[mcIndex] += ResourceGroupService.getRgLocalUsageMessageCount(rgName, mcName);
                totalUsageReportCounts[mcIndex] += ResourceGroup.getRgUsageReportedCount(rgName, mcName);
            }
            totalTenantRegisters += ResourceGroupService.getRgTenantRegistersCount(rgName);
            totalTenantUnRegisters += ResourceGroupService.getRgTenantUnRegistersCount(rgName);
            totalNamespaceRegisters += ResourceGroupService.getRgNamespaceRegistersCount(rgName);
            totalNamespaceUnRegisters += ResourceGroupService.getRgNamespaceUnRegistersCount(rgName);
            totalUpdates += ResourceGroupService.getRgUpdatesCount(rgName);
        }

        log.info("totalTenantRegisters={}, totalTenantUnRegisters={}, totalNamespaceRegisters={}, totalNamespaceUnRegisters={}",
                totalTenantRegisters, totalTenantUnRegisters, totalNamespaceRegisters, totalNamespaceUnRegisters);

        // This cycle adds one registration per resource group; unregistrations so far must equal
        // the registrations made by earlier cycles.
        Assert.assertEquals(totalTenantRegisters - this.residualTenantRegs, (long) NUM_RESOURCE_GROUPS);
        Assert.assertEquals(totalNamespaceRegisters - this.residualNamespaceRegs, (long) NUM_RESOURCE_GROUPS);
        Assert.assertEquals(totalTenantUnRegisters, this.residualTenantRegs);
        Assert.assertEquals(totalNamespaceUnRegisters, this.residualNamespaceRegs);
        this.residualTenantRegs = totalTenantRegisters;
        this.residualNamespaceRegs = totalNamespaceRegisters;

        for (ResourceGroup.ResourceGroupMonitoringClass mc : monClasses) {
            final int mcIdx = mc.ordinal();
            log.info("mc={}: totalQuotaBytes={}, totalQuotaMessages={},  totalUsedBytes={}, totalUsedMessages={} totalUsageReports={}",
                    mc.name(), totalQuotaBytes[mcIdx], totalQuotaMessages[mcIdx], totalUsedBytes[mcIdx],
                    totalUsedMessages[mcIdx], totalUsageReportCounts[mcIdx]);
            if (checkProduce && mc == ResourceGroup.ResourceGroupMonitoringClass.Publish) {
                Assert.assertEquals(totalUsedMessages[mcIdx] - this.residualSentNumMessages, (long) sentNumMsgs);
                Assert.assertTrue(totalUsedBytes[mcIdx] - this.residualSentNumBytes >= (long) expectedNumBytesSent);
            } else if (checkConsume && mc == ResourceGroup.ResourceGroupMonitoringClass.Dispatch) {
                Assert.assertEquals(totalUsedMessages[mcIdx] - this.residualRecvdNumMessages, (long) recvdNumMsgs);
                Assert.assertTrue(totalUsedBytes[mcIdx] - this.residualRecvdNumBytes >= (long) expectedNumBytesReceived);
            }
            // The stub quota calculator counts calls across all classes; split evenly per class.
            final long perClassUsageReports = this.numLocalUsageReports / (long) monClasses.length;
            Assert.assertEquals(totalUsageReportCounts[mcIdx], perClassUsageReports);
        }

        this.residualSentNumBytes += (long) sentNumBytes;
        this.residualSentNumMessages += (long) sentNumMsgs;
        this.residualRecvdNumBytes += (long) recvdNumBytes;
        this.residualRecvdNumMessages += (long) recvdNumMsgs;

        Assert.assertEquals(totalUpdates, 0L);

        // NOTE(review): the checks below go through the (Object, Object) overload and compare a
        // boxed value against Integer 0. If the left-hand values are doubles (as the Double casts on
        // the quantiles suggest), these can presumably never fail. Left as-is; confirm intent
        // before tightening.
        final Summary.Child.Value usageAggrLatency = ResourceGroupService.getRgUsageAggregationLatency();
        Assert.assertNotEquals((Object) usageAggrLatency.count, (Object) 0);
        Assert.assertNotEquals((Object) usageAggrLatency.sum, (Object) 0);
        double fiftiethPercentileValue = (Double) usageAggrLatency.quantiles.get(0.5);
        Assert.assertNotEquals((Object) fiftiethPercentileValue, (Object) 0);
        double ninetiethPercentileValue = (Double) usageAggrLatency.quantiles.get(0.9);
        Assert.assertNotEquals((Object) ninetiethPercentileValue, (Object) 0);

        final Summary.Child.Value quotaCalcLatency = ResourceGroupService.getRgQuotaCalculationTime();
        Assert.assertNotEquals((Object) quotaCalcLatency.count, (Object) 0);
        Assert.assertNotEquals((Object) quotaCalcLatency.sum, (Object) 0);
        fiftiethPercentileValue = (Double) quotaCalcLatency.quantiles.get(0.5);
        Assert.assertNotEquals((Object) fiftiethPercentileValue, (Object) 0);
        ninetiethPercentileValue = (Double) quotaCalcLatency.quantiles.get(0.9);
        Assert.assertNotEquals((Object) ninetiethPercentileValue, (Object) 0);
    }

    /**
     * Asks the broker service to get-or-create each topic that this test has not created yet, and
     * records it in {@code createdTopics}.
     *
     * @param topics the topic names to create
     */
    private void createTopics(String[] topics) {
        final BrokerService brokerService = this.pulsar.getBrokerService();
        for (int i = 0; i < topics.length; i++) {
            final String topicName = topics[i];
            if (!this.createdTopics.contains(topicName)) {
                brokerService.getOrCreateTopic(topicName);
                this.createdTopics.add(topicName);
            }
        }
    }

    /**
     * Asks the broker service to delete each topic that this test created, and removes it from
     * {@code createdTopics}. Topics not recorded in {@code createdTopics} are left alone.
     *
     * <p>The previous guard was inverted: it skipped the topics that had been created and tried
     * to delete the ones that had not.
     *
     * @param topics the topic names to delete
     */
    private void destroyTopics(String[] topics) {
        BrokerService bs = this.pulsar.getBrokerService();
        for (String topic : topics) {
            if (!this.createdTopics.contains(topic)) {
                continue;
            }
            bs.deleteTopic(topic, true);
            this.createdTopics.remove(topic);
        }
    }

    /**
     * Creates one resource group per entry of {@code RGNames}, all with the shared {@code rgConfig}.
     *
     * @throws Exception if a create call fails
     */
    private void createRGs() throws Exception {
        final String[] names = this.RGNames;
        for (int i = 0; i < names.length; i++) {
            this.rgservice.resourceGroupCreate(names[i], this.rgConfig);
        }
    }

    /**
     * Deletes every resource group named in {@code RGNames}.
     *
     * @throws Exception if a delete call fails
     */
    private void destroyRGs() throws Exception {
        final String[] names = this.RGNames;
        for (int i = 0; i < names.length; i++) {
            this.rgservice.resourceGroupDelete(names[i]);
        }
    }

    /**
     * Adjusts the broker configuration for the test (usage-transport publish interval, automatic
     * topic creation) and creates the test cluster pointing at this broker's URL.
     *
     * @throws PulsarAdminException if the cluster cannot be created
     */
    private void prepareForOps() throws PulsarAdminException {
        this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
        this.conf.setAllowAutoTopicCreation(true);

        final String serviceUrl = this.brokerUrl.toString();
        this.admin.clusters().createCluster(this.clusterName, ClusterData.builder().serviceUrl(serviceUrl).build());
    }

    /**
     * Sets the resource-group rates, creates the tenants and namespaces, and builds all topic names.
     *
     * <p>Each resource-group name doubles as a tenant name and as a namespace name. Two families of
     * namespaces are created: "same order" ({@code rg-i/rg-i}) and "opposite order"
     * ({@code rg-i/rg-(N-1-i)}). Topic names are then derived for both families, in persistent and
     * non-persistent form.
     *
     * <p>The assertions at the top are sanity checks on the sizing constants. In the decompiled
     * code they appeared only as already-evaluated constants; the expressions here are a
     * reconstruction that evaluates to the same values.
     *
     * @throws Exception if a tenant or namespace cannot be created
     */
    private void prepareRGs() throws Exception {
        Assert.assertTrue(NUM_PRODUCERS >= NUM_RESOURCE_GROUPS);
        Assert.assertEquals(NUM_MESSAGES_PER_PRODUCER % NUM_RESOURCE_GROUPS, 0);
        Assert.assertEquals(NUM_TOPICS % NUM_RESOURCE_GROUPS, 0);
        final int numConsumerMessages = NUM_MESSAGES_PER_CONSUMER * NUM_CONSUMERS;
        final int numProducerMessages = NUM_MESSAGES_PER_PRODUCER * NUM_PRODUCERS;
        Assert.assertTrue(numConsumerMessages >= numProducerMessages);

        this.rgConfig.setPublishRateInBytes(1500L);
        this.rgConfig.setPublishRateInMsgs(100);
        this.rgConfig.setDispatchRateInBytes(4000L);
        this.rgConfig.setDispatchRateInMsgs(500);

        for (int rg = 0; rg < NUM_RESOURCE_GROUPS; rg++) {
            this.RGNames[rg] = this.BaseRGName + rg;
        }

        final TenantInfo tenantConfig =
                new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), Sets.newHashSet(this.clusterName));
        for (int rg = 0; rg < NUM_RESOURCE_GROUPS; rg++) {
            this.admin.tenants().createTenant(this.RGNames[rg], tenantConfig);
        }

        for (int rg = 0; rg < NUM_RESOURCE_GROUPS; rg++) {
            final String tenant = this.RGNames[rg];
            this.TenantAndNsNameSameOrder[rg] = tenant + "/" + this.RGNames[rg];
            this.TenantAndNsNameOppositeOrder[rg] = tenant + "/" + this.RGNames[NUM_RESOURCE_GROUPS - (rg + 1)];
        }

        for (int rg = 0; rg < NUM_RESOURCE_GROUPS; rg++) {
            createReplicatedNamespaceOnce(this.TenantAndNsNameSameOrder[rg]);
            createReplicatedNamespaceOnce(this.TenantAndNsNameOppositeOrder[rg]);
        }

        // Topics are spread round-robin over the namespaces of each family.
        for (int t = 0; t < NUM_TOPICS; t++) {
            final int nsIdx = t % NUM_RESOURCE_GROUPS;
            final String sameOrderTopic = this.TenantAndNsNameSameOrder[nsIdx] + "/" + this.BaseTestTopicName + t;
            final String oppositeOrderTopic = this.TenantAndNsNameOppositeOrder[nsIdx] + "/" + this.BaseTestTopicName + t;

            this.TopicNamesSameTenantAndNsRGs[t] = sameOrderTopic;
            this.TopicNamesDifferentTenantAndNsRGs[t] = oppositeOrderTopic;
            this.PersistentTopicNamesSameTenanatAndNsRGs[t] = "persistent://" + sameOrderTopic;
            this.PersistentTopicNamesDifferentTenantAndNsRGs[t] = "persistent://" + oppositeOrderTopic;
            this.NonPersistentTopicNamesSameTenantAndNsRGs[t] = "non-persistent://" + sameOrderTopic;
            this.NonPersistentTopicNamesDifferentTenantAndNsRGs[t] = "non-persistent://" + oppositeOrderTopic;
        }
    }

    /**
     * Creates the namespace and sets its replication clusters to the test cluster, unless this test
     * has already created it; records the namespace in {@code createdNamespaces}.
     *
     * @param tenantAndNamespace the namespace in {@code tenant/namespace} form
     * @throws Exception if the admin calls fail
     */
    private void createReplicatedNamespaceOnce(String tenantAndNamespace) throws Exception {
        if (this.createdNamespaces.contains(tenantAndNamespace)) {
            return;
        }
        this.admin.namespaces().createNamespace(tenantAndNamespace);
        this.admin.namespaces().setNamespaceReplicationClusters(tenantAndNamespace, Sets.newHashSet(this.clusterName));
        this.createdNamespaces.add(tenantAndNamespace);
    }

    /** Pairs a consumer task with the thread that runs it. */
    class consumerWithThread {
        /** The consumer task. */
        consumeMessages consumer;
        /** The thread executing {@link #consumer}. */
        Thread thread;
    }

    private class consumeMessages
    implements Runnable {
        // Values supplied through the constructor.
        private int consumerId;
        private int numMesgsForThisConsumer;
        private int numTotalMesgsToConsume;
        private SubscriptionType subscriptionType;
        private String[] topicStrings;
        // Assigned in run() when the subscribe call succeeds; stays null if it throws.
        private Consumer<byte[]> consumer = null;
        private int recvTimeoutMilliSecs = 1000;
        private int ackTimeoutMilliSecs = 1100;
        // Counters exposed through the getters below. They are not volatile.
        // NOTE(review): the test thread reads them while the consumer thread may still be running —
        // confirm that is acceptable.
        private int recvdNumBytes = 0;
        private int recvdNumMsgs = 0;
        private int numExceptions = 0;
        // Flags used to signal between the test thread and the consumer thread.
        private volatile boolean allMessagesReceived = false;
        private volatile boolean consumerIsReady = false;

        /**
         * Stores the parameters for a consumer task; nothing is subscribed until {@code run()}.
         *
         * @param consId     identifier used in log messages and subscription names
         * @param nMesgs     number of messages attributed to this consumer
         * @param totalMesgs total number of messages across all consumers
         * @param subType    subscription type to subscribe with
         * @param topics     topics to subscribe to
         */
        consumeMessages(int consId, int nMesgs, int totalMesgs, SubscriptionType subType, String[] topics) {
            topicStrings = topics;
            subscriptionType = subType;
            numTotalMesgsToConsume = totalMesgs;
            numMesgsForThisConsumer = nMesgs;
            consumerId = consId;
        }

        /** @return the current value of the ready flag, which {@code closeConsumer()} clears */
        public boolean isConsumerReady() {
            return consumerIsReady;
        }

        /** @return the current value of the received-bytes counter */
        public int getNumBytesRecvd() {
            return recvdNumBytes;
        }

        /** @return the current value of the received-messages counter */
        public int getNumMessagesRecvd() {
            return recvdNumMsgs;
        }

        /** @return the number of exceptions this consumer has counted so far */
        public int getNumExceptions() {
            return numExceptions;
        }

        /** Sets the flag that makes the receive loop in {@code run()} stop. */
        public void setAllMessagesReceived() {
            allMessagesReceived = true;
        }

        /**
         * Clears the ready flag and closes the underlying Pulsar consumer.
         *
         * <p>If the consumer was never created (the subscribe call in {@code run()} failed and left
         * the field {@code null}), there is nothing to close and the method returns instead of
         * throwing a {@link NullPointerException}. A failure while closing is counted in
         * {@code numExceptions} and logged.
         */
        public void closeConsumer() {
            this.consumerIsReady = false;
            if (this.consumer == null) {
                // The subscribe failure was already counted and logged in run().
                return;
            }
            try {
                this.consumer.close();
            }
            catch (PulsarClientException p) {
                ++this.numExceptions;
                log.error("Consumer={} got exception while closing consumer: ex={}", (Object)this.consumerId, (Object)p.getMessage());
            }
        }

        @Override
        public void run() {
            int recvQueueSize = 0;
            String subscriptionString = null;
            switch (this.subscriptionType) {
                default: {
                    ++this.numExceptions;
                    String errMesg = String.format("Consumer=%d got unexpected subscription type=%s", this.consumerId, this.subscriptionType);
                    Assert.assertTrue((boolean)false, (String)errMesg);
                    break;
                }
                case Shared: {
                    recvQueueSize = this.numTotalMesgsToConsume;
                    subscriptionString = "my-subscription";
                    break;
                }
                case Exclusive: {
                    recvQueueSize = this.numMesgsForThisConsumer;
                    subscriptionString = "my-subscription-" + this.consumerId;
                }
            }
            try {
                this.consumer = RGUsageMTAggrWaitForAllMsgsTest.this.pulsarClient.newConsumer().topic(this.topicStrings).subscriptionName(subscriptionString).subscriptionType(this.subscriptionType).receiverQueueSize(recvQueueSize).ackTimeout((long)this.ackTimeoutMilliSecs, TimeUnit.MILLISECONDS).subscribe();
            }
            catch (PulsarClientException p) {
                ++this.numExceptions;
                log.error("Consumer={} got exception while building consumer: ex={}", (Object)this.consumerId, (Object)p.getMessage());
            }
            Message message = null;
            this.consumerIsReady = true;
            while (this.consumerIsReady && !this.allMessagesReceived) {
                log.debug("Consumer={} waiting for mesgnum={}", (Object)this.consumerId, (Object)this.recvdNumMsgs);
                try {
                    message = this.consumer.receive(this.recvTimeoutMilliSecs, TimeUnit.MILLISECONDS);
                    if (message == null) continue;
                    this.consumer.acknowledgeAsync(message);
                    String mesg = String.format("Consumer=%d recvd %d-th mesg; id=%s, data=%s", this.consumerId, this.recvdNumMsgs, message.getMessageId(), new String(message.getData()));
                    log.debug(mesg);
                    this.recvdNumBytes += ((byte[])message.getValue()).length;
                    ++this.recvdNumMsgs;
                }
                catch (PulsarClientException p) {
                    ++this.numExceptions;
                    log.error("Consumer={} got exception in while receiving {}-th mesg at consumer: ex={}", new Object[]{this.consumerId, this.recvdNumMsgs, p.getMessage()});
                }
            }
            log.debug("Consumer={} done; got {} exceptions", (Object)this.consumerId, (Object)this.numExceptions);
        }
    }

    /**
     * Plain holder that groups a {@link produceMessages} task with a {@link Thread}.
     *
     * <p>Both fields start out {@code null} and are filled in by code outside this class;
     * judging by the names, {@code thread} is the thread that runs {@code producer}.
     * The implicit no-argument constructor is all that is needed.
     */
    class producerWithThread {
        /** The producer task; assigned by the owner of this holder. */
        produceMessages producer;

        /** The thread paired with {@link #producer}; assigned by the owner of this holder. */
        Thread thread;
    }

    /**
     * Runnable that creates a producer on one topic, synchronously sends a fixed number of
     * small text messages to it, then flushes and closes the producer.
     *
     * <p>Running totals of sent messages, sent payload bytes and exceptions are exposed
     * through the getters; they are plain fields written by the thread executing {@link #run()}.
     */
    private class produceMessages
    implements Runnable {
        private int producerId;
        private int numMesgsToProduce;
        private String[] topicStrings;
        private String myProduceTopic;
        private int sentNumBytes = 0;
        private int sentNumMsgs = 0;
        private int numExceptions = 0;

        /**
         * @param prodId identifier used in log messages, in the message payload, and to pick the topic
         * @param nMesgs number of messages to send
         * @param topics candidate topics; this producer publishes to {@code topics[prodId % 8]}
         */
        produceMessages(int prodId, int nMesgs, String[] topics) {
            this.producerId = prodId;
            this.numMesgsToProduce = nMesgs;
            this.topicStrings = topics;
            // NOTE(review): the literal 8 is presumably an inlined topic-count constant; it requires
            // topics to hold at least (prodId % 8) + 1 entries - confirm against the caller.
            this.myProduceTopic = this.topicStrings[this.producerId % 8];
        }

        /** @return total payload bytes of the messages sent successfully so far */
        public int getNumBytesSent() {
            return this.sentNumBytes;
        }

        /** @return number of messages sent successfully so far */
        public int getNumMessagesSent() {
            return this.sentNumMsgs;
        }

        /** @return number of failures counted while creating, sending or closing */
        public int getNumExceptions() {
            return this.numExceptions;
        }

        /**
         * Creates the producer, sends the configured number of messages, then flushes and
         * closes it. Every {@link PulsarClientException} is counted and logged rather than
         * propagated; a failed send does not stop the remaining sends.
         *
         * <p>If the producer cannot be created, the method returns right after recording the
         * failure, without attempting any send.
         */
        @Override
        public void run() {
            Producer<byte[]> producer;
            try {
                producer = RGUsageMTAggrWaitForAllMsgsTest.this.pulsarClient.newProducer().topic(this.myProduceTopic).create();
            }
            catch (PulsarClientException p) {
                ++this.numExceptions;
                log.info("Producer={} got exception while building producer: ex={}", this.producerId, p.getMessage());
                // Previously this fell through to producer.send() and died with a NullPointerException.
                return;
            }
            for (int ix = 0; ix < this.numMesgsToProduce; ++ix) {
                try {
                    byte[] mesg = String.format("ProducerId=%d, ix=%d, topic=%s", this.producerId, ix, this.myProduceTopic).getBytes();
                    MessageId msgId = producer.send(mesg);
                    // Count only after send() returned without throwing.
                    this.sentNumBytes += mesg.length;
                    ++this.sentNumMsgs;
                    log.debug("Producer={}, sent msg-ix={}, msgId={}", this.producerId, ix, msgId);
                }
                catch (PulsarClientException p) {
                    ++this.numExceptions;
                    log.info("Producer={} got exception while sending {}-th time: ex={}", this.producerId, ix, p.getMessage());
                }
            }
            try {
                try {
                    producer.flush();
                }
                finally {
                    // Close even when flush() throws, so the producer is never leaked.
                    producer.close();
                }
            }
            catch (PulsarClientException p) {
                ++this.numExceptions;
                log.info("Producer={} got exception while closing producer: ex={}", this.producerId, p.getMessage());
            }
            log.debug("Producer={} done with topic={}; got {} exceptions", this.producerId, this.myProduceTopic, this.numExceptions);
        }
    }
}

