/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.stats;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class ConsumerStatsTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerStatsTest.class);

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setMaxUnackedMessagesPerConsumer(0);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer() throws PulsarClientException, InterruptedException, PulsarAdminException {
        Assert.assertEquals((int)this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer(), (int)0);
        String topicName = "persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer"}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("sub").subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer").create();
        int messages = 10;
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)("message-" + i).getBytes());
        }
        int received = 0;
        for (int i = 0; i < 10; ++i) {
            consumer.receive();
            ++received;
        }
        Assert.assertEquals((int)received, (int)10);
        received = 0;
        TopicStats stats = this.admin.topics().getStats("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer");
        Assert.assertEquals((int)stats.getSubscriptions().size(), (int)1);
        Assert.assertEquals((int)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().size(), (int)1);
        Assert.assertFalse((boolean)((ConsumerStats)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().get(0)).isBlockedConsumerOnUnackedMsgs());
        Assert.assertEquals((int)((ConsumerStats)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().get(0)).getUnackedMessages(), (int)10);
        for (int i = 0; i < 10; ++i) {
            consumer.acknowledge(consumer.receive());
            ++received;
        }
        Assert.assertEquals((int)received, (int)10);
        Thread.sleep(2000L);
        stats = this.admin.topics().getStats("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer");
        Assert.assertFalse((boolean)((ConsumerStats)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().get(0)).isBlockedConsumerOnUnackedMsgs());
        Assert.assertEquals((int)((ConsumerStats)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().get(0)).getUnackedMessages(), (int)0);
    }

    @Test
    public void testAckStatsOnPartitionedTopicForExclusiveSubscription() throws PulsarAdminException, PulsarClientException, InterruptedException {
        int i;
        String topic = "persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription";
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription", 3);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription"}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("sub").subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription").create();
        int messages = 10;
        for (int i2 = 0; i2 < 10; ++i2) {
            producer.send((Object)("message-" + i2).getBytes());
        }
        int received = 0;
        for (i = 0; i < 10; ++i) {
            consumer.acknowledge(consumer.receive());
            ++received;
        }
        Assert.assertEquals((int)10, (int)received);
        Thread.sleep(2000L);
        for (i = 0; i < 3; ++i) {
            TopicStats stats = this.admin.topics().getStats("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription-partition-" + i);
            Assert.assertEquals((int)stats.getSubscriptions().size(), (int)1);
            Assert.assertEquals((int)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().size(), (int)1);
            Assert.assertEquals((int)((ConsumerStats)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().get(0)).getUnackedMessages(), (int)0);
        }
    }

    @Test
    public void testUpdateStatsForActiveConsumerAndSubscription() throws Exception {
        String topicName = "persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription";
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription"}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").subscribe();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription").get();
        Assert.assertNotNull((Object)topicRef);
        Assert.assertEquals((long)topicRef.getSubscriptions().size(), (long)1L);
        List consumers = ((PersistentSubscription)topicRef.getSubscriptions().get((Object)"my-subscription")).getConsumers();
        Assert.assertEquals((int)consumers.size(), (int)1);
        ConsumerStatsImpl consumerStats = new ConsumerStatsImpl();
        consumerStats.msgOutCounter = 10L;
        consumerStats.bytesOutCounter = 1280L;
        ((org.apache.pulsar.broker.service.Consumer)consumers.get(0)).updateStats(consumerStats);
        ConsumerStatsImpl updatedStats = ((org.apache.pulsar.broker.service.Consumer)consumers.get(0)).getStats();
        Assert.assertEquals((long)updatedStats.getMsgOutCounter(), (long)10L);
        Assert.assertEquals((long)updatedStats.getBytesOutCounter(), (long)1280L);
    }

    @Test
    public void testConsumerStatsOutput() throws Exception {
        HashSet allowedFields = Sets.newHashSet((Object[])new String[]{"msgRateOut", "msgThroughputOut", "bytesOutCounter", "messageAckRate", "msgOutCounter", "msgRateRedeliver", "chunkedMessageRate", "consumerName", "availablePermits", "unackedMessages", "avgMessagesPerEntry", "blockedConsumerOnUnackedMsgs", "readPositionWhenJoining", "lastAckedTimestamp", "lastConsumedTimestamp", "keyHashRanges", "metadata", "address", "connectedSince", "clientVersion"});
        String topicName = "persistent://prop/use/ns-abc/testConsumerStatsOutput";
        String subName = "my-subscription";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/testConsumerStatsOutput"}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").subscribe();
        TopicStats stats = this.admin.topics().getStats("persistent://prop/use/ns-abc/testConsumerStatsOutput");
        ObjectMapper mapper = ObjectMapperFactory.create();
        JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(((SubscriptionStats)stats.getSubscriptions().get("my-subscription")).getConsumers().get(0)));
        Iterator itr = node.fieldNames();
        while (itr.hasNext()) {
            String field = (String)itr.next();
            Assert.assertTrue((boolean)allowedFields.contains(field), (String)(field + " should not be exposed"));
        }
        consumer.close();
    }

    @Test
    public void testPersistentTopicMessageAckRateMetricTopicLevel() throws Exception {
        String topicName = "persistent://public/default/msg_ack_rate" + UUID.randomUUID();
        this.testMessageAckRateMetric(topicName, true);
    }

    @Test
    public void testPersistentTopicMessageAckRateMetricNamespaceLevel() throws Exception {
        String topicName = "persistent://public/default/msg_ack_rate" + UUID.randomUUID();
        this.testMessageAckRateMetric(topicName, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics) throws Exception {
        int messages = 1000;
        String subName = "test_sub";
        CountDownLatch latch = new CountDownLatch(1000);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(true).batchingMaxMessages(10).create();
        try {
            MessageListener & Serializable listener = (MessageListener & Serializable)(consumer, msg) -> {
                try {
                    consumer.acknowledge(msg);
                    latch.countDown();
                }
                catch (PulsarClientException pulsarClientException) {
                    // empty catch block
                }
            };
            Consumer c1 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName(subName).subscriptionType(SubscriptionType.Shared).messageListener((MessageListener)listener).subscribe();
            try {
                Consumer c2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionName(subName).subscriptionType(SubscriptionType.Shared).messageListener((MessageListener)listener).subscribe();
                try {
                    String namespace = TopicName.get((String)topicName).getNamespace();
                    for (int i = 0; i < 1000; ++i) {
                        producer.sendAsync((Object)UUID.randomUUID().toString());
                    }
                    producer.flush();
                    latch.await(20L, TimeUnit.SECONDS);
                    TimeUnit.SECONDS.sleep(1L);
                    Topic topic = (Topic)((Optional)this.pulsar.getBrokerService().getTopic(topicName, false).get()).get();
                    Subscription subscription = topic.getSubscription(subName);
                    List consumers = subscription.getConsumers();
                    Assert.assertEquals((int)consumers.size(), (int)2);
                    org.apache.pulsar.broker.service.Consumer consumer1 = (org.apache.pulsar.broker.service.Consumer)consumers.get(0);
                    org.apache.pulsar.broker.service.Consumer consumer2 = (org.apache.pulsar.broker.service.Consumer)consumers.get(1);
                    consumer1.updateRates();
                    consumer2.updateRates();
                    ByteArrayOutputStream output = new ByteArrayOutputStream();
                    PrometheusMetricsGenerator.generate((PulsarService)this.pulsar, (boolean)exposeTopicLevelMetrics, (boolean)true, (boolean)true, (OutputStream)output);
                    String metricStr = output.toString();
                    Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
                    Collection ackRateMetric = metricsMap.get((Object)"pulsar_consumer_msg_ack_rate");
                    Collection subAckRateMetrics = metricsMap.get((Object)"pulsar_subscription_msg_ack_rate");
                    String rateOutMetricName = exposeTopicLevelMetrics ? "pulsar_consumer_msg_rate_out" : "pulsar_rate_out";
                    Collection rateOutMetric = metricsMap.get((Object)rateOutMetricName);
                    Assert.assertTrue((ackRateMetric.size() > 0 ? 1 : 0) != 0);
                    Assert.assertTrue((rateOutMetric.size() > 0 ? 1 : 0) != 0);
                    if (exposeTopicLevelMetrics) {
                        String consumer1Name = consumer1.consumerName();
                        String consumer2Name = consumer2.consumerName();
                        double totalAckRate = ackRateMetric.stream().filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name) || metric.tags.get("consumer_name").equals(consumer2Name)).mapToDouble(metric -> metric.value).sum();
                        double totalRateOut = rateOutMetric.stream().filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name) || metric.tags.get("consumer_name").equals(consumer2Name)).mapToDouble(metric -> metric.value).sum();
                        double subAckRate = subAckRateMetrics.stream().filter(m -> m.tags.get("subscription").equals(subName)).mapToDouble(m -> m.value).sum();
                        Assert.assertEquals((int)subAckRateMetrics.size(), (int)1);
                        Assert.assertTrue((totalAckRate > 0.0 ? 1 : 0) != 0);
                        Assert.assertTrue((totalRateOut > 0.0 ? 1 : 0) != 0);
                        Assert.assertEquals((double)totalAckRate, (double)subAckRate, (double)(0.1 * totalAckRate));
                        Assert.assertEquals((double)totalAckRate, (double)totalRateOut, (double)(totalRateOut * 0.1));
                    } else {
                        double totalAckRate = ackRateMetric.stream().filter(metric -> namespace.equals(metric.tags.get("namespace"))).mapToDouble(metric -> metric.value).sum();
                        double totalRateOut = rateOutMetric.stream().filter(metric -> namespace.equals(metric.tags.get("namespace"))).mapToDouble(metric -> metric.value).sum();
                        Assert.assertTrue((totalAckRate > 0.0 ? 1 : 0) != 0);
                        Assert.assertTrue((totalRateOut > 0.0 ? 1 : 0) != 0);
                        Assert.assertEquals((double)totalAckRate, (double)totalRateOut, (double)(totalRateOut * 0.1));
                    }
                }
                finally {
                    if (Collections.singletonList(c2).get(0) != null) {
                        c2.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(c1).get(0) != null) {
                    c1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }
}

