/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.processor;

import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.processor.CustomBatchFormat;
import org.apache.pulsar.client.processor.CustomBatchPayloadProcessor;
import org.apache.pulsar.client.processor.CustomBatchProducer;
import org.apache.pulsar.client.processor.DefaultProcessorWithRefCnt;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-impl"})
public class MessagePayloadProcessorTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MessagePayloadProcessorTest.class);

    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("public", (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace("public/default", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
    }

    @Override
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider
    public static Object[][] config() {
        return new Object[][]{{1, true, 1}, {1, true, 4}, {1, false, 1}, {3, false, 1}};
    }

    @DataProvider
    public static Object[][] customBatchConfig() {
        return new Object[][]{{10, 1}, {10, 4}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="config")
    public void testDefaultProcessor(int numPartitions, boolean enableBatching, int batchingMaxMessages) throws Exception {
        String topic = "testDefaultProcessor-" + numPartitions + "-" + enableBatching + "-" + batchingMaxMessages;
        int numMessages = 10;
        String messagePrefix = "msg-";
        if (numPartitions > 1) {
            this.admin.topics().createPartitionedTopic(topic, numPartitions);
        }
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(enableBatching).batchingMaxMessages(batchingMaxMessages).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).messageRouter(new MessageRouter(){
            int i = 0;

            public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                return this.i++ % metadata.numPartitions();
            }
        }).create();
        try {
            for (int i = 0; i < 10; ++i) {
                String value = "msg-" + i;
                producer.sendAsync((Object)value).whenComplete((id, e) -> {
                    if (e == null) {
                        log.info("Send {} to {} {}", new Object[]{value, topic, id});
                    } else {
                        log.error("Failed to send {}: {}", (Object)value, (Object)e.getMessage());
                    }
                });
            }
            DefaultProcessorWithRefCnt processor = new DefaultProcessorWithRefCnt();
            Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topic}).subscriptionName("sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).messagePayloadProcessor((MessagePayloadProcessor)processor).subscribe();
            try {
                int i;
                ArrayList<Object> values = new ArrayList<Object>();
                for (i = 0; i < 10; ++i) {
                    Message message = consumer.receive(1, TimeUnit.SECONDS);
                    Assert.assertNotNull((Object)message);
                    values.add(message.getValue());
                    consumer.acknowledge(message.getMessageId());
                }
                if (numPartitions > 1) {
                    Collections.sort(values);
                }
                for (i = 0; i < 10; ++i) {
                    Assert.assertEquals((String)((String)values.get(i)), (String)("msg-" + i));
                }
                if (enableBatching) {
                    int numBatches = 10 / batchingMaxMessages;
                    Assert.assertEquals((int)processor.getTotalRefCnt(), (int)(2 * (numBatches += 10 % batchingMaxMessages == 0 ? 0 : 1)));
                } else {
                    Assert.assertEquals((int)processor.getTotalRefCnt(), (int)20);
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Test
    public void testCustomBatchFormat() {
        ArrayList<List<Object>> inputs = new ArrayList<List<Object>>();
        inputs.add(Collections.emptyList());
        inputs.add(Collections.singletonList("java"));
        inputs.add(Arrays.asList("hello", "world", "java"));
        for (List list : inputs) {
            ByteBuf buf = CustomBatchFormat.serialize(list);
            CustomBatchFormat.Metadata metadata = CustomBatchFormat.readMetadata(buf);
            ArrayList<Object> parsedTokens = new ArrayList<Object>();
            for (int i = 0; i < metadata.getNumMessages(); ++i) {
                parsedTokens.add(Schema.STRING.decode(CustomBatchFormat.readMessage(buf)));
            }
            Assert.assertEquals(parsedTokens, (Collection)list);
            Assert.assertEquals((int)parsedTokens.size(), (int)list.size());
            Assert.assertEquals((int)buf.refCnt(), (int)1);
            buf.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="customBatchConfig")
    public void testCustomProcessor(int numMessages, int batchingMaxMessages) throws Exception {
        String topic = "persistent://public/default/testCustomProcessor-" + numMessages + "-" + batchingMaxMessages;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topic}).subscriptionName("sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).messagePayloadProcessor((MessagePayloadProcessor)new CustomBatchPayloadProcessor()).subscribe();
        try {
            int i;
            PersistentTopic persistentTopic = ((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).orElse(null);
            Assert.assertNotNull((Object)persistentTopic);
            String messagePrefix = "msg-";
            CustomBatchProducer producer = new CustomBatchProducer(persistentTopic, batchingMaxMessages);
            for (i = 0; i < numMessages; ++i) {
                producer.sendAsync("msg-" + i);
            }
            producer.flush();
            for (i = 0; i < numMessages; ++i) {
                Message message = consumer.receive(1, TimeUnit.SECONDS);
                Assert.assertNotNull((Object)message);
                Assert.assertEquals((String)((String)message.getValue()), (String)("msg-" + i));
                consumer.acknowledge(message.getMessageId());
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }
}

