/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TransactionClientConnectTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionClientConnectTest.class);
    private static final String RECONNECT_TOPIC = "tnx/ns1/txn-client-reconnect-test";
    private static final int NUM_PARTITIONS = 1;

    @BeforeMethod(alwaysRun=true)
    public void setup() throws Exception {
        this.setUpBase(1, 1, RECONNECT_TOPIC, 0);
        this.admin.topics().createSubscription(RECONNECT_TOPIC, "test", MessageId.latest);
    }

    @AfterMethod(alwaysRun=true)
    protected void cleanup() {
        super.internalCleanup();
    }

    @Test
    public void testTransactionNewReconnect() throws Exception {
        Callable<CompletableFuture<?>> callable = () -> this.pulsarClient.newTransaction().withTransactionTimeout(200L, TimeUnit.MILLISECONDS).build();
        this.tryCommandReconnect(callable, callable);
    }

    @Test
    public void testTransactionAddSubscriptionToTxnAsyncReconnect() throws Exception {
        TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl)this.pulsarClient).getTcClient();
        Callable<CompletableFuture<?>> callable = () -> transactionCoordinatorClient.addSubscriptionToTxnAsync(new TxnID(0L, 0L), "test", "test");
        this.tryCommandReconnect(callable, callable);
    }

    public void tryCommandReconnect(Callable<CompletableFuture<?>> callable1, Callable<CompletableFuture<?>> callable2) throws Exception {
        this.start();
        try {
            callable1.call().get();
        }
        catch (ExecutionException e) {
            Assert.assertFalse((boolean)(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException));
            this.waitToReady();
            callable1.call().get();
        }
        this.fence(this.getPulsarServiceList().get(0).getTransactionMetadataStoreService());
        CompletableFuture<?> completableFuture = callable2.call();
        try {
            completableFuture.get(3L, TimeUnit.SECONDS);
        }
        catch (TimeoutException timeoutException) {
        }
        catch (ExecutionException e) {
            org.testng.Assert.assertFalse((boolean)(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException));
        }
        this.unFence(this.getPulsarServiceList().get(0).getTransactionMetadataStoreService());
        completableFuture.get();
    }

    @Test
    public void testTransactionAbortToTxnAsyncReconnect() throws Exception {
        TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl)this.pulsarClient).getTcClient();
        Callable<CompletableFuture<?>> callable1 = () -> transactionCoordinatorClient.abortAsync(new TxnID(0L, 0L));
        Callable<CompletableFuture<?>> callable2 = () -> transactionCoordinatorClient.abortAsync(new TxnID(0L, 1L));
        this.tryCommandReconnect(callable1, callable2);
    }

    @Test
    public void testTransactionCommitToTxnAsyncReconnect() throws Exception {
        TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl)this.pulsarClient).getTcClient();
        Callable<CompletableFuture<?>> callable1 = () -> transactionCoordinatorClient.commitAsync(new TxnID(0L, 0L));
        Callable<CompletableFuture<?>> callable2 = () -> transactionCoordinatorClient.commitAsync(new TxnID(0L, 1L));
        this.tryCommandReconnect(callable1, callable2);
    }

    @Test
    public void testTransactionAddPublishPartitionToTxnReconnect() throws Exception {
        TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl)this.pulsarClient).getTcClient();
        Callable<CompletableFuture<?>> callable = () -> transactionCoordinatorClient.addPublishPartitionToTxnAsync(new TxnID(0L, 0L), Collections.singletonList("test"));
        this.tryCommandReconnect(callable, callable);
    }

    @Test
    public void testPulsarClientCloseThenCloseTcClient() throws Exception {
        TransactionMetaStoreHandler[] handlers;
        TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl)this.pulsarClient).getTcClient();
        Field field = TransactionCoordinatorClientImpl.class.getDeclaredField("handlers");
        field.setAccessible(true);
        for (TransactionMetaStoreHandler handler : handlers = (TransactionMetaStoreHandler[])field.get(transactionCoordinatorClient)) {
            handler.newTransactionAsync(10L, TimeUnit.SECONDS).get();
        }
        for (TransactionMetaStoreHandler handler : handlers) {
            Field stateField = HandlerState.class.getDeclaredField("state");
            stateField.setAccessible(true);
            stateField.set(handler, HandlerState.State.Closed);
        }
        for (TransactionMetaStoreHandler handler : handlers) {
            Method method = TransactionMetaStoreHandler.class.getMethod("getConnectHandleState", new Class[0]);
            method.setAccessible(true);
            org.testng.Assert.assertEquals((String)method.invoke((Object)handler, new Object[0]).toString(), (String)"Closed");
            try {
                handler.newTransactionAsync(10L, TimeUnit.SECONDS).get();
            }
            catch (InterruptedException | ExecutionException e) {
                org.testng.Assert.assertTrue((boolean)(e.getCause() instanceof TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException));
            }
        }
    }

    public void start() throws Exception {
        this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
        this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
        TransactionMetadataStoreService transactionMetadataStoreService = this.getPulsarServiceList().get(0).getTransactionMetadataStoreService();
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get((long)0L)).get();
    }

    public void fence(TransactionMetadataStoreService transactionMetadataStoreService) throws Exception {
        Field field = ManagedLedgerImpl.class.getDeclaredField("state");
        field.setAccessible(true);
        field.set(((MLTransactionMetadataStore)transactionMetadataStoreService.getStores().get(TransactionCoordinatorID.get((long)0L))).getManagedLedger(), ManagedLedgerImpl.State.Fenced);
    }

    public void unFence(TransactionMetadataStoreService transactionMetadataStoreService) throws Exception {
        Field field = ManagedLedgerImpl.class.getDeclaredField("state");
        field.setAccessible(true);
        field.set(((MLTransactionMetadataStore)transactionMetadataStoreService.getStores().get(TransactionCoordinatorID.get((long)0L))).getManagedLedger(), ManagedLedgerImpl.State.LedgerOpened);
    }

    public void waitToReady() throws Exception {
        TransactionMetadataStoreService transactionMetadataStoreService = this.getPulsarServiceList().get(0).getTransactionMetadataStoreService();
        Class<TransactionMetadataStoreService> transactionMetadataStoreServiceClass = TransactionMetadataStoreService.class;
        Field field1 = transactionMetadataStoreServiceClass.getDeclaredField("stores");
        field1.setAccessible(true);
        Map stores = (Map)field1.get(transactionMetadataStoreService);
        Awaitility.await().until(() -> {
            for (TransactionMetadataStore transactionMetadataStore : stores.values()) {
                Class<TransactionMetadataStoreState> transactionMetadataStoreStateClass = TransactionMetadataStoreState.class;
                Field field = transactionMetadataStoreStateClass.getDeclaredField("state");
                field.setAccessible(true);
                TransactionMetadataStoreState.State state = (TransactionMetadataStoreState.State)field.get(transactionMetadataStore);
                if (state.equals((Object)TransactionMetadataStoreState.State.Ready)) continue;
                return false;
            }
            return true;
        });
    }
}

