/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction;

import io.netty.buffer.Unpooled;
import io.netty.util.Timeout;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.common.util.Bytes;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class TransactionTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionTest.class);
    private static final int NUM_BROKERS = 1;
    private static final int NUM_PARTITIONS = 1;

    @BeforeMethod
    protected void setup() throws Exception {
        this.setUpBase(1, 1, "tnx/ns1/test", 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCreateTransactionSystemTopic() throws Exception {
        String subName = "test";
        String topicName = TopicName.get((String)"tnx/ns1/testCreateTransactionSystemTopic").toString();
        try {
            Consumer<byte[]> consumer = this.getConsumer(topicName, subName);
            try {
                Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
                consumer.acknowledgeAsync((MessageId)new MessageIdImpl(10L, 10L, 10), transaction).get();
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.TransactionConflictException));
        }
        topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix((String)topicName, (String)subName);
        List list = this.admin.topics().getList("tnx/ns1");
        Assert.assertEquals((int)list.size(), (int)4);
        list.forEach(topic -> Assert.assertFalse((boolean)topic.contains("__transaction_pending_ack")));
        try {
            Consumer<byte[]> consumer = this.getConsumer(topicName, subName);
            try {
                Assert.fail();
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        catch (PulsarClientException.NotAllowedException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Can not create transaction system topic"));
        }
        try {
            this.admin.topics().getSubscriptions(topicName);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((String)e.getMessage(), (String)("Can not create transaction system topic " + topicName));
        }
        try {
            this.admin.topics().createPartitionedTopic(topicName, 3);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((String)e.getMessage(), (String)"Cannot create topic in system topic format!");
        }
        try {
            this.admin.topics().createNonPartitionedTopic(topicName);
            Assert.fail();
        }
        catch (PulsarAdminException e) {
            Assert.assertEquals((String)e.getMessage(), (String)"Cannot create topic in system topic format!");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
        String subName = "test";
        String topicName = TopicName.get((String)"tnx/ns1/test").toString();
        Consumer<byte[]> consumer = this.getConsumer(topicName, subName);
        try {
            consumer.close();
            Awaitility.await().until(() -> {
                try {
                    this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
                }
                catch (Exception e) {
                    return false;
                }
                return true;
            });
            this.admin.namespaces().unload(NamespaceName.SYSTEM_NAMESPACE.toString());
            this.admin.namespaces().unload("tnx/ns1");
            Consumer<byte[]> consumer1 = this.getConsumer(topicName, subName);
            try {
                Awaitility.await().until(() -> {
                    try {
                        this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
                    }
                    catch (Exception e) {
                        return false;
                    }
                    return true;
                });
                ConcurrentOpenHashMap topics = this.getPulsarServiceList().get(0).getBrokerService().getTopics();
                Assert.assertNull((Object)topics.get((Object)(TopicName.get((String)TopicDomain.persistent.value(), (NamespaceName)NamespaceName.SYSTEM_NAMESPACE, (String)"__transaction_log_").toString() + 0)));
                Assert.assertNull((Object)topics.get((Object)TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString()));
                Assert.assertNull((Object)topics.get((Object)MLPendingAckStore.getTransactionPendingAckStoreSuffix((String)topicName, (String)subName)));
            }
            finally {
                if (Collections.singletonList(consumer1).get(0) != null) {
                    consumer1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    public Consumer<byte[]> getConsumer(String topicName, String subName) throws PulsarClientException {
        return this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subName).subscriptionType(SubscriptionType.Shared).enableBatchIndexAcknowledgment(true).subscribe();
    }

    @Test
    public void testGetTxnID() throws Exception {
        Transaction transaction = (Transaction)this.pulsarClient.newTransaction().build().get();
        TxnID txnID = transaction.getTxnID();
        Assert.assertEquals((long)txnID.getLeastSigBits(), (long)0L);
        Assert.assertEquals((long)txnID.getMostSigBits(), (long)0L);
        transaction.abort();
        transaction = (Transaction)this.pulsarClient.newTransaction().build().get();
        txnID = transaction.getTxnID();
        Assert.assertEquals((long)txnID.getLeastSigBits(), (long)1L);
        Assert.assertEquals((long)txnID.getMostSigBits(), (long)0L);
    }

    @Test
    public void testSubscriptionRecreateTopic() throws PulsarAdminException, NoSuchFieldException, IllegalAccessException, PulsarClientException {
        String topic = "persistent://pulsar/system/testReCreateTopic";
        String subName = "sub_testReCreateTopic";
        int retentionSizeInMbSetTo = 5;
        int retentionSizeInMbSetTopic = 6;
        int retentionSizeInMinutesSetTo = 5;
        int retentionSizeInMinutesSetTopic = 6;
        this.admin.topics().createNonPartitionedTopic(topic);
        PulsarService pulsarService = super.getPulsarServiceList().get(0);
        pulsarService.getBrokerService().getTopics().clear();
        ManagedLedgerFactory managedLedgerFactory = pulsarService.getBrokerService().getManagedLedgerFactory();
        Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        field.setAccessible(true);
        ConcurrentHashMap ledgers = (ConcurrentHashMap)field.get(managedLedgerFactory);
        ledgers.remove(TopicName.get((String)topic).getPersistenceNamingEncoding());
        try {
            this.admin.topics().createNonPartitionedTopic(topic);
            Assert.fail();
        }
        catch (PulsarAdminException.ConflictException e) {
            log.info("Cann`t create topic again");
        }
        this.admin.topics().setRetention(topic, new RetentionPolicies(retentionSizeInMinutesSetTopic, retentionSizeInMbSetTopic));
        this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName(subName).subscribe();
        pulsarService.getBrokerService().getTopicIfExists(topic).thenAccept(option -> {
            if (!option.isPresent()) {
                log.error("Failed o get Topic named: {}", (Object)topic);
                Assert.fail();
            }
            PersistentTopic originPersistentTopic = (PersistentTopic)option.get();
            String pendingAckTopicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix((String)originPersistentTopic.getName(), (String)subName);
            try {
                this.admin.topics().setRetention(pendingAckTopicName, new RetentionPolicies(retentionSizeInMinutesSetTo, retentionSizeInMbSetTo));
            }
            catch (PulsarAdminException e) {
                log.error("Failed to get./setRetention of topic with Exception:" + (Object)((Object)e));
                Assert.fail();
            }
            PersistentSubscription subscription = originPersistentTopic.getSubscription(subName);
            subscription.getPendingAckManageLedger().thenAccept(managedLedger -> {
                long retentionSize = managedLedger.getConfig().getRetentionSizeInMB();
                if (!originPersistentTopic.getTopicPolicies().isPresent()) {
                    log.error("Failed to getTopicPolicies of :" + originPersistentTopic);
                    Assert.fail();
                }
                TopicPolicies topicPolicies = (TopicPolicies)originPersistentTopic.getTopicPolicies().get();
                Assert.assertEquals((long)retentionSizeInMbSetTopic, (long)retentionSize);
                MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider();
                CompletableFuture future = mlPendingAckStoreProvider.newPendingAckStore(subscription);
                future.thenAccept(pendingAckStore -> ((MLPendingAckStore)pendingAckStore).getManagedLedger().thenAccept(managedLedger1 -> Assert.assertEquals((long)managedLedger1.getConfig().getRetentionSizeInMB(), (long)retentionSizeInMbSetTo)));
            });
        });
    }

    @Test
    public void testTakeSnapshotBeforeBuildTxnProducer() throws Exception {
        String topic = "persistent://tnx/ns1/testSnapShot";
        this.admin.topics().createNonPartitionedTopic(topic);
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false).get()).get();
        ReaderBuilder readerBuilder = this.pulsarClient.newReader(Schema.AVRO(TransactionBufferSnapshot.class)).startMessageId(MessageId.earliest).topic("tnx/ns1/__transaction_buffer_snapshot");
        Reader reader = readerBuilder.create();
        long waitSnapShotTime = this.getPulsarServiceList().get(0).getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
        Awaitility.await().atMost(waitSnapShotTime * 2L, TimeUnit.MILLISECONDS).untilAsserted(() -> Assert.assertFalse((boolean)reader.hasMessageAvailable()));
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).producerName("testSnapshot").sendTimeout(0, TimeUnit.SECONDS).topic(topic).enableBatching(true).create();
        Awaitility.await().untilAsserted(() -> {
            Message message1 = reader.readNext();
            TransactionBufferSnapshot snapshot1 = (TransactionBufferSnapshot)message1.getValue();
            Assert.assertEquals((long)snapshot1.getMaxReadPositionEntryId(), (long)-1L);
        });
        producer.newMessage(Schema.STRING).value((Object)"common message send").send();
        producer.newMessage(Schema.STRING).value((Object)"common message send").send();
        Awaitility.await().untilAsserted(() -> {
            Message message1 = reader.readNext();
            TransactionBufferSnapshot snapshot1 = (TransactionBufferSnapshot)message1.getValue();
            Assert.assertEquals((long)snapshot1.getMaxReadPositionEntryId(), (long)1L);
        });
    }

    @Test
    public void testAppendBufferWithNotManageLedgerExceptionCanCastToMLE() throws Exception {
        String topic = "persistent://pulsar/system/testReCreateTopic";
        this.admin.topics().createNonPartitionedTopic(topic);
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)((PulsarService)this.pulsarServiceList.get(0)).getBrokerService().getTopic(topic, false).get()).get();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Topic.PublishContext publishContext = new Topic.PublishContext(){

            public String getProducerName() {
                return "test";
            }

            public long getSequenceId() {
                return 30L;
            }

            public String getOriginalProducerName() {
                return "test";
            }

            public long getOriginalSequenceId() {
                return 30L;
            }

            public long getHighestSequenceId() {
                return 30L;
            }

            public long getOriginalHighestSequenceId() {
                return 30L;
            }

            public long getNumberOfMessages() {
                return 30L;
            }

            public void completed(Exception e, long ledgerId, long entryId) {
                Assert.assertTrue((boolean)(e.getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException));
                countDownLatch.countDown();
            }
        };
        persistentTopic.getManagedLedger().close();
        persistentTopic.publishTxnMessage(new TxnID(123L, 321L), Unpooled.copiedBuffer((CharSequence)"message", (Charset)StandardCharsets.UTF_8), publishContext);
        Awaitility.await().until(() -> {
            countDownLatch.await();
            return true;
        });
    }

    @Test
    public void testMaxReadPositionForNormalPublish() throws Exception {
        String topic = "persistent://tnx/ns1/NormalPublish";
        this.admin.topics().createNonPartitionedTopic(topic);
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false).get()).get();
        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer)persistentTopic.getTransactionBuffer();
        PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false).serviceUrl(this.getPulsarServiceList().get(0).getBrokerServiceUrl()).build();
        Producer normalProducer = noTxnClient.newProducer(Schema.STRING).producerName("testNormalPublish").topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)topicTransactionBuffer.checkIfNoSnapshot()));
        MessageIdImpl messageId = (MessageIdImpl)normalProducer.newMessage().value((Object)"normal message").send();
        PositionImpl position = topicTransactionBuffer.getMaxReadPosition();
        Assert.assertEquals((long)position.getLedgerId(), (long)messageId.getLedgerId());
        Assert.assertEquals((long)position.getEntryId(), (long)messageId.getEntryId());
        Producer txnProducer = this.pulsarClient.newProducer(Schema.STRING).producerName("testTransactionPublish").topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)topicTransactionBuffer.checkIfReady()));
        Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        MessageIdImpl messageId1 = (MessageIdImpl)txnProducer.newMessage(transaction).value((Object)"txn message").send();
        PositionImpl position1 = topicTransactionBuffer.getMaxReadPosition();
        Assert.assertEquals((long)position1.getLedgerId(), (long)messageId.getLedgerId());
        Assert.assertEquals((long)position1.getEntryId(), (long)messageId.getEntryId());
        MessageIdImpl messageId2 = (MessageIdImpl)normalProducer.newMessage().value((Object)"normal message").send();
        PositionImpl position2 = topicTransactionBuffer.getMaxReadPosition();
        Assert.assertEquals((long)position2.getLedgerId(), (long)messageId.getLedgerId());
        Assert.assertEquals((long)position2.getEntryId(), (long)messageId.getEntryId());
        transaction.commit().get();
        PositionImpl position3 = topicTransactionBuffer.getMaxReadPosition();
        Assert.assertEquals((long)position3.getLedgerId(), (long)messageId2.getLedgerId());
        Assert.assertEquals((long)position3.getEntryId(), (long)(messageId2.getEntryId() + 1L));
        MessageIdImpl messageId4 = (MessageIdImpl)normalProducer.newMessage().value((Object)"normal message").send();
        PositionImpl position4 = topicTransactionBuffer.getMaxReadPosition();
        Assert.assertEquals((long)position4.getLedgerId(), (long)messageId4.getLedgerId());
        Assert.assertEquals((long)position4.getEntryId(), (long)messageId4.getEntryId());
        Class<?> transactionBufferStateClass = topicTransactionBuffer.getClass().getSuperclass();
        Field field = transactionBufferStateClass.getDeclaredField("state");
        field.setAccessible(true);
        Class<TopicTransactionBuffer> topicTransactionBufferClass = TopicTransactionBuffer.class;
        Field maxReadPositionField = topicTransactionBufferClass.getDeclaredField("maxReadPosition");
        maxReadPositionField.setAccessible(true);
        field.set(topicTransactionBuffer, TopicTransactionBufferState.State.Initializing);
        MessageIdImpl messageId5 = (MessageIdImpl)normalProducer.newMessage().value((Object)"normal message").send();
        PositionImpl position5 = (PositionImpl)maxReadPositionField.get(topicTransactionBuffer);
        Assert.assertEquals((long)position5.getLedgerId(), (long)messageId4.getLedgerId());
        Assert.assertEquals((long)position5.getEntryId(), (long)messageId4.getEntryId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception {
        String topic = "tnx/ns1/testEndTBRecoveringWhenManagerLedgerDisReadable";
        this.admin.topics().createNonPartitionedTopic(topic);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).producerName("test").enableBatching(false).sendTimeout(0, TimeUnit.SECONDS).topic(topic).create();
        try {
            Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
            producer.newMessage(txn).value((Object)"test").send();
            PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic("persistent://" + topic, false).get()).get();
            persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
            ManagedCursorImpl managedCursor = (ManagedCursorImpl)Mockito.mock(ManagedCursorImpl.class);
            ((ManagedCursorImpl)Mockito.doReturn((Object)"transaction-buffer-sub").when((Object)managedCursor)).getName();
            ((ManagedCursorImpl)Mockito.doReturn((Object)true).when((Object)managedCursor)).hasMoreEntries();
            ((ManagedCursorImpl)Mockito.doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback)invocation.getArgument(1);
                callback.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"), null);
                return null;
            }).when((Object)managedCursor)).asyncReadEntries(ArgumentMatchers.anyInt(), (AsyncCallbacks.ReadEntriesCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (PositionImpl)ArgumentMatchers.any());
            Class<ManagedLedgerImpl> managedLedgerClass = ManagedLedgerImpl.class;
            Field field = managedLedgerClass.getDeclaredField("cursors");
            field.setAccessible(true);
            ManagedCursorContainer managedCursors = (ManagedCursorContainer)field.get(persistentTopic.getManagedLedger());
            managedCursors.removeCursor("transaction-buffer-sub");
            managedCursors.add((ManagedCursor)managedCursor);
            TopicTransactionBuffer buffer1 = new TopicTransactionBuffer(persistentTopic);
            Awaitility.await().atMost(30L, TimeUnit.SECONDS).untilAsserted(() -> TransactionTest.lambda$testEndTBRecoveringWhenManagerLedgerDisReadable$14((TransactionBuffer)buffer1));
            ((ManagedCursorImpl)Mockito.doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback)invocation.getArgument(1);
                callback.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.ManagedLedgerFencedException(), null);
                return null;
            }).when((Object)managedCursor)).asyncReadEntries(ArgumentMatchers.anyInt(), (AsyncCallbacks.ReadEntriesCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (PositionImpl)ArgumentMatchers.any());
            TopicTransactionBuffer buffer2 = new TopicTransactionBuffer(persistentTopic);
            Awaitility.await().atMost(30L, TimeUnit.SECONDS).untilAsserted(() -> TransactionTest.lambda$testEndTBRecoveringWhenManagerLedgerDisReadable$16((TransactionBuffer)buffer2));
            managedCursors.removeCursor("transaction-buffer-sub");
            ((ManagedCursorImpl)Mockito.doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback)invocation.getArgument(1);
                callback.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
                return null;
            }).when((Object)managedCursor)).asyncReadEntries(ArgumentMatchers.anyInt(), (AsyncCallbacks.ReadEntriesCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (PositionImpl)ArgumentMatchers.any());
            managedCursors.add((ManagedCursor)managedCursor);
            TopicTransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
            Awaitility.await().atMost(30L, TimeUnit.SECONDS).untilAsserted(() -> TransactionTest.lambda$testEndTBRecoveringWhenManagerLedgerDisReadable$18((TransactionBuffer)buffer3));
            persistentTopic.getInternalStats(false).thenAccept(internalStats -> Assert.assertTrue((boolean)internalStats.cursors.isEmpty()));
            managedCursors.removeCursor("transaction-buffer-sub");
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception {
        String topic = "tnx/ns1/testEndTPRecoveringWhenManagerLedgerDisReadable";
        this.admin.topics().createNonPartitionedTopic(topic);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).producerName("test").enableBatching(false).sendTimeout(0, TimeUnit.SECONDS).topic(topic).create();
        try {
            producer.newMessage().send();
            PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false).get()).get();
            persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
            PersistentSubscription persistentSubscription = (PersistentSubscription)persistentTopic.createSubscription("test", CommandSubscribe.InitialPosition.Earliest, false).get();
            ManagedCursorImpl managedCursor = (ManagedCursorImpl)Mockito.mock(ManagedCursorImpl.class);
            ((ManagedCursorImpl)Mockito.doReturn((Object)true).when((Object)managedCursor)).hasMoreEntries();
            ((ManagedCursorImpl)Mockito.doReturn((Object)false).when((Object)managedCursor)).isClosed();
            ((ManagedCursorImpl)Mockito.doReturn((Object)new PositionImpl(-1L, -1L)).when((Object)managedCursor)).getMarkDeletedPosition();
            ((ManagedCursorImpl)Mockito.doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback)invocation.getArgument(1);
                callback.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"), null);
                return null;
            }).when((Object)managedCursor)).asyncReadEntries(ArgumentMatchers.anyInt(), (AsyncCallbacks.ReadEntriesCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (PositionImpl)ArgumentMatchers.any());
            TransactionPendingAckStoreProvider pendingAckStoreProvider = (TransactionPendingAckStoreProvider)Mockito.mock(TransactionPendingAckStoreProvider.class);
            ((TransactionPendingAckStoreProvider)Mockito.doReturn(CompletableFuture.completedFuture(new MLPendingAckStore(persistentTopic.getManagedLedger(), (ManagedCursor)managedCursor, null, 500L))).when((Object)pendingAckStoreProvider)).newPendingAckStore((PersistentSubscription)ArgumentMatchers.any());
            ((TransactionPendingAckStoreProvider)Mockito.doReturn(CompletableFuture.completedFuture(true)).when((Object)pendingAckStoreProvider)).checkInitializedBefore((PersistentSubscription)ArgumentMatchers.any());
            Class<PulsarService> pulsarServiceClass = PulsarService.class;
            Field field = pulsarServiceClass.getDeclaredField("transactionPendingAckStoreProvider");
            field.setAccessible(true);
            field.set(this.getPulsarServiceList().get(0), pendingAckStoreProvider);
            PendingAckHandleImpl pendingAckHandle1 = new PendingAckHandleImpl(persistentSubscription);
            Awaitility.await().untilAsserted(() -> Assert.assertEquals((String)pendingAckHandle1.getStats().state, (String)"Ready"));
            ((ManagedCursorImpl)Mockito.doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback)invocation.getArgument(1);
                callback.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.ManagedLedgerFencedException(), null);
                return null;
            }).when((Object)managedCursor)).asyncReadEntries(ArgumentMatchers.anyInt(), (AsyncCallbacks.ReadEntriesCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (PositionImpl)ArgumentMatchers.any());
            PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription);
            Awaitility.await().untilAsserted(() -> Assert.assertEquals((String)pendingAckHandle2.getStats().state, (String)"Ready"));
            ((ManagedCursorImpl)Mockito.doAnswer(invocation -> {
                AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback)invocation.getArgument(1);
                callback.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
                return null;
            }).when((Object)managedCursor)).asyncReadEntries(ArgumentMatchers.anyInt(), (AsyncCallbacks.ReadEntriesCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (PositionImpl)ArgumentMatchers.any());
            PendingAckHandleImpl pendingAckHandle3 = new PendingAckHandleImpl(persistentSubscription);
            Awaitility.await().untilAsserted(() -> Assert.assertEquals((String)pendingAckHandle3.getStats().state, (String)"Ready"));
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Test
    public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception {
        String topic = "tnx/ns1/testEndTCRecoveringWhenManagerLedgerDisReadable";
        this.admin.topics().createNonPartitionedTopic(topic);
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false).get()).get();
        persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
        HashMap<String, String> map = new HashMap<String, String>();
        map.put("max_local_txn_id", "1");
        persistentTopic.getManagedLedger().setProperties(map);
        ManagedCursor managedCursor = (ManagedCursor)Mockito.mock(ManagedCursor.class);
        ((ManagedCursor)Mockito.doReturn((Object)true).when((Object)managedCursor)).hasMoreEntries();
        ((ManagedCursor)Mockito.doAnswer(invocation -> {
            AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback)invocation.getArgument(1);
            callback.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"), null);
            return null;
        }).when((Object)managedCursor)).asyncReadEntries(ArgumentMatchers.anyInt(), (AsyncCallbacks.ReadEntriesCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (PositionImpl)ArgumentMatchers.any());
        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
        persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor((ManagedLedgerInterceptor)mlTransactionSequenceIdGenerator);
        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(new TransactionCoordinatorID(1L), null, persistentTopic.getManagedLedger().getConfig());
        Class<MLTransactionLogImpl> mlTransactionLogClass = MLTransactionLogImpl.class;
        Field field = mlTransactionLogClass.getDeclaredField("cursor");
        field.setAccessible(true);
        field.set(mlTransactionLog, managedCursor);
        field = mlTransactionLogClass.getDeclaredField("managedLedger");
        field.setAccessible(true);
        field.set(mlTransactionLog, persistentTopic.getManagedLedger());
        TransactionRecoverTracker transactionRecoverTracker = (TransactionRecoverTracker)Mockito.mock(TransactionRecoverTracker.class);
        ((TransactionRecoverTracker)Mockito.doNothing().when((Object)transactionRecoverTracker)).appendOpenTransactionToTimeoutTracker();
        ((TransactionRecoverTracker)Mockito.doNothing().when((Object)transactionRecoverTracker)).handleCommittingAndAbortingTransaction();
        TransactionTimeoutTracker timeoutTracker = (TransactionTimeoutTracker)Mockito.mock(TransactionTimeoutTracker.class);
        ((TransactionTimeoutTracker)Mockito.doNothing().when((Object)timeoutTracker)).start();
        MLTransactionMetadataStore metadataStore1 = new MLTransactionMetadataStore(new TransactionCoordinatorID(1L), mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
        metadataStore1.init(transactionRecoverTracker).get();
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((String)metadataStore1.getCoordinatorStats().state, (String)"Ready"));
        ((ManagedCursor)Mockito.doAnswer(invocation -> {
            AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback)invocation.getArgument(1);
            callback.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.ManagedLedgerFencedException(), null);
            return null;
        }).when((Object)managedCursor)).asyncReadEntries(ArgumentMatchers.anyInt(), (AsyncCallbacks.ReadEntriesCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (PositionImpl)ArgumentMatchers.any());
        MLTransactionMetadataStore metadataStore2 = new MLTransactionMetadataStore(new TransactionCoordinatorID(1L), mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
        metadataStore2.init(transactionRecoverTracker).get();
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((String)metadataStore2.getCoordinatorStats().state, (String)"Ready"));
        ((ManagedCursor)Mockito.doAnswer(invocation -> {
            AsyncCallbacks.ReadEntriesCallback callback = (AsyncCallbacks.ReadEntriesCallback)invocation.getArgument(1);
            callback.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
            return null;
        }).when((Object)managedCursor)).asyncReadEntries(ArgumentMatchers.anyInt(), (AsyncCallbacks.ReadEntriesCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (PositionImpl)ArgumentMatchers.any());
        MLTransactionMetadataStore metadataStore3 = new MLTransactionMetadataStore(new TransactionCoordinatorID(1L), mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
        metadataStore3.init(transactionRecoverTracker).get();
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((String)metadataStore3.getCoordinatorStats().state, (String)"Ready"));
    }

    @Test
    public void testEndTxnWhenCommittingOrAborting() throws Exception {
        Transaction commitTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        Transaction abortTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        Class<TransactionImpl> transactionClass = TransactionImpl.class;
        Field field = transactionClass.getDeclaredField("state");
        field.setAccessible(true);
        field.set(commitTxn, TransactionImpl.State.COMMITTING);
        field.set(abortTxn, TransactionImpl.State.ABORTING);
        abortTxn.abort();
        commitTxn.commit();
    }

    @Test
    public void testNoEntryCanBeReadWhenRecovery() throws Exception {
        String topic = "tnx/ns1/test";
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)((PulsarService)this.pulsarServiceList.get(0)).getBrokerService().getTopic(TopicName.get((String)topic).toString(), true).get()).get();
        Class<PersistentTopic> persistentTopicClass = PersistentTopic.class;
        Field filed1 = persistentTopicClass.getDeclaredField("ledger");
        Field field2 = persistentTopicClass.getDeclaredField("transactionBuffer");
        filed1.setAccessible(true);
        field2.setAccessible(true);
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)Mockito.spy((Object)filed1.get(persistentTopic));
        filed1.set(persistentTopic, managedLedger);
        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer)field2.get(persistentTopic);
        Method method = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot", new Class[0]);
        method.setAccessible(true);
        CompletableFuture completableFuture = (CompletableFuture)method.invoke((Object)topicTransactionBuffer, new Object[0]);
        completableFuture.get();
        ((ManagedLedgerImpl)Mockito.doReturn((Object)PositionImpl.latest).when((Object)managedLedger)).getLastConfirmedEntry();
        ManagedCursorImpl managedCursor = (ManagedCursorImpl)Mockito.mock(ManagedCursorImpl.class);
        ((ManagedCursorImpl)Mockito.doReturn((Object)false).when((Object)managedCursor)).hasMoreEntries();
        ((ManagedLedgerImpl)Mockito.doReturn((Object)managedCursor).when((Object)managedLedger)).newNonDurableCursor((Position)ArgumentMatchers.any(), (String)ArgumentMatchers.any());
        TopicTransactionBuffer transactionBuffer = new TopicTransactionBuffer(persistentTopic);
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)transactionBuffer.checkIfReady()));
    }

    @Test
    public void testRetryExceptionOfEndTxn() throws Exception {
        Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
        Class<TransactionMetadataStoreState> transactionMetadataStoreStateClass = TransactionMetadataStoreState.class;
        this.getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values().forEach(transactionMetadataStore -> {
            try {
                Field field = transactionMetadataStoreStateClass.getDeclaredField("state");
                field.setAccessible(true);
                field.set(transactionMetadataStore, TransactionMetadataStoreState.State.Initializing);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
        CompletableFuture completableFuture = transaction.commit();
        try {
            completableFuture.get(5L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        this.getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values().stream().forEach(transactionMetadataStore -> {
            try {
                Field field = transactionMetadataStoreStateClass.getDeclaredField("state");
                field.setAccessible(true);
                field.set(transactionMetadataStore, TransactionMetadataStoreState.State.Ready);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
        completableFuture.get(5L, TimeUnit.SECONDS);
    }

    @Test
    public void testCancelTxnTimeout() throws Exception {
        Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
        transaction.commit().get();
        Field field = TransactionImpl.class.getDeclaredField("timeout");
        field.setAccessible(true);
        Timeout timeout = (Timeout)field.get(transaction);
        Assert.assertTrue((boolean)timeout.isCancelled());
        transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
        transaction.abort().get();
        timeout = (Timeout)field.get(transaction);
        Assert.assertTrue((boolean)timeout.isCancelled());
    }

    @Test
    public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic("tnx/ns1/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true).get()).get();
        TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
        Field field = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
        field.setAccessible(true);
        AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong)field.get(buffer);
        Field field1 = TopicTransactionBufferState.class.getDeclaredField("state");
        field1.setAccessible(true);
        Awaitility.await().untilAsserted(() -> {
            TopicTransactionBufferState.State state = (TopicTransactionBufferState.State)field1.get(buffer);
            Assert.assertEquals((Object)state, (Object)TopicTransactionBufferState.State.NoSnapshot);
        });
        Assert.assertEquals((long)changeMaxReadPositionAndAddAbortTimes.get(), (long)0L);
        buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1L, 1L));
        Assert.assertEquals((long)changeMaxReadPositionAndAddAbortTimes.get(), (long)0L);
    }

    @Test
    public void testAutoCreateSchemaForTransactionSnapshot() throws Exception {
        String namespace = "tnx/ns2";
        String topic = namespace + "/test";
        this.pulsarServiceList.forEach(pulsarService -> pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(false));
        this.admin.namespaces().createNamespace(namespace);
        this.admin.topics().createNonPartitionedTopic(topic);
        TopicName transactionBufferTopicName = NamespaceEventsSystemTopicFactory.getSystemTopicName((NamespaceName)TopicName.get((String)topic).getNamespaceObject(), (EventType)EventType.TRANSACTION_BUFFER_SNAPSHOT);
        TopicName transactionBufferTopicName1 = NamespaceEventsSystemTopicFactory.getSystemTopicName((NamespaceName)TopicName.get((String)topic).getNamespaceObject(), (EventType)EventType.TOPIC_POLICY);
        Awaitility.await().untilAsserted(() -> {
            SchemaInfo schemaInfo = this.admin.schemas().getSchemaInfo(transactionBufferTopicName.toString());
            Assert.assertNotNull((Object)schemaInfo);
            SchemaInfo schemaInfo1 = this.admin.schemas().getSchemaInfo(transactionBufferTopicName1.toString());
            Assert.assertNotNull((Object)schemaInfo1);
        });
        this.pulsarServiceList.forEach(pulsarService -> pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPendingAckMarkDeletePosition() throws Exception {
        this.getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(1L);
        this.getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(5.0);
        String topic = "tnx/ns1/test1";
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("sub").subscribe();
            try {
                consumer.getSubscription();
                PersistentSubscription persistentSubscription = (PersistentSubscription)((Topic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false).get()).get()).getSubscription("sub");
                ManagedCursor subscriptionCursor = persistentSubscription.getCursor();
                subscriptionCursor.getMarkDeletedPosition();
                producer.newMessage().value((Object)"test".getBytes(StandardCharsets.UTF_8)).send();
                Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.MINUTES).build().get();
                Message message1 = consumer.receive(10, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message1.getMessageId(), transaction);
                transaction.commit().get();
                transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.MINUTES).build().get();
                producer.newMessage().value((Object)"test".getBytes(StandardCharsets.UTF_8)).send();
                Message message2 = consumer.receive(10, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message2.getMessageId(), transaction);
                Awaitility.await().untilAsserted(() -> {
                    ManagedLedgerInternalStats managedLedgerInternalStats = this.admin.transactions().getPendingAckInternalStats((String)topic, (String)"sub", (boolean)false).pendingAckLogStats.managedLedgerInternalStats;
                    String[] markDeletePosition = ((ManagedLedgerInternalStats.CursorStats)managedLedgerInternalStats.cursors.get((Object)"__pending_ack_state")).markDeletePosition.split(":");
                    String[] lastConfirmedEntry = managedLedgerInternalStats.lastConfirmedEntry.split(":");
                    Assert.assertEquals((String)markDeletePosition[0], (String)lastConfirmedEntry[0]);
                    Assert.assertEquals((int)Integer.parseInt(markDeletePosition[1]), (int)(Integer.parseInt(lastConfirmedEntry[1]) - 2));
                });
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Test
    public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
        TransactionMetadataStore transactionMetadataStore = (TransactionMetadataStore)this.getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().get(new TransactionCoordinatorID(0L));
        Field field = MLTransactionMetadataStore.class.getDeclaredField("transactionLog");
        field.setAccessible(true);
        MLTransactionLogImpl transactionLog = (MLTransactionLogImpl)field.get(transactionMetadataStore);
        Field field1 = MLTransactionLogImpl.class.getDeclaredField("cursor");
        field1.setAccessible(true);
        ManagedCursorImpl managedCursor = (ManagedCursorImpl)field1.get(transactionLog);
        managedCursor.close();
        Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        transaction.commit().get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception {
        String topic = "tnx/ns1/testGetConnectExceptionForAckMsgWhenCnxIsNull";
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("sub").subscribe();
            try {
                for (int i = 0; i < 10; ++i) {
                    producer.newMessage().value((Object)Bytes.toBytes((long)i)).send();
                }
                ClientCnx cnx = (ClientCnx)Whitebox.invokeMethod((Object)consumer, (String)"cnx", (Object[])new Object[0]);
                Whitebox.invokeMethod((Object)consumer, (String)"connectionClosed", (Object[])new Object[]{cnx});
                Message message = consumer.receive();
                Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                try {
                    consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
                    Assert.fail();
                }
                catch (ExecutionException e) {
                    Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.ConnectException));
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPendingAckBatchMessageCommit() throws Exception {
        String topic = "tnx/ns1/testPendingAckBatchMessageCommit";
        this.conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic(topic).enableBatching(true).batchingMaxPublishDelay(3L, TimeUnit.SECONDS).sendTimeout(0, TimeUnit.SECONDS).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer().subscriptionType(SubscriptionType.Shared).topic(new String[]{topic}).subscriptionName("sub").subscribe();
            try {
                for (int i = 0; i < 5; ++i) {
                    producer.sendAsync((Object)("test" + i).getBytes());
                }
                producer.flush();
                Transaction txn1 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.MINUTES).build().get();
                consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn1).get();
                Transaction txn2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.MINUTES).build().get();
                MessageId messageId = consumer.receive().getMessageId();
                consumer.acknowledgeAsync(messageId, txn2).get();
                txn1.commit().get();
                txn2.abort().get();
                Transaction txn3 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.MINUTES).build().get();
                consumer.acknowledgeAsync(messageId, txn3).get();
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Test
    public void testPendingAckReplayChangeStateError() throws InterruptedException, TimeoutException {
        AtomicInteger atomicInteger = new AtomicInteger(1);
        final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration)Mockito.mock(ServiceConfiguration.class);
        Mockito.when((Object)serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn((Object)true);
        ExecutorProvider executorProvider = (ExecutorProvider)Mockito.mock(ExecutorProvider.class);
        Mockito.when((Object)executorProvider.getExecutor()).thenReturn((Object)executorService);
        Mockito.when((Object)executorProvider.getExecutor(ArgumentMatchers.any(Object.class))).thenReturn((Object)executorService);
        PendingAckStore pendingAckStore = (PendingAckStore)Mockito.mock(PendingAckStore.class);
        ((PendingAckStore)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                executorService.execute(() -> {
                    PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl)invocation.getArguments()[0];
                    pendingAckHandle.close();
                    MLPendingAckReplyCallBack mlPendingAckReplyCallBack = new MLPendingAckReplyCallBack(pendingAckHandle);
                    mlPendingAckReplyCallBack.replayComplete();
                });
                return null;
            }
        }).when((Object)pendingAckStore)).replayAsync((PendingAckHandleImpl)ArgumentMatchers.any(), (ScheduledExecutorService)ArgumentMatchers.any());
        TransactionPendingAckStoreProvider pendingAckStoreProvider = (TransactionPendingAckStoreProvider)Mockito.mock(TransactionPendingAckStoreProvider.class);
        Mockito.when((Object)pendingAckStoreProvider.checkInitializedBefore((PersistentSubscription)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(true));
        Mockito.when((Object)pendingAckStoreProvider.newPendingAckStore((PersistentSubscription)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(pendingAckStore));
        PulsarService pulsar = (PulsarService)Mockito.mock(PulsarService.class);
        Mockito.when((Object)pulsar.getConfig()).thenReturn((Object)serviceConfiguration);
        Mockito.when((Object)pulsar.getTransactionExecutorProvider()).thenReturn((Object)executorProvider);
        Mockito.when((Object)pulsar.getTransactionPendingAckStoreProvider()).thenReturn((Object)pendingAckStoreProvider);
        BrokerService brokerService = (BrokerService)Mockito.mock(BrokerService.class);
        Mockito.when((Object)brokerService.getPulsar()).thenReturn((Object)pulsar);
        PersistentTopic topic = (PersistentTopic)Mockito.mock(PersistentTopic.class);
        Mockito.when((Object)topic.getBrokerService()).thenReturn((Object)brokerService);
        Mockito.when((Object)topic.getName()).thenReturn((Object)"topic-a");
        ManagedCursor cursor_subscription = (ManagedCursor)Mockito.mock(ManagedCursor.class);
        ((ManagedCursor)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("1")}).when((Object)cursor_subscription)).updateLastActive();
        String subscriptionName = "sub-a";
        boolean replicated = false;
        PersistentSubscription persistentSubscription = new PersistentSubscription(topic, subscriptionName, cursor_subscription, replicated);
        org.apache.pulsar.broker.service.Consumer consumer = (org.apache.pulsar.broker.service.Consumer)Mockito.mock(org.apache.pulsar.broker.service.Consumer.class);
        try {
            CompletableFuture addConsumerFuture = persistentSubscription.addConsumer(consumer);
            addConsumerFuture.get(5L, TimeUnit.SECONDS);
            Assert.fail((String)"Expect failure by PendingAckHandle closed, but success");
        }
        catch (ExecutionException executionException) {
            Throwable t = executionException.getCause();
            Assert.assertTrue((boolean)(t instanceof BrokerServiceException.ServiceUnitNotReadyException));
        }
    }

    @Test
    public void testTBRecoverChangeStateError() throws InterruptedException, TimeoutException {
        final AtomicReference<PersistentTopic> persistentTopic = new AtomicReference<PersistentTopic>();
        AtomicInteger atomicInteger = new AtomicInteger(1);
        ScheduledExecutorService executorService_recover = (ScheduledExecutorService)Mockito.mock(ScheduledExecutorService.class);
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration)Mockito.mock(ServiceConfiguration.class);
        Mockito.when((Object)serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn((Object)false);
        Mockito.when((Object)serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn((Object)true);
        ExecutorProvider executorProvider = (ExecutorProvider)Mockito.mock(ExecutorProvider.class);
        Mockito.when((Object)executorProvider.getExecutor(ArgumentMatchers.any(Object.class))).thenReturn((Object)executorService_recover);
        PendingAckStore pendingAckStore = (PendingAckStore)Mockito.mock(PendingAckStore.class);
        ((ScheduledExecutorService)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                new Thread(() -> {
                    TopicTransactionBuffer.TopicTransactionBufferRecover recover = (TopicTransactionBuffer.TopicTransactionBufferRecover)invocation.getArguments()[0];
                    TopicTransactionBufferRecoverCallBack callBack = (TopicTransactionBufferRecoverCallBack)Whitebox.getInternalState((Object)recover, (String)"callBack");
                    try {
                        ((PersistentTopic)persistentTopic.get()).getTransactionBuffer().closeAsync().get();
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    catch (ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                    callBack.recoverComplete();
                }).start();
                return null;
            }
        }).when((Object)executorService_recover)).execute((Runnable)ArgumentMatchers.any());
        TransactionPendingAckStoreProvider pendingAckStoreProvider = (TransactionPendingAckStoreProvider)Mockito.mock(TransactionPendingAckStoreProvider.class);
        Mockito.when((Object)pendingAckStoreProvider.checkInitializedBefore((PersistentSubscription)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(true));
        Mockito.when((Object)pendingAckStoreProvider.newPendingAckStore((PersistentSubscription)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(pendingAckStore));
        TransactionBufferSnapshotService transactionBufferSnapshotService = (TransactionBufferSnapshotService)Mockito.mock(TransactionBufferSnapshotService.class);
        SystemTopicClient.Writer writer = (SystemTopicClient.Writer)Mockito.mock(SystemTopicClient.Writer.class);
        Mockito.when((Object)writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)transactionBufferSnapshotService.createWriter((TopicName)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(writer));
        PulsarService pulsar = (PulsarService)Mockito.mock(PulsarService.class);
        Mockito.when((Object)pulsar.getConfiguration()).thenReturn((Object)serviceConfiguration);
        Mockito.when((Object)pulsar.getConfig()).thenReturn((Object)serviceConfiguration);
        Mockito.when((Object)pulsar.getTransactionExecutorProvider()).thenReturn((Object)executorProvider);
        Mockito.when((Object)pulsar.getTransactionBufferSnapshotService()).thenReturn((Object)transactionBufferSnapshotService);
        PulsarResources pulsarResources = (PulsarResources)Mockito.mock(PulsarResources.class);
        Mockito.when((Object)pulsar.getPulsarResources()).thenReturn((Object)pulsarResources);
        NamespaceResources nsResources = (NamespaceResources)Mockito.mock(NamespaceResources.class);
        Mockito.when((Object)pulsarResources.getNamespaceResources()).thenReturn((Object)nsResources);
        TopicTransactionBufferProvider topicTransactionBufferProvider = new TopicTransactionBufferProvider();
        Mockito.when((Object)pulsar.getTransactionBufferProvider()).thenReturn((Object)topicTransactionBufferProvider);
        BacklogQuotaManager backlogQuotaManager = (BacklogQuotaManager)Mockito.mock(BacklogQuotaManager.class);
        BrokerService brokerService = (BrokerService)Mockito.mock(BrokerService.class);
        Mockito.when((Object)brokerService.getPulsar()).thenReturn((Object)pulsar);
        Mockito.when((Object)brokerService.pulsar()).thenReturn((Object)pulsar);
        Mockito.when((Object)brokerService.getBacklogQuotaManager()).thenReturn((Object)backlogQuotaManager);
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)Mockito.mock(ManagedLedgerImpl.class);
        ManagedCursorContainer managedCursors = new ManagedCursorContainer();
        Mockito.when((Object)managedLedger.getCursors()).thenReturn((Object)managedCursors);
        PositionImpl position = PositionImpl.earliest;
        Mockito.when((Object)managedLedger.getLastConfirmedEntry()).thenReturn((Object)position);
        persistentTopic.set(new PersistentTopic("topic-a", (ManagedLedger)managedLedger, brokerService));
        try {
            ((PersistentTopic)persistentTopic.get()).checkIfTransactionBufferRecoverCompletely(true).get(5L, TimeUnit.SECONDS);
            Assert.fail((String)"Expect failure by TB closed, but it is finished.");
        }
        catch (ExecutionException executionException) {
            Throwable t = executionException.getCause();
            Assert.assertTrue((boolean)(t instanceof BrokerServiceException.ServiceUnitNotReadyException));
        }
    }

    private static /* synthetic */ void lambda$testEndTBRecoveringWhenManagerLedgerDisReadable$18(TransactionBuffer buffer3) throws Throwable {
        Assert.assertEquals((String)buffer3.getStats().state, (String)"Ready");
    }

    private static /* synthetic */ void lambda$testEndTBRecoveringWhenManagerLedgerDisReadable$16(TransactionBuffer buffer2) throws Throwable {
        Assert.assertEquals((String)buffer2.getStats().state, (String)"Ready");
    }

    private static /* synthetic */ void lambda$testEndTBRecoveringWhenManagerLedgerDisReadable$14(TransactionBuffer buffer1) throws Throwable {
        Assert.assertEquals((String)buffer1.getStats().state, (String)"Ready");
    }
}

