/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSuccess;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests how pending {@code sendAsync} futures are resolved when a producer is closed.
 *
 * <p>Each test builds its own {@link PulsarClient} against the lookup URL provided by
 * {@link ProducerConsumerBase} and closes the producer it created in a {@code finally} block.
 */
@Test(groups = "broker-impl")
public class ProducerCloseTest
extends ProducerConsumerBase {
    /** Topic shared by both tests. */
    private static final String TOPIC = "testProducerClose";

    /** Runs the base-class setup before every test method. */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /** Runs the base-class cleanup after every test method, even when the test failed. */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Sends a message asynchronously, closes the producer, feeds a synthetic
     * {@link CommandSuccess} to the connection and verifies that the send future
     * has been completed (successfully or exceptionally) afterwards.
     *
     * @throws Exception if client/producer creation or the final close fails
     */
    @Test(timeOut = 10000L)
    public void testProducerCloseCallback() throws Exception {
        this.initClient();
        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(TOPIC)
                .sendTimeout(5, TimeUnit.SECONDS)
                .maxPendingMessages(0)
                .enableBatching(false)
                .create();
        try {
            TypedMessageBuilder<byte[]> message =
                    producer.newMessage().value("test-msg".getBytes(StandardCharsets.UTF_8));

            // Disable auto-read on the channel before sending; presumably this keeps the
            // broker's real responses from being processed so the send stays pending.
            producer.getClientCnx().channel().config().setAutoRead(false);
            CompletableFuture<?> sendFuture = message.sendAsync();
            producer.closeAsync();

            // Hand the connection a success response for the id preceding a freshly
            // allocated one. NOTE(review): assumes request ids are sequential, so this is
            // the id of the most recently issued request (the close above) - confirm.
            PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
            CommandSuccess closeAck = new CommandSuccess();
            closeAck.setRequestId(clientImpl.newRequestId() - 1L);
            producer.getClientCnx().handleSuccess(closeAck);

            // Give the close handling time to resolve the pending send.
            Thread.sleep(3000L);
            Assert.assertTrue(sendFuture.isDone());
        } finally {
            producer.close();
        }
    }

    /**
     * Queues a message in a batching producer, forces the producer state to
     * {@link HandlerState.State#Failed}, closes it and verifies that the pending send
     * fails with {@link PulsarClientException.AlreadyClosedException}.
     *
     * @throws Exception if client/producer creation or the final close fails
     */
    @Test(timeOut = 10000L)
    public void testProducerCloseFailsPendingBatchWhenPreviousStateNotReadyCallback() throws Exception {
        this.initClient();
        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(TOPIC)
                .maxPendingMessages(10)
                .batchingMaxPublishDelay(10L, TimeUnit.SECONDS)
                .batchingMaxBytes(Integer.MAX_VALUE)
                .enableBatching(true)
                .create();
        try {
            CompletableFuture<?> sendFuture = producer.newMessage()
                    .value("test-msg".getBytes(StandardCharsets.UTF_8))
                    .sendAsync();

            // Put the producer into a non-Ready state before closing (per the test name,
            // the close path for a previous state other than Ready is what is exercised).
            producer.setState(HandlerState.State.Failed);
            producer.closeAsync();

            Assert.assertTrue(sendFuture.isCompletedExceptionally());
            try {
                sendFuture.get();
                // Without this, a normally-returning get() would skip the cause check
                // below and let the test pass vacuously.
                Assert.fail("sendAsync future should have completed exceptionally");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof PulsarClientException.AlreadyClosedException);
            }
        } finally {
            producer.close();
        }
    }

    /**
     * Replaces {@code pulsarClient} with a new client built against {@code lookupUrl}.
     *
     * @throws PulsarClientException if the client cannot be built
     */
    private void initClient() throws PulsarClientException {
        this.pulsarClient = PulsarClient.builder().serviceUrl(this.lookupUrl.toString()).build();
    }
}

