/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageContainerImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Checks the permit accounting of the semaphore exposed by {@link ProducerImpl#getSemaphore()}.
 *
 * <p>Each test creates a producer with a fixed {@code maxPendingMessages} value and asserts that
 * the number of available permits matches the expected value while sends are outstanding, after
 * they complete, and after sends that fail.
 */
@Test(groups = {"broker-impl"})
public class ProducerSemaphoreTest extends ProducerConsumerBase {

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * A message rejected with {@code InvalidMessageException} must leave every permit available,
     * both with batching enabled and after batching is switched off on the producer config.
     *
     * <p>The rejection is provoked by setting the static {@code ClientCnx.maxMessageSize} field
     * to 2 through reflection; the previous value is restored before the test returns.
     */
    @Test(timeOut = 10000L)
    public void testProducerSemaphoreInvalidMessage() throws Exception {
        final int pendingQueueSize = 100;
        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic("testProducerSemaphoreAcquire")
                .maxPendingMessages(pendingQueueSize)
                .enableBatching(true)
                .create();
        try {
            this.stopBroker();
            Field maxMessageSizeField = ClientCnx.class.getDeclaredField("maxMessageSize");
            maxMessageSizeField.setAccessible(true);
            // The field is static (it is accessed with a null instance), so remember the
            // current value and put it back afterwards instead of leaking the override.
            Object originalMaxMessageSize = maxMessageSizeField.get(null);
            maxMessageSizeField.set(null, 2);
            try {
                assertInvalidMessageKeepsPermits(producer, pendingQueueSize);
                producer.conf.setBatchingEnabled(false);
                assertInvalidMessageKeepsPermits(producer, pendingQueueSize);
            } finally {
                maxMessageSizeField.set(null, originalMaxMessageSize);
            }
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }

    /**
     * Permits are taken while sends are outstanding and all returned once the sends complete.
     *
     * <p>Three rounds are run: 10 builder-created messages, 5 pre-built messages whose metadata
     * declares {@code numMessagesInBatch = 10}, and 5 builder-created messages. The assertions
     * expect exactly one permit per send in every round.
     */
    @Test(timeOut = 30000L)
    public void testProducerSemaphoreAcquireAndRelease()
            throws PulsarClientException, ExecutionException, InterruptedException {
        final int pendingQueueSize = 100;
        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic("testProducerSemaphoreAcquire")
                .maxPendingMessages(pendingQueueSize)
                .enableBatching(false)
                .create();
        try {
            final int messages = 10;
            final int fewerMessages = 5;
            ArrayList<CompletableFuture<?>> futures = new ArrayList<>(messages);

            // Round 1: builder-created messages.
            // Reads are paused on the channel while sending; presumably this keeps send
            // receipts from being processed so the permits stay acquired for the assertion.
            setAutoRead(producer, false);
            try {
                for (int i = 0; i < messages; i++) {
                    futures.add(producer.newMessage()
                            .value(("Semaphore-test-" + i).getBytes(StandardCharsets.UTF_8))
                            .sendAsync());
                }
                assertPermitsAndNoError(producer, pendingQueueSize - messages);
            } finally {
                setAutoRead(producer, true);
            }
            FutureUtil.waitForAll(futures).get();
            assertPermitsAndNoError(producer, pendingQueueSize);
            futures.clear();

            // Round 2: pre-built messages whose metadata claims a batch of 10.
            setAutoRead(producer, false);
            try {
                for (int i = 0; i < fewerMessages; i++) {
                    futures.add(producer.sendAsync((Message<?>) newEmptyMessage(10)));
                }
                assertPermitsAndNoError(producer, pendingQueueSize - fewerMessages);
            } finally {
                setAutoRead(producer, true);
            }
            FutureUtil.waitForAll(futures).get();
            assertPermitsAndNoError(producer, pendingQueueSize);
            futures.clear();
            assertPermitsAndNoError(producer, pendingQueueSize);

            // Round 3: builder-created messages again.
            setAutoRead(producer, false);
            try {
                for (int i = 0; i < fewerMessages; i++) {
                    futures.add(producer.newMessage()
                            .value(("Semaphore-test-" + i).getBytes(StandardCharsets.UTF_8))
                            .sendAsync());
                }
                assertPermitsAndNoError(producer, pendingQueueSize - fewerMessages);
            } finally {
                setAutoRead(producer, true);
            }
            FutureUtil.waitForAll(futures).get();
            assertPermitsAndNoError(producer, pendingQueueSize);
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }

    /**
     * Once every permit is taken, a further send must fail with
     * {@code ProducerQueueIsFullError} and must not change the permit count.
     *
     * <p>The scenario is run twice: with pre-built messages and with builder-created messages.
     */
    @Test(timeOut = 30000L)
    public void testEnsureNotBlockOnThePendingQueue() throws Exception {
        final int pendingQueueSize = 10;
        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic("testProducerSemaphoreAcquire")
                .maxPendingMessages(pendingQueueSize)
                .enableBatching(false)
                .create();
        try {
            ArrayList<CompletableFuture<?>> futures = new ArrayList<>();
            assertPermitsAndNoError(producer, pendingQueueSize);

            // Pass 1: pre-built messages.
            setAutoRead(producer, false);
            try {
                for (int i = 0; i < pendingQueueSize; i++) {
                    futures.add(producer.sendAsync((Message<?>) newEmptyMessage(10)));
                }
                assertPermitsAndNoError(producer, 0);
                try {
                    producer.sendAsync((Message<?>) newEmptyMessage(10)).get();
                    Assert.fail("Shouldn't be able to send message");
                } catch (ExecutionException ee) {
                    Assert.assertEquals(ee.getCause().getClass(),
                            PulsarClientException.ProducerQueueIsFullError.class);
                    assertPermitsAndNoError(producer, 0);
                }
            } finally {
                setAutoRead(producer, true);
            }
            FutureUtil.waitForAll(futures).get();
            futures.clear();
            assertPermitsAndNoError(producer, pendingQueueSize);

            // Pass 2: builder-created messages.
            setAutoRead(producer, false);
            try {
                for (int i = 0; i < pendingQueueSize; i++) {
                    futures.add(producer.newMessage()
                            .value(("Semaphore-test-" + i).getBytes(StandardCharsets.UTF_8))
                            .sendAsync());
                }
                assertPermitsAndNoError(producer, 0);
                try {
                    producer.newMessage()
                            .value("Semaphore-test-Q-full".getBytes(StandardCharsets.UTF_8))
                            .sendAsync()
                            .get();
                    // Without this guard the test would pass silently if the send succeeded.
                    Assert.fail("Shouldn't be able to send message");
                } catch (ExecutionException ee) {
                    Assert.assertEquals(ee.getCause().getClass(),
                            PulsarClientException.ProducerQueueIsFullError.class);
                    assertPermitsAndNoError(producer, 0);
                }
            } finally {
                setAutoRead(producer, true);
            }
            FutureUtil.waitForAll(futures).get();
            assertPermitsAndNoError(producer, pendingQueueSize);
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }

    /**
     * After a batched send ends in {@code TimeoutException}, every permit must be available.
     *
     * <p>The broker is stopped, two messages are queued on a Mockito spy of the producer,
     * {@code encryptMessage} is stubbed on the spy to throw {@code CryptoException}, and a third
     * message is sent synchronously. Any outcome other than a {@code TimeoutException} fails
     * the test.
     */
    @Test(timeOut = 10000L)
    public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws Exception {
        final int pendingQueueSize = 10;
        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic("testProducerSemaphoreRelease")
                .sendTimeout(2, TimeUnit.SECONDS)
                .maxPendingMessages(pendingQueueSize)
                .enableBatching(true)
                .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                .batchingMaxBytes(15)
                .create();
        try {
            this.stopBroker();
            try {
                ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
                spyProducer.newMessage()
                        .value("semaphore-test".getBytes(StandardCharsets.UTF_8))
                        .sendAsync();
                spyProducer.newMessage()
                        .value("semaphore-test".getBytes(StandardCharsets.UTF_8))
                        .sendAsync();

                // Point the batch container at the spy; presumably so that the stubbed
                // encryptMessage below is the one reached when the batch is processed.
                Field batchMessageContainerField =
                        ProducerImpl.class.getDeclaredField("batchMessageContainer");
                batchMessageContainerField.setAccessible(true);
                BatchMessageContainerImpl batchMessageContainer =
                        (BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer);
                batchMessageContainer.setProducer(spyProducer);
                Mockito.doThrow(new PulsarClientException.CryptoException("crypto error"))
                        .when(spyProducer)
                        .encryptMessage(ArgumentMatchers.any(), ArgumentMatchers.any());

                try {
                    spyProducer.newMessage()
                            .value("memory-test".getBytes(StandardCharsets.UTF_8))
                            .sendAsync()
                            .get();
                } catch (Exception e) {
                    throw PulsarClientException.unwrap(e);
                }
                throw new IllegalStateException("can not reach here");
            } catch (PulsarClientException.TimeoutException ex) {
                Assert.assertEquals(availablePermits(producer), pendingQueueSize);
            }
        } finally {
            // Single cleanup point: runs on success, assertion failure and unexpected errors.
            if (producer != null) {
                producer.close();
            }
        }
    }

    /** Returns the number of permits currently available in the producer's semaphore. */
    private static int availablePermits(ProducerImpl<?> producer) {
        return producer.getSemaphore().get().availablePermits();
    }

    /** Asserts the available permit count and that the producer does not report an error state. */
    private static void assertPermitsAndNoError(ProducerImpl<?> producer, int expectedPermits) {
        Assert.assertEquals(availablePermits(producer), expectedPermits);
        Assert.assertFalse(producer.isErrorStat());
    }

    /** Enables or disables auto-read on the channel of the producer's current connection. */
    private static void setAutoRead(ProducerImpl<?> producer, boolean autoRead) {
        producer.getClientCnx().channel().config().setAutoRead(autoRead);
    }

    /** Builds an empty-payload message whose metadata declares the given batch size. */
    private static MessageImpl<byte[]> newEmptyMessage(int numMessagesInBatch) {
        MessageMetadata metadata = new MessageMetadata().setNumMessagesInBatch(numMessagesInBatch);
        return MessageImpl.create(metadata, ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
    }

    /**
     * Sends one message synchronously, requires it to be rejected with
     * {@code InvalidMessageException}, and asserts the permit count afterwards.
     */
    private static void assertInvalidMessageKeepsPermits(ProducerImpl<byte[]> producer, int expectedPermits)
            throws PulsarClientException {
        try {
            producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
            Assert.fail("can not reach here");
        } catch (PulsarClientException.InvalidMessageException ex) {
            Assert.assertEquals(availablePermits(producer), expectedPermits);
        }
    }
}

