/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests how many per-partition producers a {@link PartitionedProducerImpl} holds when it is
 * created with {@code enableLazyStartPartitionedProducers(true)}, across different routing
 * modes, access modes and partition-count updates.
 *
 * <p>Helper methods are deliberately {@code private}: the class-level {@code @Test}
 * annotation would otherwise turn public helpers into test methods.
 */
@Test(groups = {"broker-impl"})
public class PartialPartitionedProducerTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(PartialPartitionedProducerTest.class);

    /** Number of messages published by each send round in these tests. */
    private static final int MESSAGES_PER_ROUND = 10;

    /** Name of the private {@link PartitionedProducerImpl} field read reflectively by the update tests. */
    private static final String TOPIC_METADATA_FIELD = "topicMetadata";

    @Override
    @BeforeClass
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * With {@code SinglePartition} routing on a 10-partition topic, asserts that exactly one
     * per-partition producer exists after publishing.
     */
    @Test
    public void testPtWithSinglePartition() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-with-single-routing");
        admin.topics().createPartitionedTopic(topic, 10);

        try (PartitionedProducerImpl<byte[]> producerImpl =
                (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                        .topic(topic)
                        .enableLazyStartPartitionedProducers(true)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
            sendMessages(producerImpl, MESSAGES_PER_ROUND);
            Assert.assertEquals(producerImpl.getProducers().size(), 1);
        }
    }

    /**
     * With a {@link PartialRoundRobinMessageRouterImpl} constructed with 3 on a 10-partition
     * topic, asserts that exactly three per-partition producers exist after publishing.
     */
    @Test
    public void testPtWithPartialPartition() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-with-partial-routing");
        admin.topics().createPartitionedTopic(topic, 10);

        try (PartitionedProducerImpl<byte[]> producerImpl =
                (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                        .topic(topic)
                        .enableLazyStartPartitionedProducers(true)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.CustomPartition)
                        .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
                        .create()) {
            sendMessages(producerImpl, MESSAGES_PER_ROUND);
            Assert.assertEquals(producerImpl.getProducers().size(), 3);
        }
    }

    /**
     * With {@code RoundRobinPartition} routing on a 10-partition topic, asserts that one
     * per-partition producer exists right after creation and ten exist after publishing
     * ten messages.
     */
    @Test
    public void testPtLazyLoading() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-lazily");
        admin.topics().createPartitionedTopic(topic, 10);

        try (PartitionedProducerImpl<byte[]> producerImpl =
                (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                        .topic(topic)
                        .enableLazyStartPartitionedProducers(true)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                        .create()) {
            Assert.assertEquals(producerImpl.getProducers().size(), 1);
            Assert.assertTrue(trySendMessages(producerImpl, MESSAGES_PER_ROUND));
            Assert.assertEquals(producerImpl.getProducers().size(), 10);
        }
    }

    /**
     * For the {@code Exclusive} and {@code WaitForExclusive} access modes, asserts that all ten
     * per-partition producers exist immediately after creation even though lazy start was
     * requested.
     */
    @Test
    public void testPtLoadingNotSharedMode() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-not-shared-mode");
        admin.topics().createPartitionedTopic(topic, 10);

        // The exclusive producer is closed at the end of its own scope, i.e. before the
        // WaitForExclusive producer below is created on the same topic.
        try (PartitionedProducerImpl<byte[]> producerImplExclusive =
                (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                        .topic(topic)
                        .enableLazyStartPartitionedProducers(true)
                        .enableBatching(false)
                        .accessMode(ProducerAccessMode.Exclusive)
                        .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                        .create()) {
            Assert.assertEquals(producerImplExclusive.getProducers().size(), 10);
        }

        try (PartitionedProducerImpl<byte[]> producerImplWaitForExclusive =
                (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                        .topic(topic)
                        .enableLazyStartPartitionedProducers(true)
                        .enableBatching(false)
                        .accessMode(ProducerAccessMode.WaitForExclusive)
                        .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                        .create()) {
            Assert.assertEquals(producerImplWaitForExclusive.getProducers().size(), 10);
        }
    }

    /**
     * Grows a topic from 2 to 3 to 4 partitions while a producer using a
     * {@link PartialRoundRobinMessageRouterImpl} constructed with 3 is attached, and asserts the
     * per-partition producer count after each step: 1 after creation, 2 after the first send
     * round, still 2 once the update to 3 partitions is observed, 3 after sending again, and
     * still 3 after the update to 4 partitions and a further send round.
     */
    @Test
    public void testPtUpdateWithPartialPartition() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-update-with-partial-routing");
        admin.topics().createPartitionedTopic(topic, 2);

        try (PartitionedProducerImpl<byte[]> producerImpl =
                (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                        .topic(topic)
                        .enableLazyStartPartitionedProducers(true)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.CustomPartition)
                        .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
                        .accessMode(ProducerAccessMode.Shared)
                        .autoUpdatePartitions(true)
                        .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
                        .create()) {
            Assert.assertEquals(producerImpl.getProducers().size(), 1);
            Assert.assertTrue(trySendMessages(producerImpl, MESSAGES_PER_ROUND));
            Assert.assertEquals(producerImpl.getProducers().size(), 2);

            admin.topics().updatePartitionedTopic(topic, 3);
            awaitPartitionCount(producerImpl, 3);
            // Observing the new partition alone must not add a producer; sending does.
            Assert.assertEquals(producerImpl.getProducers().size(), 2);
            Assert.assertTrue(trySendMessages(producerImpl, MESSAGES_PER_ROUND));
            Assert.assertEquals(producerImpl.getProducers().size(), 3);

            admin.topics().updatePartitionedTopic(topic, 4);
            awaitPartitionCount(producerImpl, 4);
            Assert.assertTrue(trySendMessages(producerImpl, MESSAGES_PER_ROUND));
            Assert.assertEquals(producerImpl.getProducers().size(), 3);
        }
    }

    /**
     * For the {@code Exclusive} and {@code WaitForExclusive} access modes, asserts that the
     * per-partition producer count equals the partition count both right after creation and
     * after the producer has observed a partition-count increase, without sending any message.
     */
    @Test
    public void testPtUpdateNotSharedMode() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-update-not-shared");
        admin.topics().createPartitionedTopic(topic, 2);

        // The exclusive producer is closed at the end of its own scope, i.e. before the
        // WaitForExclusive producer below is created on the same topic.
        try (PartitionedProducerImpl<byte[]> producerImplExclusive =
                (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                        .topic(topic)
                        .enableLazyStartPartitionedProducers(true)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                        .accessMode(ProducerAccessMode.Exclusive)
                        .autoUpdatePartitions(true)
                        .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
                        .create()) {
            Assert.assertEquals(producerImplExclusive.getProducers().size(), 2);

            admin.topics().updatePartitionedTopic(topic, 3);
            awaitPartitionCount(producerImplExclusive, 3);
            Assert.assertEquals(producerImplExclusive.getProducers().size(), 3);
        }

        try (PartitionedProducerImpl<byte[]> producerImplWaitForExclusive =
                (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                        .topic(topic)
                        .enableLazyStartPartitionedProducers(true)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                        .accessMode(ProducerAccessMode.WaitForExclusive)
                        .autoUpdatePartitions(true)
                        .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
                        .create()) {
            Assert.assertEquals(producerImplWaitForExclusive.getProducers().size(), 3);

            admin.topics().updatePartitionedTopic(topic, 4);
            awaitPartitionCount(producerImplWaitForExclusive, 4);
            Assert.assertEquals(producerImplWaitForExclusive.getProducers().size(), 4);
        }
    }

    /**
     * Synchronously publishes {@code count} messages with the payload {@code "msg"}.
     *
     * @param producer producer to publish with
     * @param count number of messages to send
     * @throws Exception if any send fails; remaining messages are not sent
     */
    private static void sendMessages(PartitionedProducerImpl<byte[]> producer, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            producer.newMessage().value("msg".getBytes(StandardCharsets.UTF_8)).send();
        }
    }

    /**
     * Publishes {@code count} messages and reports success as a boolean instead of throwing.
     * The failure is logged so that a failed assertion on the result still shows its cause.
     *
     * @param producer producer to publish with
     * @param count number of messages to send
     * @return {@code true} if every send succeeded, {@code false} as soon as one fails
     */
    private static boolean trySendMessages(PartitionedProducerImpl<byte[]> producer, int count) {
        try {
            sendMessages(producer, count);
            return true;
        } catch (Throwable e) {
            log.error("Failed to send messages", e);
            return false;
        }
    }

    /**
     * Waits, using Awaitility's default settings, until the producer's private
     * {@code topicMetadata} field reports {@code expectedPartitions} partitions.
     *
     * @param producer producer whose topic metadata is inspected reflectively
     * @param expectedPartitions partition count to wait for
     * @throws NoSuchFieldException if {@link PartitionedProducerImpl} has no {@code topicMetadata} field
     */
    private static void awaitPartitionCount(PartitionedProducerImpl<?> producer, int expectedPartitions)
            throws NoSuchFieldException {
        final Field topicMetadataField = PartitionedProducerImpl.class.getDeclaredField(TOPIC_METADATA_FIELD);
        topicMetadataField.setAccessible(true);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                ((TopicMetadata) topicMetadataField.get(producer)).numPartitions(), expectedPartitions));
    }
}

