/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer;

import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class TransactionBufferClientTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionBufferClientTest.class);
    private TransactionBufferClient tbClient;
    TopicName partitionedTopicName = TopicName.get((String)"persistent", (String)"public", (String)"test", (String)"tb-client");
    int partitions = 10;
    private static final String namespace = "public/test";

    @BeforeClass(alwaysRun=true)
    protected void setup() throws Exception {
        this.setBrokerCount(3);
        this.internalSetup();
        String[] brokerServiceUrlArr = this.getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
        this.admin.tenants().createTenant("public", (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(namespace, 10);
        this.admin.topics().createPartitionedTopic(this.partitionedTopicName.getPartitionedTopicName(), this.partitions);
        this.tbClient = TransactionBufferClientImpl.create((PulsarService)((PulsarService)this.pulsarServiceList.get(0)), (HashedWheelTimer)new HashedWheelTimer((ThreadFactory)new DefaultThreadFactory("transaction-buffer")), (int)1000, (long)3000L);
    }

    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        this.tbClient.close();
        super.internalCleanup();
    }

    @Test
    public void testCommitOnTopic() throws ExecutionException, InterruptedException {
        int i;
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        for (i = 0; i < this.partitions; ++i) {
            String topic = this.partitionedTopicName.getPartition(i).toString();
            futures.add(this.tbClient.commitTxnOnTopic(topic, 1L, (long)i, Long.MIN_VALUE));
        }
        for (i = 0; i < futures.size(); ++i) {
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getMostSigBits(), (long)1L);
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getLeastSigBits(), (long)i);
        }
    }

    @Test
    public void testAbortOnTopic() throws ExecutionException, InterruptedException {
        int i;
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        for (i = 0; i < this.partitions; ++i) {
            String topic = this.partitionedTopicName.getPartition(i).toString();
            futures.add(this.tbClient.abortTxnOnTopic(topic, 1L, (long)i, Long.MIN_VALUE));
        }
        for (i = 0; i < futures.size(); ++i) {
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getMostSigBits(), (long)1L);
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getLeastSigBits(), (long)i);
        }
    }

    @Test
    public void testCommitOnSubscription() throws ExecutionException, InterruptedException {
        int i;
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        for (i = 0; i < this.partitions; ++i) {
            String topic = this.partitionedTopicName.getPartition(i).toString();
            futures.add(this.tbClient.commitTxnOnSubscription(topic, "test", 1L, (long)i, -1L));
        }
        for (i = 0; i < futures.size(); ++i) {
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getMostSigBits(), (long)1L);
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getLeastSigBits(), (long)i);
        }
    }

    @Test
    public void testAbortOnSubscription() throws ExecutionException, InterruptedException {
        int i;
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        for (i = 0; i < this.partitions; ++i) {
            String topic = this.partitionedTopicName.getPartition(i).toString();
            futures.add(this.tbClient.abortTxnOnSubscription(topic, "test", 1L, (long)i, -1L));
        }
        for (i = 0; i < futures.size(); ++i) {
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getMostSigBits(), (long)1L);
            Assert.assertEquals((long)((TxnID)((CompletableFuture)futures.get(i)).get()).getLeastSigBits(), (long)i);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTransactionBufferClientTimeout() throws Exception {
        PulsarService pulsarService = (PulsarService)this.pulsarServiceList.get(0);
        final PulsarClient mockClient = (PulsarClient)Mockito.mock(PulsarClientImpl.class);
        CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<ClientCnx>();
        ClientCnx clientCnx = (ClientCnx)Mockito.mock(ClientCnx.class);
        completableFuture.complete(clientCnx);
        Mockito.when((Object)((PulsarClientImpl)mockClient).getConnection(ArgumentMatchers.anyString())).thenReturn(completableFuture);
        ChannelHandlerContext cnx = (ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class);
        Mockito.when((Object)clientCnx.ctx()).thenReturn((Object)cnx);
        Channel channel = (Channel)Mockito.mock(Channel.class);
        Mockito.when((Object)cnx.channel()).thenReturn((Object)channel);
        Mockito.when((Object)pulsarService.getClient()).thenAnswer((Answer)new Answer<PulsarClient>(){

            public PulsarClient answer(InvocationOnMock invocation) throws Throwable {
                return mockClient;
            }
        });
        Mockito.when((Object)channel.isActive()).thenReturn((Object)true);
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        try {
            TransactionBufferHandlerImpl transactionBufferHandler = new TransactionBufferHandlerImpl(pulsarService, hashedWheelTimer, 1000, 3000L);
            CompletableFuture endFuture = transactionBufferHandler.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L);
            Field field = TransactionBufferHandlerImpl.class.getDeclaredField("outstandingRequests");
            field.setAccessible(true);
            ConcurrentSkipListMap outstandingRequests = (ConcurrentSkipListMap)field.get(transactionBufferHandler);
            Assert.assertEquals((int)outstandingRequests.size(), (int)1);
            Awaitility.await().atLeast(2L, TimeUnit.SECONDS).until(() -> {
                if (outstandingRequests.size() == 0) {
                    return true;
                }
                return false;
            });
            try {
                endFuture.get();
                Assert.fail();
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof TransactionBufferClientException.RequestTimeoutException));
            }
        }
        finally {
            if (Collections.singletonList(hashedWheelTimer).get(0) != null) {
                hashedWheelTimer.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTransactionBufferChannelUnActive() throws PulsarServerException {
        PulsarService pulsarService = (PulsarService)this.pulsarServiceList.get(0);
        final PulsarClient mockClient = (PulsarClient)Mockito.mock(PulsarClientImpl.class);
        CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<ClientCnx>();
        ClientCnx clientCnx = (ClientCnx)Mockito.mock(ClientCnx.class);
        completableFuture.complete(clientCnx);
        Mockito.when((Object)((PulsarClientImpl)mockClient).getConnection(ArgumentMatchers.anyString())).thenReturn(completableFuture);
        ChannelHandlerContext cnx = (ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class);
        Mockito.when((Object)clientCnx.ctx()).thenReturn((Object)cnx);
        Channel channel = (Channel)Mockito.mock(Channel.class);
        Mockito.when((Object)cnx.channel()).thenReturn((Object)channel);
        Mockito.when((Object)channel.isActive()).thenReturn((Object)false);
        Mockito.when((Object)pulsarService.getClient()).thenAnswer((Answer)new Answer<PulsarClient>(){

            public PulsarClient answer(InvocationOnMock invocation) throws Throwable {
                return mockClient;
            }
        });
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        try {
            TransactionBufferHandlerImpl transactionBufferHandler = new TransactionBufferHandlerImpl((PulsarService)this.pulsarServiceList.get(0), hashedWheelTimer, 1000, 3000L);
            try {
                transactionBufferHandler.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L).get();
                Assert.fail();
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.LookupException));
            }
            Mockito.when((Object)channel.isActive()).thenReturn((Object)true);
            try {
                transactionBufferHandler.endTxnOnTopic("test", 1L, 1L, TxnAction.ABORT, 1L).get();
                Assert.fail();
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof TransactionBufferClientException.RequestTimeoutException));
            }
        }
        finally {
            if (Collections.singletonList(hashedWheelTimer).get(0) != null) {
                hashedWheelTimer.stop();
            }
        }
    }

    @Test
    public void testTransactionBufferLookUp() throws Exception {
        String topic = "persistent://public/test/testTransactionBufferLookUp";
        String subName = "test";
        String abortTopic = topic + "_abort_sub";
        String commitTopic = topic + "_commit_sub";
        this.admin.topics().createNonPartitionedTopic(abortTopic);
        this.admin.topics().createSubscription(abortTopic, subName, MessageId.earliest);
        this.admin.topics().createNonPartitionedTopic(commitTopic);
        this.admin.topics().createSubscription(commitTopic, subName, MessageId.earliest);
        this.tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnTopic(abortTopic, 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnTopic(commitTopic, 1L, 1L, -1L).get();
    }

    @Test
    public void testTransactionBufferRequestCredits() throws Exception {
        String topic = "persistent://public/test/testTransactionBufferRequestCredits";
        String subName = "test";
        String abortTopic = topic + "_abort_sub";
        String commitTopic = topic + "_commit_sub";
        this.admin.topics().createNonPartitionedTopic(abortTopic);
        this.admin.topics().createSubscription(abortTopic, subName, MessageId.earliest);
        this.admin.topics().createNonPartitionedTopic(commitTopic);
        this.admin.topics().createSubscription(commitTopic, subName, MessageId.earliest);
        this.tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnTopic(abortTopic, 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnTopic(commitTopic, 1L, 1L, -1L).get();
        Assert.assertEquals((int)this.tbClient.getAvailableRequestCredits(), (int)1000);
    }

    @Test
    public void testTransactionBufferPendingRequests() throws Exception {
    }

    @Test
    public void testEndTopicNotExist() throws Exception {
        String topic = "persistent://public/test/testEndTopicNotExist";
        String sub = "test";
        this.tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get();
        this.tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnSubscription(topic + "_abort_topic", sub, 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnSubscription(topic + "_commit_topic", sub, 1L, 1L, -1L).get();
    }

    @Test
    public void testEndSubNotExist() throws Exception {
        String topic = "persistent://public/test/testEndTopicNotExist";
        String sub = "test";
        this.admin.topics().createNonPartitionedTopic(topic + "_abort_sub");
        this.admin.topics().createNonPartitionedTopic(topic + "_commit_sub");
        this.tbClient.abortTxnOnSubscription(topic + "_abort_topic", sub, 1L, 1L, -1L).get();
        this.tbClient.abortTxnOnSubscription(topic + "_commit_topic", sub, 1L, 1L, -1L).get();
    }
}

