/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.matadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class TopicTransactionBufferRecoverTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TopicTransactionBufferRecoverTest.class);
    private static final String RECOVER_COMMIT = "tnx/ns1/recover-commit";
    private static final String RECOVER_ABORT = "tnx/ns1/recover-abort";
    private static final String SUBSCRIPTION_NAME = "test-recover";
    private static final String TAKE_SNAPSHOT = "tnx/ns1/take-snapshot";
    private static final String ABORT_DELETE = "tnx/ns1/abort-delete";
    private static final int NUM_PARTITIONS = 16;

    @BeforeMethod
    protected void setup() throws Exception {
        this.setUpBase(1, 16, RECOVER_COMMIT, 0);
        this.admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
        this.admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
    }

    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        if (this.pulsarClient != null) {
            this.pulsarClient.shutdown();
            this.pulsarClient = null;
        }
        super.internalCleanup();
    }

    @DataProvider(name="testTopic")
    public Object[] testTopic() {
        return new Object[]{RECOVER_ABORT, RECOVER_COMMIT};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="testTopic")
    private void recoverTest(String testTopic) throws Exception {
        PulsarClient pulsarClient = this.pulsarClient;
        Transaction tnx1 = (Transaction)pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
        Transaction tnx2 = (Transaction)pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
        Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(new String[]{testTopic}).subscriptionName(SUBSCRIPTION_NAME).subscribe();
        try {
            org.apache.pulsar.client.api.Producer producer = pulsarClient.newProducer(Schema.STRING).topic(testTopic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
            try {
                int messageCnt = 10;
                String content = "Hello Txn - ";
                for (int i = 0; i < messageCnt; ++i) {
                    MessageId messageId;
                    String msg = content + i;
                    if (i % 2 == 0) {
                        messageId = producer.newMessage(tnx1).value((Object)msg).send();
                        log.info("Txn1 send message : {}, messageId : {}", (Object)msg, (Object)messageId);
                        continue;
                    }
                    messageId = producer.newMessage(tnx2).value((Object)msg).send();
                    log.info("Txn2 send message : {}, messageId : {}", (Object)msg, (Object)messageId);
                }
                Message message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                tnx1.commit();
                message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNotNull((Object)message);
                log.info("Txn1 commit receive message : {}, messageId : {}", message.getValue(), (Object)message.getMessageId());
                consumer.acknowledge(message);
                message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                this.admin.topics().unload(testTopic);
                Awaitility.await().until(() -> {
                    for (int i = 0; i < this.getPulsarServiceList().size(); ++i) {
                        Optional topic;
                        Field field = BrokerService.class.getDeclaredField("topics");
                        field.setAccessible(true);
                        ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap)field.get(this.getPulsarServiceList().get(i).getBrokerService());
                        CompletableFuture completableFuture = (CompletableFuture)topics.get((Object)("persistent://" + testTopic));
                        if (completableFuture == null || !(topic = (Optional)completableFuture.get()).isPresent()) continue;
                        PersistentTopic persistentTopic = (PersistentTopic)topic.get();
                        field = PersistentTopic.class.getDeclaredField("transactionBuffer");
                        field.setAccessible(true);
                        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer)field.get(persistentTopic);
                        if (topicTransactionBuffer.checkIfReady()) {
                            return true;
                        }
                        return false;
                    }
                    return false;
                });
                if (testTopic.equals(RECOVER_COMMIT)) {
                    tnx2.commit().get();
                    for (int i = messageCnt; i > 1; --i) {
                        message = consumer.receive();
                        log.info("Txn2 commit receive message : {}, messageId : {}", message.getValue(), (Object)message.getMessageId());
                        consumer.acknowledge(message);
                    }
                    message = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertNull((Object)message);
                } else {
                    tnx2.abort().get();
                    for (int i = messageCnt / 2; i > 1; --i) {
                        message = consumer.receive();
                        log.info("Txn2 commit receive message : {}, messageId : {}", message.getValue(), (Object)message.getMessageId());
                        consumer.acknowledge(message);
                    }
                    message = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertNull((Object)message);
                }
                consumer.close();
                producer.close();
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    private void testTakeSnapshot() throws IOException, ExecutionException, InterruptedException {
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(TAKE_SNAPSHOT).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            Transaction tnx1 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            Transaction tnx2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            Transaction tnx3 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            Transaction abortTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            ReaderBuilder readerBuilder = this.pulsarClient.newReader(Schema.AVRO(TransactionBufferSnapshot.class)).startMessageId(MessageId.earliest).topic("tnx/ns1/__transaction_buffer_snapshot");
            Reader reader = readerBuilder.create();
            MessageId messageId1 = producer.newMessage(tnx1).value((Object)"test").send();
            tnx1.commit().get();
            Awaitility.await().untilAsserted(() -> {
                TransactionBufferSnapshot transactionBufferSnapshot = (TransactionBufferSnapshot)reader.readNext().getValue();
                Assert.assertEquals((long)transactionBufferSnapshot.getMaxReadPositionEntryId(), (long)-1L);
                Assert.assertEquals((long)transactionBufferSnapshot.getMaxReadPositionLedgerId(), (long)((MessageIdImpl)messageId1).getLedgerId());
                transactionBufferSnapshot = (TransactionBufferSnapshot)reader.readNext().getValue();
                Assert.assertEquals((long)transactionBufferSnapshot.getMaxReadPositionEntryId(), (long)(((MessageIdImpl)messageId1).getEntryId() + 1L));
                Assert.assertEquals((long)transactionBufferSnapshot.getMaxReadPositionLedgerId(), (long)((MessageIdImpl)messageId1).getLedgerId());
                Assert.assertFalse((boolean)reader.hasMessageAvailable());
            });
            MessageId messageId2 = producer.newMessage(tnx2).value((Object)"test").send();
            tnx2.commit().get();
            TransactionBufferSnapshot snapshot = (TransactionBufferSnapshot)reader.readNext().getValue();
            Assert.assertEquals((long)snapshot.getMaxReadPositionEntryId(), (long)(((MessageIdImpl)messageId2).getEntryId() + 1L));
            Assert.assertEquals((long)snapshot.getMaxReadPositionLedgerId(), (long)((MessageIdImpl)messageId2).getLedgerId());
            Assert.assertEquals((int)snapshot.getAborts().size(), (int)0);
            Assert.assertFalse((boolean)reader.hasMessageAvailable());
            MessageId messageId3 = producer.newMessage(abortTxn).value((Object)"test").send();
            abortTxn.abort().get();
            TransactionBufferSnapshot transactionBufferSnapshot = (TransactionBufferSnapshot)reader.readNext().getValue();
            Assert.assertEquals((long)transactionBufferSnapshot.getMaxReadPositionEntryId(), (long)(((MessageIdImpl)messageId3).getEntryId() + 1L));
            Assert.assertEquals((long)transactionBufferSnapshot.getMaxReadPositionLedgerId(), (long)((MessageIdImpl)messageId3).getLedgerId());
            Assert.assertEquals((int)transactionBufferSnapshot.getAborts().size(), (int)1);
            Assert.assertEquals((long)((AbortTxnMetadata)transactionBufferSnapshot.getAborts().get(0)).getTxnIdLeastBits(), (long)((TransactionImpl)abortTxn).getTxnIdLeastBits());
            Assert.assertEquals((long)((AbortTxnMetadata)transactionBufferSnapshot.getAborts().get(0)).getTxnIdMostBits(), (long)((TransactionImpl)abortTxn).getTxnIdMostBits());
            Assert.assertFalse((boolean)reader.hasMessageAvailable());
            reader.close();
            producer.close();
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    private void testTopicTransactionBufferDeleteAbort() throws Exception {
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(ABORT_DELETE).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{ABORT_DELETE}).subscriptionName(SUBSCRIPTION_NAME).subscribe();
            try {
                Transaction tnx = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                String value = "Hello Pulsar!";
                MessageId messageId1 = producer.newMessage(tnx).value((Object)value).send();
                tnx.abort().get();
                this.admin.topics().unload(ABORT_DELETE);
                tnx = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                value = "Hello";
                producer.newMessage(tnx).value((Object)value).send();
                tnx.commit().get();
                Message message = consumer.receive(2, TimeUnit.SECONDS);
                System.out.println("consumer receive message" + message.getMessageId());
                Assert.assertNotNull((Object)message.getValue(), (String)value);
                consumer.acknowledge(message);
                tnx = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                MessageId messageId2 = producer.newMessage(tnx).value((Object)value).send();
                tnx.abort().get();
                Assert.assertTrue((((MessageIdImpl)messageId2).getLedgerId() != ((MessageIdImpl)messageId1).getLedgerId() ? 1 : 0) != 0);
                boolean exist = false;
                for (int i = 0; i < this.getPulsarServiceList().size(); ++i) {
                    Optional topic;
                    Field field = BrokerService.class.getDeclaredField("topics");
                    field.setAccessible(true);
                    ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap)field.get(this.getPulsarServiceList().get(i).getBrokerService());
                    CompletableFuture completableFuture = (CompletableFuture)topics.get((Object)"persistent://tnx/ns1/abort-delete");
                    if (completableFuture == null || !(topic = (Optional)completableFuture.get()).isPresent()) continue;
                    PersistentTopic persistentTopic = (PersistentTopic)topic.get();
                    field = ManagedLedgerImpl.class.getDeclaredField("ledgers");
                    field.setAccessible(true);
                    NavigableMap ledgers = (NavigableMap)field.get(persistentTopic.getManagedLedger());
                    ledgers.remove(((MessageIdImpl)messageId1).getLedgerId());
                    tnx = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                    producer.newMessage(tnx).value((Object)value).send();
                    tnx.commit().get();
                    field = PersistentTopic.class.getDeclaredField("transactionBuffer");
                    field.setAccessible(true);
                    TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer)field.get(persistentTopic);
                    field = TopicTransactionBuffer.class.getDeclaredField("aborts");
                    field.setAccessible(true);
                    LinkedMap linkedMap = (LinkedMap)field.get(topicTransactionBuffer);
                    Assert.assertEquals((int)linkedMap.size(), (int)1);
                    Assert.assertEquals((long)((PositionImpl)linkedMap.get(linkedMap.firstKey())).getLedgerId(), (long)((MessageIdImpl)message.getMessageId()).getLedgerId());
                    exist = true;
                }
                Assert.assertTrue((boolean)exist);
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void clearTransactionBufferSnapshotTest() throws Exception {
        String topic = "tnx/ns1/tb-snapshot-delete-" + RandomUtils.nextInt();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
        try {
            Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
            producer.newMessage(txn).value((Object)"test".getBytes()).sendAsync();
            producer.newMessage(txn).value((Object)"test".getBytes()).sendAsync();
            txn.commit().get();
            PersistentTopic originalTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get((String)topic).toString(), false).get()).get();
            TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer)originalTopic.getTransactionBuffer();
            Method takeSnapshotMethod = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot", new Class[0]);
            takeSnapshotMethod.setAccessible(true);
            takeSnapshotMethod.invoke((Object)topicTransactionBuffer, new Object[0]);
            TopicName transactionBufferTopicName = NamespaceEventsSystemTopicFactory.getSystemTopicName((NamespaceName)TopicName.get((String)topic).getNamespaceObject(), (EventType)EventType.TRANSACTION_BUFFER_SNAPSHOT);
            PersistentTopic snapshotTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(transactionBufferTopicName.toString(), false).get()).get();
            Field field = PersistentTopic.class.getDeclaredField("currentCompaction");
            field.setAccessible(true);
            this.checkSnapshotCount(transactionBufferTopicName, true, snapshotTopic, field);
            this.admin.topics().delete(topic, true);
            this.checkSnapshotCount(transactionBufferTopicName, false, snapshotTopic, field);
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot, PersistentTopic persistentTopic, Field field) throws Exception {
        Message snapshotMsg;
        persistentTopic.triggerCompaction();
        CompletableFuture compactionFuture = (CompletableFuture)field.get(persistentTopic);
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)compactionFuture.isDone()));
        Reader reader = this.pulsarClient.newReader(Schema.AVRO(TransactionBufferSnapshot.class)).readCompacted(true).startMessageId(MessageId.earliest).startMessageIdInclusive().topic(topicName.toString()).create();
        int count = 0;
        while ((snapshotMsg = reader.readNext(2, TimeUnit.SECONDS)) != null) {
            ++count;
        }
        Assert.assertTrue((boolean)(hasSnapshot ? count > 0 : count == 0));
        reader.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testTransactionBufferRecoverThrowException() throws Exception {
        String topic = "tnx/ns1/testTransactionBufferRecoverThrowPulsarClientException";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
        try {
            Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
            producer.newMessage(txn).value((Object)"test".getBytes()).sendAsync();
            producer.newMessage(txn).value((Object)"test".getBytes()).sendAsync();
            txn.commit().get();
            producer.close();
            PersistentTopic originalTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get((String)topic).toString(), false).get()).get();
            TransactionBufferSnapshotService transactionBufferSnapshotService = (TransactionBufferSnapshotService)Mockito.mock(TransactionBufferSnapshotService.class);
            SystemTopicClient.Reader reader = (SystemTopicClient.Reader)Mockito.mock(SystemTopicClient.Reader.class);
            SystemTopicClient.Writer writer = (SystemTopicClient.Writer)Mockito.mock(SystemTopicClient.Writer.class);
            ((TransactionBufferSnapshotService)Mockito.doReturn(CompletableFuture.completedFuture(reader)).when((Object)transactionBufferSnapshotService)).createReader((TopicName)ArgumentMatchers.any());
            ((TransactionBufferSnapshotService)Mockito.doReturn(CompletableFuture.completedFuture(writer)).when((Object)transactionBufferSnapshotService)).createWriter((TopicName)ArgumentMatchers.any());
            ((SystemTopicClient.Reader)Mockito.doReturn(CompletableFuture.completedFuture(null)).when((Object)reader)).closeAsync();
            ((SystemTopicClient.Writer)Mockito.doReturn(CompletableFuture.completedFuture(null)).when((Object)writer)).closeAsync();
            Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
            field.setAccessible(true);
            TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal = (TransactionBufferSnapshotService)field.get(this.getPulsarServiceList().get(0));
            ((SystemTopicClient.Reader)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("test")}).when((Object)reader)).hasMoreEvents();
            this.checkCloseTopic(this.pulsarClient, transactionBufferSnapshotServiceOriginal, transactionBufferSnapshotService, originalTopic, field);
            ((SystemTopicClient.Reader)Mockito.doReturn((Object)true).when((Object)reader)).hasMoreEvents();
            ((SystemTopicClient.Reader)Mockito.doThrow((Throwable[])new Throwable[]{new PulsarClientException("test")}).when((Object)reader)).hasMoreEvents();
            this.checkCloseTopic(this.pulsarClient, transactionBufferSnapshotServiceOriginal, transactionBufferSnapshotService, originalTopic, field);
            ((SystemTopicClient.Reader)Mockito.doReturn((Object)true).when((Object)reader)).hasMoreEvents();
            ((TransactionBufferSnapshotService)Mockito.doReturn((Object)FutureUtil.failedFuture((Throwable)new PulsarClientException("test"))).when((Object)transactionBufferSnapshotService)).createReader((TopicName)ArgumentMatchers.any());
            originalTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get((String)topic).toString(), false).get()).get();
            this.checkCloseTopic(this.pulsarClient, transactionBufferSnapshotServiceOriginal, transactionBufferSnapshotService, originalTopic, field);
            ((TransactionBufferSnapshotService)Mockito.doReturn(CompletableFuture.completedFuture(reader)).when((Object)transactionBufferSnapshotService)).createReader((TopicName)ArgumentMatchers.any());
            originalTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get((String)topic).toString(), false).get()).get();
            ((TransactionBufferSnapshotService)Mockito.doReturn((Object)FutureUtil.failedFuture((Throwable)new PulsarClientException("test"))).when((Object)transactionBufferSnapshotService)).createWriter((TopicName)ArgumentMatchers.any());
            this.checkCloseTopic(this.pulsarClient, transactionBufferSnapshotServiceOriginal, transactionBufferSnapshotService, originalTopic, field);
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkCloseTopic(PulsarClient pulsarClient, TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal, TransactionBufferSnapshotService transactionBufferSnapshotService, PersistentTopic originalTopic, Field field) throws Exception {
        field.set(this.getPulsarServiceList().get(0), transactionBufferSnapshotService);
        new TopicTransactionBuffer(originalTopic);
        Awaitility.await().untilAsserted(() -> {
            Field close = AbstractTopic.class.getDeclaredField("isFenced");
            close.setAccessible(true);
            Assert.assertTrue((boolean)((Boolean)close.get(originalTopic)));
        });
        field.set(this.getPulsarServiceList().get(0), transactionBufferSnapshotServiceOriginal);
        Transaction txn = (Transaction)pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        org.apache.pulsar.client.api.Producer producer = pulsarClient.newProducer().topic(originalTopic.getName()).sendTimeout(0, TimeUnit.SECONDS).create();
        try {
            producer.newMessage(txn).value((Object)"test".getBytes()).sendAsync();
            txn.commit().get();
            producer.close();
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTransactionBufferNoSnapshotCloseReader() throws Exception {
        String topic = "tnx/ns1/test";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer").topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            this.admin.topics().unload(topic);
            producer.send((Object)"test");
            TopicStats stats = this.admin.topics().getStats("tnx/ns1/__transaction_buffer_snapshot");
            Assert.assertEquals((int)stats.getSubscriptions().size(), (int)1);
            Assert.assertTrue((boolean)stats.getSubscriptions().keySet().contains("__compaction"));
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void transactionBufferRecoverFailRemoveProducerFuture() throws Exception {
        String topic = "tnx/ns1/transactionBufferRecoverFailRemoveProducerFuture";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
        try {
            Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
            producer.newMessage(txn).value((Object)"test".getBytes()).sendAsync();
            producer.newMessage(txn).value((Object)"test".getBytes()).sendAsync();
            txn.commit().get();
            PersistentTopic originalTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get((String)topic).toString(), false).get()).get();
            TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer)originalTopic.getTransactionBuffer();
            CompletableFuture bufferFuture = new CompletableFuture();
            bufferFuture.completeExceptionally((Throwable)new BrokerServiceException.ServiceUnitNotReadyException("test"));
            Whitebox.setInternalState((Object)topicTransactionBuffer, (String)"transactionBufferFuture", bufferFuture);
            ((Producer)originalTopic.getProducers().get(originalTopic.getProducers().keySet().toArray()[0])).disconnect().get();
            Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)producer.isConnected()));
            Awaitility.await().until(() -> !producer.isConnected());
            Awaitility.await().during(5L, TimeUnit.SECONDS).until(() -> !producer.isConnected());
            bufferFuture = new CompletableFuture();
            bufferFuture.complete(null);
            Whitebox.setInternalState((Object)topicTransactionBuffer, (String)"transactionBufferFuture", bufferFuture);
            Awaitility.await().until(() -> ((org.apache.pulsar.client.api.Producer)producer).isConnected());
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }
}

