/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.pendingack;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class PendingAckPersistentTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(PendingAckPersistentTest.class);
    private static final String PENDING_ACK_REPLAY_TOPIC = "tnx/ns1/pending-ack-replay";
    private static final int NUM_PARTITIONS = 16;

    @BeforeMethod
    public void setup() throws Exception {
        this.setUpBase(1, 16, PENDING_ACK_REPLAY_TOPIC, 0);
    }

    @AfterMethod(alwaysRun=true)
    protected void cleanup() {
        super.internalCleanup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void individualPendingAckReplayTest() throws Exception {
        int messageCount = 1000;
        String subName = "individual-test";
        Producer producer = this.pulsarClient.newProducer().topic(PENDING_ACK_REPLAY_TOPIC).enableBatching(true).batchingMaxMessages(200).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{PENDING_ACK_REPLAY_TOPIC}).subscriptionName(subName).subscriptionType(SubscriptionType.Shared).enableBatchIndexAcknowledgment(true).subscribe();
            try {
                int i;
                Transaction abortTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
                ArrayList<MessageId> pendingAckMessageIds = new ArrayList<MessageId>();
                ArrayList<MessageId> normalAckMessageIds = new ArrayList<MessageId>();
                for (int i2 = 0; i2 < messageCount; ++i2) {
                    producer.send((Object)"Hello Pulsar!".getBytes());
                    Message message = consumer.receive();
                    if (i2 % 2 == 0) {
                        consumer.acknowledgeAsync(message.getMessageId(), abortTxn).get();
                        pendingAckMessageIds.add(message.getMessageId());
                        continue;
                    }
                    normalAckMessageIds.add(message.getMessageId());
                }
                this.admin.topics().unload(PENDING_ACK_REPLAY_TOPIC);
                Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
                Transaction commitTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
                Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
                for (i = 0; i < pendingAckMessageIds.size(); ++i) {
                    try {
                        consumer.acknowledgeAsync((MessageId)pendingAckMessageIds.get(i), txn).get();
                        Assert.fail();
                        continue;
                    }
                    catch (ExecutionException e) {
                        Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.TransactionConflictException));
                    }
                }
                for (i = 0; i < normalAckMessageIds.size(); ++i) {
                    consumer.acknowledgeAsync((MessageId)normalAckMessageIds.get(i), commitTxn).get();
                }
                txn.abort().get();
                commitTxn.commit().get();
                abortTxn.abort().get();
                this.admin.topics().unload(PENDING_ACK_REPLAY_TOPIC);
                Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
                abortTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
                commitTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
                for (i = 0; i < normalAckMessageIds.size(); ++i) {
                    try {
                        consumer.acknowledgeAsync((MessageId)normalAckMessageIds.get(i), abortTxn).get();
                        Assert.fail();
                        continue;
                    }
                    catch (ExecutionException e) {
                        Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.TransactionConflictException));
                    }
                }
                for (i = 0; i < pendingAckMessageIds.size(); ++i) {
                    consumer.acknowledgeAsync((MessageId)pendingAckMessageIds.get(i), commitTxn).get();
                }
                abortTxn.abort().get();
                commitTxn.commit().get();
                PersistentTopic topic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get((String)PENDING_ACK_REPLAY_TOPIC).toString(), false).get()).get();
                Field field = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
                field.setAccessible(true);
                PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl)field.get(topic.getSubscription(subName));
                field = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
                field.setAccessible(true);
                CompletableFuture pendingAckStoreCompletableFuture = (CompletableFuture)field.get(pendingAckHandle);
                pendingAckStoreCompletableFuture.get();
                field = MLPendingAckStore.class.getDeclaredField("cursor");
                field.setAccessible(true);
                ManagedCursor managedCursor = (ManagedCursor)field.get(pendingAckStoreCompletableFuture.get());
                Awaitility.await().until(() -> ((PositionImpl)managedCursor.getMarkDeletedPosition()).compareTo((PositionImpl)managedCursor.getManagedLedger().getLastConfirmedEntry()) == -1);
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void cumulativePendingAckReplayTest() throws Exception {
        int messageCount = 1000;
        this.getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag((long)(4 * messageCount + 2));
        this.getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(10.0);
        String subName = "cumulative-test";
        Producer producer = this.pulsarClient.newProducer().topic(PENDING_ACK_REPLAY_TOPIC).enableBatching(true).batchingMaxMessages(200).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{PENDING_ACK_REPLAY_TOPIC}).subscriptionName(subName).subscriptionType(SubscriptionType.Failover).enableBatchIndexAcknowledgment(true).subscribe();
            try {
                int i;
                int i2;
                Transaction abortTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
                ArrayList<MessageId> pendingAckMessageIds = new ArrayList<MessageId>();
                for (i2 = 0; i2 < messageCount; ++i2) {
                    producer.send((Object)"Hello Pulsar!".getBytes());
                }
                for (i2 = 0; i2 < messageCount; ++i2) {
                    Message message = consumer.receive();
                    pendingAckMessageIds.add(message.getMessageId());
                    consumer.acknowledgeCumulativeAsync(message.getMessageId(), abortTxn).get();
                }
                this.admin.topics().unload(PENDING_ACK_REPLAY_TOPIC);
                Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
                Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
                for (int i3 = 0; i3 < pendingAckMessageIds.size(); ++i3) {
                    try {
                        consumer.acknowledgeCumulativeAsync((MessageId)pendingAckMessageIds.get(i3), txn).get();
                        Assert.fail();
                        continue;
                    }
                    catch (ExecutionException e) {
                        Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.TransactionConflictException));
                    }
                }
                Transaction commitTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
                abortTxn.abort().get();
                for (i = 0; i < pendingAckMessageIds.size(); ++i) {
                    consumer.acknowledgeCumulativeAsync((MessageId)pendingAckMessageIds.get(i), commitTxn).get();
                }
                commitTxn.commit().get();
                this.admin.topics().unload(PENDING_ACK_REPLAY_TOPIC);
                Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
                for (i = 0; i < pendingAckMessageIds.size(); ++i) {
                    try {
                        consumer.acknowledgeCumulativeAsync((MessageId)pendingAckMessageIds.get(i), txn).get();
                        Assert.fail();
                        continue;
                    }
                    catch (ExecutionException e) {
                        Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.TransactionConflictException));
                    }
                }
                PersistentTopic topic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get((String)PENDING_ACK_REPLAY_TOPIC).toString(), false).get()).get();
                Field field = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
                field.setAccessible(true);
                PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl)field.get(topic.getSubscription(subName));
                field = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
                field.setAccessible(true);
                CompletableFuture pendingAckStoreCompletableFuture = (CompletableFuture)field.get(pendingAckHandle);
                pendingAckStoreCompletableFuture.get();
                field = MLPendingAckStore.class.getDeclaredField("cursor");
                field.setAccessible(true);
                ManagedCursor managedCursor = (ManagedCursor)field.get(pendingAckStoreCompletableFuture.get());
                Awaitility.await().until(() -> ((PositionImpl)managedCursor.getMarkDeletedPosition()).compareTo((PositionImpl)managedCursor.getManagedLedger().getLastConfirmedEntry()) == 0);
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    private void testDeleteSubThenDeletePendingAckManagedLedger() throws Exception {
        String subName = "test-delete";
        String topic = TopicName.get((String)TopicDomain.persistent.toString(), (NamespaceName)NamespaceName.get((String)"tnx/ns1"), (String)"test-delete").toString();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName(subName).subscriptionType(SubscriptionType.Failover).enableBatchIndexAcknowledgment(true).subscribe();
        try {
            consumer.close();
            this.admin.topics().deleteSubscription(topic, subName);
            List topics = this.admin.namespaces().getTopics("tnx/ns1");
            TopicStats topicStats = this.admin.topics().getStats(topic, false);
            Assert.assertFalse((boolean)topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix((String)topic, (String)subName)));
            Assert.assertTrue((boolean)topics.contains(topic));
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    private void testDeleteTopicThenDeletePendingAckManagedLedger() throws Exception {
        String subName1 = "test-delete";
        String subName2 = "test-delete";
        String topic = TopicName.get((String)TopicDomain.persistent.toString(), (NamespaceName)NamespaceName.get((String)"tnx/ns1"), (String)"test-delete").toString();
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName(subName1).subscriptionType(SubscriptionType.Failover).enableBatchIndexAcknowledgment(true).subscribe();
        try {
            consumer1.close();
            Consumer consumer2 = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName(subName2).subscriptionType(SubscriptionType.Failover).enableBatchIndexAcknowledgment(true).subscribe();
            try {
                consumer2.close();
                this.admin.topics().delete(topic);
                List topics = this.admin.namespaces().getTopics("tnx/ns1");
                Assert.assertFalse((boolean)topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix((String)topic, (String)subName1)));
                Assert.assertFalse((boolean)topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix((String)topic, (String)subName2)));
                Assert.assertFalse((boolean)topics.contains(topic));
            }
            finally {
                if (Collections.singletonList(consumer2).get(0) != null) {
                    consumer2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer1).get(0) != null) {
                consumer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDeleteUselessLogDataWhenSubCursorMoved() throws Exception {
        this.getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(5L);
        this.getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(5.0);
        String subName = "test-log-delete";
        String topic = TopicName.get((String)TopicDomain.persistent.toString(), (NamespaceName)NamespaceName.get((String)"tnx/ns1"), (String)"test-log-delete").toString();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName(subName).subscribe();
        try {
            Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
            try {
                int i;
                for (int i2 = 0; i2 < 20; ++i2) {
                    producer.newMessage().send();
                }
                Message message = consumer.receive(5, TimeUnit.SECONDS);
                Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
                PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false).get()).get();
                PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
                Field field = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
                field.setAccessible(true);
                PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl)field.get(persistentSubscription);
                Field field1 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
                field1.setAccessible(true);
                PendingAckStore pendingAckStore = (PendingAckStore)((CompletableFuture)field1.get(pendingAckHandle)).get();
                Field field3 = MLPendingAckStore.class.getDeclaredField("pendingAckLogIndex");
                Field field4 = MLPendingAckStore.class.getDeclaredField("maxIndexLag");
                field3.setAccessible(true);
                field4.setAccessible(true);
                ConcurrentSkipListMap pendingAckLogIndex = (ConcurrentSkipListMap)field3.get(pendingAckStore);
                long maxIndexLag = (Long)field4.get(pendingAckStore);
                Assert.assertEquals((int)pendingAckLogIndex.size(), (int)0);
                Assert.assertEquals((long)maxIndexLag, (long)5L);
                transaction.commit().get();
                Awaitility.await().untilAsserted(() -> Assert.assertEquals((long)persistentSubscription.getCursor().getPersistentMarkDeletedPosition().getEntryId(), (long)((MessageIdImpl)message.getMessageId()).getEntryId()));
                Transaction transaction1 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                Message message0 = null;
                for (i = 0; i < 4; ++i) {
                    message0 = consumer.receive(5, TimeUnit.SECONDS);
                    consumer.acknowledgeAsync(message0.getMessageId(), transaction1).get();
                }
                Assert.assertEquals((int)pendingAckLogIndex.size(), (int)1);
                maxIndexLag = (Long)field4.get(pendingAckStore);
                Assert.assertEquals((long)maxIndexLag, (long)5L);
                for (i = 0; i < 9; ++i) {
                    message0 = consumer.receive(5, TimeUnit.SECONDS);
                    consumer.acknowledgeAsync(message0.getMessageId(), transaction1).get();
                }
                Assert.assertEquals((int)pendingAckLogIndex.size(), (int)2);
                maxIndexLag = (Long)field4.get(pendingAckStore);
                Assert.assertEquals((long)maxIndexLag, (long)10L);
                transaction1.commit().get();
                Message message1 = message0;
                Awaitility.await().untilAsserted(() -> Assert.assertEquals((long)persistentSubscription.getCursor().getPersistentMarkDeletedPosition().getEntryId(), (long)((MessageIdImpl)message1.getMessageId()).getEntryId()));
                Transaction transaction2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                Message message2 = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message2.getMessageId(), transaction2).get();
                Assert.assertEquals((int)pendingAckLogIndex.size(), (int)0);
                maxIndexLag = (Long)field4.get(pendingAckStore);
                Assert.assertEquals((long)maxIndexLag, (long)5L);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception {
        String topic = TopicName.get((String)TopicDomain.persistent.toString(), (NamespaceName)NamespaceName.get((String)"tnx/ns1"), (String)"test").toString();
        String subName = "subName";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName(subName).subscriptionType(SubscriptionType.Failover).enableBatchIndexAcknowledgment(true).subscribe();
        try {
            Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
            try {
                for (int i = 0; i < 5; ++i) {
                    producer.newMessage().send();
                }
                Transaction transaction1 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                Message message1 = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message1.getMessageId(), transaction1);
                transaction1.commit().get();
                Transaction transaction2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                while (transaction1.getTxnID().getMostSigBits() != transaction2.getTxnID().getMostSigBits()) {
                    transaction2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                }
                Transaction transaction3 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                while (transaction1.getTxnID().getMostSigBits() != transaction3.getTxnID().getMostSigBits()) {
                    transaction3 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                }
                Message message3 = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message3.getMessageId(), transaction2);
                transaction2.commit().get();
                Message message2 = consumer.receive(5, TimeUnit.SECONDS);
                Field field = TransactionImpl.class.getDeclaredField("state");
                field.setAccessible(true);
                field.set(transaction1, TransactionImpl.State.OPEN);
                consumer.acknowledgeAsync(message2.getMessageId(), transaction1).get();
                Message message4 = consumer.receive(5, TimeUnit.SECONDS);
                field.set(transaction2, TransactionImpl.State.OPEN);
                consumer.acknowledgeAsync(message4.getMessageId(), transaction2).get();
                Message message5 = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message5.getMessageId(), transaction3);
                transaction3.commit().get();
                PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false).get()).get();
                PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
                Field field1 = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
                field1.setAccessible(true);
                PendingAckHandleImpl oldPendingAckHandle = (PendingAckHandleImpl)field1.get(persistentSubscription);
                Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
                field2.setAccessible(true);
                LinkedMap oldIndividualAckOfTransaction = (LinkedMap)field2.get(oldPendingAckHandle);
                Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)oldIndividualAckOfTransaction.size(), (int)0));
                PendingAckHandleImpl pendingAckHandle = new PendingAckHandleImpl(persistentSubscription);
                Method method = PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore", new Class[0]);
                method.setAccessible(true);
                method.invoke((Object)pendingAckHandle, new Object[0]);
                Field field3 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
                field3.setAccessible(true);
                Awaitility.await().until(() -> {
                    CompletableFuture completableFuture = (CompletableFuture)field3.get(pendingAckHandle);
                    completableFuture.get();
                    return true;
                });
                LinkedMap individualAckOfTransaction = (LinkedMap)field2.get(pendingAckHandle);
                Assert.assertFalse((boolean)individualAckOfTransaction.containsKey((Object)transaction1.getTxnID()));
                Assert.assertFalse((boolean)individualAckOfTransaction.containsKey((Object)transaction2.getTxnID()));
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTransactionConflictExceptionWhenAckBatchMessage() throws Exception {
        String topic = TopicName.get((String)TopicDomain.persistent.toString(), (NamespaceName)NamespaceName.get((String)"tnx/ns1"), (String)"test").toString();
        String subscriptionName = "my-subscription-batch";
        ((ManagedLedgerConfig)((PulsarService)this.pulsarServiceList.get(0)).getBrokerService().getManagedLedgerConfig(TopicName.get((String)topic)).get()).setDeletionAtBatchIndexLevelEnabled(true);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(10L, TimeUnit.SECONDS).topic(topic).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).subscriptionName(subscriptionName).enableBatchIndexAcknowledgment(true).subscriptionType(SubscriptionType.Exclusive).isAckReceiptEnabled(true).topic(new String[]{topic}).subscribe();
            try {
                ArrayList<MessageId> messageIds = new ArrayList<MessageId>();
                ArrayList<CompletableFuture> futureMessageIds = new ArrayList<CompletableFuture>();
                ArrayList<String> messages = new ArrayList<String>();
                for (int i = 0; i < 3; ++i) {
                    String message = "my-message-" + i;
                    messages.add(message);
                    CompletableFuture messageIdCompletableFuture = producer.sendAsync((Object)message);
                    futureMessageIds.add(messageIdCompletableFuture);
                }
                for (CompletableFuture futureMessageId : futureMessageIds) {
                    MessageId messageId = (MessageId)futureMessageId.get();
                    messageIds.add(messageId);
                }
                Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.DAYS).build().get();
                Message message1 = consumer.receive();
                Message message2 = consumer.receive();
                BatchMessageIdImpl messageId = (BatchMessageIdImpl)message2.getMessageId();
                consumer.acknowledgeAsync((MessageId)messageId, transaction).get();
                Transaction transaction2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.DAYS).build().get();
                transaction.commit().get();
                try {
                    consumer.acknowledgeAsync((MessageId)messageId, transaction2).get();
                    Assert.fail();
                }
                catch (ExecutionException e) {
                    Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.TransactionConflictException));
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGetSubPatternTopicFilterTxnInternalTopic() throws Exception {
        String topic = TopicName.get((String)TopicDomain.persistent.toString(), (NamespaceName)NamespaceName.get((String)"tnx/ns1"), (String)"testGetSubPatternTopicFilterTxnInternalTopic").toString();
        int partition = 3;
        this.admin.topics().createPartitionedTopic(topic, partition);
        String subscriptionName = "sub";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create();
        try {
            int i;
            Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).topic(new String[]{topic}).subscribe();
            for (i = 0; i < partition; ++i) {
                producer.send((Object)"test");
            }
            for (i = 0; i < partition; ++i) {
                Transaction transaction = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction);
                transaction.commit().get();
            }
            consumer.close();
            Consumer patternConsumer = this.pulsarClient.newConsumer(Schema.STRING).subscriptionName("patternSub").subscriptionType(SubscriptionType.Shared).topicsPattern("persistent://tnx/ns1/.*").subscribe();
            try {
                int i2;
                for (i2 = 0; i2 < partition; ++i2) {
                    producer.send((Object)("test" + i2));
                }
                for (i2 = 0; i2 < partition; ++i2) {
                    patternConsumer.acknowledgeAsync(patternConsumer.receive().getMessageId());
                }
                patternConsumer.close();
                producer.close();
            }
            finally {
                if (Collections.singletonList(patternConsumer).get(0) != null) {
                    patternConsumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }
}

