/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class TransactionMetadataStoreServiceTest
extends BrokerTestBase {
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        ServiceConfiguration configuration = TransactionMetadataStoreServiceTest.getDefaultConf();
        configuration.setTransactionCoordinatorEnabled(true);
        super.baseSetup(configuration);
        this.admin.tenants().createTenant("pulsar", (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        this.admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
        this.admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testAddAndRemoveTransactionMetadataStore() throws Exception {
        TransactionMetadataStoreService transactionMetadataStoreService = this.pulsar.getTransactionMetadataStoreService();
        Assert.assertNotNull((Object)transactionMetadataStoreService);
        this.admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString());
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get((long)0L));
        Awaitility.await().until(() -> transactionMetadataStoreService.getStores().size() == 1);
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get((long)0L));
        Awaitility.await().until(() -> transactionMetadataStoreService.getStores().size() == 0);
    }

    @Test
    public void testNewTransaction() throws Exception {
        TransactionMetadataStoreService transactionMetadataStoreService = this.pulsar.getTransactionMetadataStoreService();
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get((long)0L));
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get((long)1L));
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get((long)2L));
        Awaitility.await().until(() -> transactionMetadataStoreService.getStores().size() == 3);
        this.checkTransactionMetadataStoreReady((MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L)));
        this.checkTransactionMetadataStoreReady((MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)1L)));
        this.checkTransactionMetadataStoreReady((MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)2L)));
        TxnID txnID0 = (TxnID)transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get((long)0L), 5L).get();
        TxnID txnID1 = (TxnID)transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get((long)1L), 5L).get();
        TxnID txnID2 = (TxnID)transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get((long)2L), 5L).get();
        Assert.assertEquals((long)txnID0.getMostSigBits(), (long)0L);
        Assert.assertEquals((long)txnID1.getMostSigBits(), (long)1L);
        Assert.assertEquals((long)txnID2.getMostSigBits(), (long)2L);
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get((long)0L));
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get((long)1L));
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get((long)2L));
        Assert.assertEquals((int)transactionMetadataStoreService.getStores().size(), (int)0);
    }

    @Test
    public void testAddProducedPartitionToTxn() throws Exception {
        TransactionMetadataStoreService transactionMetadataStoreService = this.pulsar.getTransactionMetadataStoreService();
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get((long)0L));
        Awaitility.await().until(() -> transactionMetadataStoreService.getStores().size() == 1);
        MLTransactionMetadataStore transactionMetadataStore = (MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L));
        this.checkTransactionMetadataStoreReady(transactionMetadataStore);
        TxnID txnID = (TxnID)transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get((long)0L), 5000L).get();
        ArrayList<String> partitions = new ArrayList<String>();
        partitions.add("ptn-0");
        partitions.add("ptn-1");
        partitions.add("ptn-2");
        transactionMetadataStoreService.addProducedPartitionToTxn(txnID, partitions);
        TxnMeta txn = (TxnMeta)transactionMetadataStoreService.getTxnMeta(txnID).get();
        Assert.assertEquals((Object)txn.status(), (Object)TxnStatus.OPEN);
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get((long)0L));
        Assert.assertEquals((int)transactionMetadataStoreService.getStores().size(), (int)0);
    }

    @Test
    public void testAddAckedPartitionToTxn() throws Exception {
        TransactionMetadataStoreService transactionMetadataStoreService = this.pulsar.getTransactionMetadataStoreService();
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get((long)0L)).get();
        Awaitility.await().until(() -> transactionMetadataStoreService.getStores().size() == 1);
        MLTransactionMetadataStore transactionMetadataStore = (MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L));
        this.checkTransactionMetadataStoreReady(transactionMetadataStore);
        TxnID txnID = (TxnID)transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get((long)0L), 5000L).get();
        ArrayList<TransactionSubscription> partitions = new ArrayList<TransactionSubscription>();
        partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
        partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
        partitions.add(TransactionSubscription.builder().topic("ptn-3").subscription("sub-1").build());
        transactionMetadataStoreService.addAckedPartitionToTxn(txnID, partitions);
        TxnMeta txn = (TxnMeta)transactionMetadataStoreService.getTxnMeta(txnID).get();
        Assert.assertEquals((Object)txn.status(), (Object)TxnStatus.OPEN);
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get((long)0L));
        Assert.assertEquals((int)transactionMetadataStoreService.getStores().size(), (int)0);
    }

    @Test
    public void testTimeoutTracker() throws Exception {
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get((long)0L));
        Awaitility.await().until(() -> this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L)) != null);
        MLTransactionMetadataStore transactionMetadataStore = (MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L));
        this.checkTransactionMetadataStoreReady(transactionMetadataStore);
        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
        field.setAccessible(true);
        ConcurrentSkipListMap txnMap = (ConcurrentSkipListMap)field.get(transactionMetadataStore);
        int i = -1;
        while (++i < 1000) {
            try {
                transactionMetadataStore.newTransaction(2000L).get();
            }
            catch (Exception exception) {}
        }
        txnMap.forEach((txnID, txnMetaListPair) -> Assert.assertEquals((Object)((TxnMeta)txnMetaListPair.getLeft()).status(), (Object)TxnStatus.OPEN));
        Awaitility.await().atLeast(1000L, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0);
    }

    @Test
    public void testTimeoutTrackerExpired() throws Exception {
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get((long)0L));
        Awaitility.await().until(() -> this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L)) != null);
        MLTransactionMetadataStore transactionMetadataStore = (MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L));
        this.checkTransactionMetadataStoreReady(transactionMetadataStore);
        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
        field.setAccessible(true);
        ConcurrentSkipListMap txnMap = (ConcurrentSkipListMap)field.get(transactionMetadataStore);
        transactionMetadataStore.newTransaction(2000L).get();
        Assert.assertEquals((int)txnMap.size(), (int)1);
        txnMap.forEach((txnID, txnMetaListPair) -> Assert.assertEquals((Object)((TxnMeta)txnMetaListPair.getLeft()).status(), (Object)TxnStatus.OPEN));
        Awaitility.await().atLeast(1000L, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0);
        transactionMetadataStore.newTransaction(2000L).get();
        Assert.assertEquals((int)txnMap.size(), (int)1);
        txnMap.forEach((txnID, txnMetaListPair) -> Assert.assertEquals((Object)((TxnMeta)txnMetaListPair.getLeft()).status(), (Object)TxnStatus.OPEN));
        Awaitility.await().atLeast(1000L, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0);
    }

    @Test
    public void testTimeoutTrackerMultiThreading() throws Exception {
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get((long)0L));
        Awaitility.await().until(() -> this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L)) != null);
        MLTransactionMetadataStore transactionMetadataStore = (MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L));
        this.checkTransactionMetadataStoreReady(transactionMetadataStore);
        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
        field.setAccessible(true);
        ConcurrentSkipListMap txnMap = (ConcurrentSkipListMap)field.get(transactionMetadataStore);
        new Thread(() -> {
            int i = -1;
            while (++i < 100) {
                try {
                    transactionMetadataStore.newTransaction(1000L);
                }
                catch (Exception exception) {}
            }
        }).start();
        new Thread(() -> {
            int i = -1;
            while (++i < 100) {
                try {
                    transactionMetadataStore.newTransaction(2000L);
                }
                catch (Exception exception) {}
            }
        }).start();
        new Thread(() -> {
            int i = -1;
            while (++i < 100) {
                try {
                    transactionMetadataStore.newTransaction(3000L);
                }
                catch (Exception exception) {}
            }
        }).start();
        new Thread(() -> {
            int i = -1;
            while (++i < 100) {
                try {
                    transactionMetadataStore.newTransaction(4000L);
                }
                catch (Exception exception) {}
            }
        }).start();
        this.checkoutTimeout(txnMap, 300);
        this.checkoutTimeout(txnMap, 200);
        this.checkoutTimeout(txnMap, 100);
        this.checkoutTimeout(txnMap, 0);
    }

    private void checkoutTimeout(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap, int time) {
        Awaitility.await().atLeast(1000L, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == time);
    }

    @Test
    public void transactionTimeoutRecoverTest() throws Exception {
        int timeout = 2000;
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get((long)0L));
        Awaitility.await().until(() -> this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L)) != null);
        MLTransactionMetadataStore transactionMetadataStore = (MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L));
        this.checkTransactionMetadataStoreReady(transactionMetadataStore);
        transactionMetadataStore.newTransaction((long)timeout);
        this.pulsar.getTransactionMetadataStoreService().removeTransactionMetadataStore(TransactionCoordinatorID.get((long)0L));
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get((long)0L));
        Awaitility.await().until(() -> this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L)) != null);
        transactionMetadataStore = (MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L));
        this.checkTransactionMetadataStoreReady(transactionMetadataStore);
        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
        field.setAccessible(true);
        ConcurrentSkipListMap txnMap = (ConcurrentSkipListMap)field.get(transactionMetadataStore);
        Awaitility.await().until(() -> txnMap.size() == 0);
    }

    @DataProvider(name="txnStatus")
    public Object[][] txnStatus() {
        return new Object[][]{{TxnStatus.OPEN}, {TxnStatus.ABORTING}, {TxnStatus.COMMITTING}};
    }

    @Test(dataProvider="txnStatus")
    public void testEndTransactionOpRetry(TxnStatus txnStatus) throws Exception {
        int timeOut = 3000;
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get((long)0L));
        Awaitility.await().until(() -> this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L)) != null);
        MLTransactionMetadataStore transactionMetadataStore = (MLTransactionMetadataStore)this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)0L));
        this.checkTransactionMetadataStoreReady(transactionMetadataStore);
        TxnID txnID = (TxnID)transactionMetadataStore.newTransaction((long)(timeOut - 2000)).get();
        TxnMeta txnMeta = (TxnMeta)transactionMetadataStore.getTxnMeta(txnID).get();
        txnMeta.updateTxnStatus(txnStatus, TxnStatus.OPEN);
        Field field = TransactionMetadataStoreState.class.getDeclaredField("state");
        field.setAccessible(true);
        field.set(transactionMetadataStore, TransactionMetadataStoreState.State.None);
        CompletableFuture completableFuture = null;
        try {
            completableFuture = this.pulsar.getTransactionMetadataStoreService().endTransaction(txnID, TxnAction.COMMIT.getValue(), false);
            completableFuture.get(5L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (Exception e) {
            if (txnStatus == TxnStatus.OPEN || txnStatus == TxnStatus.COMMITTING) {
                Assert.assertTrue((boolean)(e instanceof TimeoutException));
            }
            if (txnStatus == TxnStatus.ABORTING) {
                Assert.assertTrue((boolean)(e.getCause() instanceof CoordinatorException.InvalidTxnStatusException));
            }
            Assert.fail();
        }
        Assert.assertEquals((Object)txnMeta.status(), (Object)txnStatus);
        field = TransactionMetadataStoreState.class.getDeclaredField("state");
        field.setAccessible(true);
        field.set(transactionMetadataStore, TransactionMetadataStoreState.State.Ready);
        if (txnStatus == TxnStatus.ABORTING) {
            this.pulsar.getTransactionMetadataStoreService().endTransaction(txnID, TxnAction.ABORT.getValue(), false).get();
        }
        Awaitility.await().atMost((long)timeOut, TimeUnit.MILLISECONDS).until(() -> {
            try {
                transactionMetadataStore.getTxnMeta(txnID).get();
                return false;
            }
            catch (ExecutionException e) {
                return e.getCause() instanceof CoordinatorException.TransactionNotFoundException;
            }
        });
    }

    private void checkTransactionMetadataStoreReady(MLTransactionMetadataStore transactionMetadataStore) throws NoSuchMethodException {
        Method method = TransactionMetadataStoreState.class.getDeclaredMethod("checkIfReady", new Class[0]);
        method.setAccessible(true);
        Awaitility.await().until(() -> (Boolean)method.invoke((Object)transactionMetadataStore, new Object[0]));
    }
}

