/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class TransactionLowWaterMarkTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionLowWaterMarkTest.class);
    private static final String TOPIC = "persistent://tnx/ns1/test-topic";

    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        this.setUpBase(1, 16, TOPIC, 0);
        Map stores = this.getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
        Awaitility.await().until(() -> {
            if (stores.size() == 16) {
                for (TransactionCoordinatorID transactionCoordinatorID : stores.keySet()) {
                    if (((MLTransactionMetadataStore)stores.get(transactionCoordinatorID)).getState() == TransactionMetadataStoreState.State.Ready) continue;
                    return false;
                }
                return true;
            }
            return false;
        });
    }

    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testTransactionBufferLowWaterMark() throws Exception {
        Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        Producer producer = this.pulsarClient.newProducer().topic(TOPIC).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{TOPIC}).subscriptionName("test").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).enableBatchIndexAcknowledgment(true).subscriptionType(SubscriptionType.Failover).subscribe();
        String TEST1 = "test1";
        String TEST2 = "test2";
        String TEST3 = "test3";
        producer.newMessage(txn).value((Object)"test1".getBytes()).send();
        txn.commit().get();
        Message message = consumer.receive(2, TimeUnit.SECONDS);
        Assert.assertEquals((String)new String(message.getData()), (String)"test1");
        message = consumer.receive(2, TimeUnit.SECONDS);
        Assert.assertNull((Object)message);
        Field field = TransactionImpl.class.getDeclaredField("state");
        field.setAccessible(true);
        field.set(txn, TransactionImpl.State.OPEN);
        producer.newMessage(txn).value((Object)"test2".getBytes()).send();
        try {
            txn.commit().get();
            Assert.fail((String)"The commit operation should be failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException));
        }
        PartitionedTopicMetadata partitionedTopicMetadata = (PartitionedTopicMetadata)((PulsarClientImpl)this.pulsarClient).getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get();
        Transaction lowWaterMarkTxn = null;
        for (int i = 0; i < partitionedTopicMetadata.partitions && ((TransactionImpl)(lowWaterMarkTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get())).getTxnIdMostBits() != ((TransactionImpl)txn).getTxnIdMostBits(); ++i) {
        }
        if (lowWaterMarkTxn != null && ((TransactionImpl)lowWaterMarkTxn).getTxnIdMostBits() == ((TransactionImpl)txn).getTxnIdMostBits()) {
            producer.newMessage(lowWaterMarkTxn).value((Object)"test3".getBytes()).send();
            message = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull((Object)message);
            lowWaterMarkTxn.commit().get();
            message = consumer.receive();
            Assert.assertEquals((String)new String(message.getData()), (String)"test3");
        } else {
            Assert.fail();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPendingAckLowWaterMark() throws Exception {
        String subName = "test";
        Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        Producer producer = this.pulsarClient.newProducer().topic(TOPIC).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{TOPIC}).subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).enableBatchIndexAcknowledgment(true).subscriptionType(SubscriptionType.Failover).subscribe();
            try {
                String TEST1 = "test1";
                String TEST2 = "test2";
                String TEST3 = "test3";
                producer.send((Object)"test1".getBytes());
                producer.send((Object)"test2".getBytes());
                producer.send((Object)"test3".getBytes());
                Message message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertEquals((String)new String(message.getData()), (String)"test1");
                consumer.acknowledgeAsync(message.getMessageId(), txn).get();
                LinkedMap individualAckOfTransaction = null;
                for (int i = 0; i < this.getPulsarServiceList().size(); ++i) {
                    Optional topic;
                    Field field = BrokerService.class.getDeclaredField("topics");
                    field.setAccessible(true);
                    ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap)field.get(this.getPulsarServiceList().get(i).getBrokerService());
                    CompletableFuture completableFuture = (CompletableFuture)topics.get((Object)TOPIC);
                    if (completableFuture == null || !(topic = (Optional)completableFuture.get()).isPresent()) continue;
                    PersistentSubscription persistentSubscription = (PersistentSubscription)((Topic)topic.get()).getSubscription(subName);
                    field = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
                    field.setAccessible(true);
                    PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl)field.get(persistentSubscription);
                    field = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
                    field.setAccessible(true);
                    individualAckOfTransaction = (LinkedMap)field.get(pendingAckHandle);
                }
                Assert.assertTrue((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)txn).getTxnIdMostBits(), ((TransactionImpl)txn).getTxnIdLeastBits())));
                txn.commit().get();
                Field field = TransactionImpl.class.getDeclaredField("state");
                field.setAccessible(true);
                field.set(txn, TransactionImpl.State.OPEN);
                Assert.assertFalse((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)txn).getTxnIdMostBits(), ((TransactionImpl)txn).getTxnIdLeastBits())));
                message = consumer.receive();
                Assert.assertEquals((String)new String(message.getData()), (String)"test2");
                consumer.acknowledgeAsync(message.getMessageId(), txn).get();
                Assert.assertTrue((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)txn).getTxnIdMostBits(), ((TransactionImpl)txn).getTxnIdLeastBits())));
                PartitionedTopicMetadata partitionedTopicMetadata = (PartitionedTopicMetadata)((PulsarClientImpl)this.pulsarClient).getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get();
                Transaction lowWaterMarkTxn = null;
                for (int i = 0; i < partitionedTopicMetadata.partitions && ((TransactionImpl)(lowWaterMarkTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get())).getTxnIdMostBits() != ((TransactionImpl)txn).getTxnIdMostBits(); ++i) {
                }
                if (lowWaterMarkTxn != null && ((TransactionImpl)lowWaterMarkTxn).getTxnIdMostBits() == ((TransactionImpl)txn).getTxnIdMostBits()) {
                    producer.newMessage(lowWaterMarkTxn).value((Object)"test3".getBytes()).send();
                    message = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertEquals((String)new String(message.getData()), (String)"test3");
                    consumer.acknowledgeAsync(message.getMessageId(), lowWaterMarkTxn).get();
                    Assert.assertTrue((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)txn).getTxnIdMostBits(), ((TransactionImpl)txn).getTxnIdLeastBits())));
                    Assert.assertTrue((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)lowWaterMarkTxn).getTxnIdMostBits(), ((TransactionImpl)lowWaterMarkTxn).getTxnIdLeastBits())));
                    lowWaterMarkTxn.commit().get();
                    Assert.assertFalse((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)txn).getTxnIdMostBits(), ((TransactionImpl)txn).getTxnIdLeastBits())));
                    Assert.assertFalse((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)lowWaterMarkTxn).getTxnIdMostBits(), ((TransactionImpl)lowWaterMarkTxn).getTxnIdLeastBits())));
                } else {
                    Assert.fail();
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTBLowWaterMarkEndToEnd() throws Exception {
        Transaction txn1 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(500L, TimeUnit.SECONDS).build().get();
        Transaction txn2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(500L, TimeUnit.SECONDS).build().get();
        while (txn2.getTxnID().getMostSigBits() != txn1.getTxnID().getMostSigBits()) {
            txn2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(500L, TimeUnit.SECONDS).build().get();
        }
        Producer producer = this.pulsarClient.newProducer().topic(TOPIC).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            producer.newMessage(txn1).send();
            producer.newMessage(txn2).send();
            txn1.commit().get();
            txn2.commit().get();
            Field field = TransactionImpl.class.getDeclaredField("state");
            field.setAccessible(true);
            field.set(txn1, TransactionImpl.State.OPEN);
            AtomicLong pendingWriteOps = (AtomicLong)Whitebox.getInternalState(((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get((String)TOPIC).toString(), false).get()).get(), (String)"pendingWriteOps");
            try {
                producer.newMessage(txn1).send();
                Assert.fail();
            }
            catch (PulsarClientException.NotAllowedException notAllowedException) {
                // empty catch block
            }
            Assert.assertEquals((long)pendingWriteOps.get(), (long)0L);
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLowWaterMarkForDifferentTC() throws Exception {
        String subName = "sub";
        Producer producer = this.pulsarClient.newProducer().topic(TOPIC).sendTimeout(0, TimeUnit.SECONDS).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{TOPIC}).subscriptionName(subName).subscribe();
            try {
                Transaction txn1 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(500L, TimeUnit.SECONDS).build().get();
                Transaction txn2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(500L, TimeUnit.SECONDS).build().get();
                while (txn2.getTxnID().getMostSigBits() == txn1.getTxnID().getMostSigBits()) {
                    txn2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(500L, TimeUnit.SECONDS).build().get();
                }
                Transaction txn3 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(500L, TimeUnit.SECONDS).build().get();
                while (txn3.getTxnID().getMostSigBits() != txn2.getTxnID().getMostSigBits()) {
                    txn3 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(500L, TimeUnit.SECONDS).build().get();
                }
                Transaction txn4 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(500L, TimeUnit.SECONDS).build().get();
                while (txn4.getTxnID().getMostSigBits() != txn1.getTxnID().getMostSigBits()) {
                    txn4 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(500L, TimeUnit.SECONDS).build().get();
                }
                for (int i = 0; i < 10; ++i) {
                    producer.newMessage().send();
                }
                producer.newMessage(txn1).send();
                producer.newMessage(txn2).send();
                producer.newMessage(txn3).send();
                producer.newMessage(txn4).send();
                Message message1 = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message1.getMessageId(), txn1);
                Message message2 = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message2.getMessageId(), txn2);
                Message message3 = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message3.getMessageId(), txn3);
                Message message4 = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message4.getMessageId(), txn4);
                txn1.commit().get();
                txn2.commit().get();
                Field field = TransactionImpl.class.getDeclaredField("state");
                field.setAccessible(true);
                field.set(txn1, TransactionImpl.State.OPEN);
                field.set(txn2, TransactionImpl.State.OPEN);
                producer.newMessage(txn1).send();
                producer.newMessage(txn2).send();
                Message message5 = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message5.getMessageId(), txn1);
                Message message6 = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message6.getMessageId(), txn2);
                txn3.commit().get();
                TxnID txnID1 = txn1.getTxnID();
                TxnID txnID2 = txn2.getTxnID();
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertTrue((boolean)this.checkTxnIsOngoingInTP(txnID1, subName));
                    Assert.assertTrue((boolean)this.checkTxnIsOngoingInTP(txnID2, subName));
                    Assert.assertTrue((boolean)this.checkTxnIsOngoingInTB(txnID1));
                    Assert.assertTrue((boolean)this.checkTxnIsOngoingInTB(txnID2));
                });
                txn4.commit().get();
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertFalse((boolean)this.checkTxnIsOngoingInTP(txnID1, subName));
                    Assert.assertFalse((boolean)this.checkTxnIsOngoingInTP(txnID2, subName));
                    Assert.assertFalse((boolean)this.checkTxnIsOngoingInTB(txnID1));
                    Assert.assertFalse((boolean)this.checkTxnIsOngoingInTB(txnID2));
                });
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    private boolean checkTxnIsOngoingInTP(TxnID txnID, String subName) throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get((String)TOPIC).toString(), false).get()).get();
        PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
        Field field1 = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
        field1.setAccessible(true);
        PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl)field1.get(persistentSubscription);
        Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
        field2.setAccessible(true);
        LinkedMap individualAckOfTransaction = (LinkedMap)field2.get(pendingAckHandle);
        return individualAckOfTransaction.containsKey((Object)txnID);
    }

    private boolean checkTxnIsOngoingInTB(TxnID txnID) throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get((String)TOPIC).toString(), false).get()).get();
        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer)persistentTopic.getTransactionBuffer();
        Field field3 = TopicTransactionBuffer.class.getDeclaredField("ongoingTxns");
        field3.setAccessible(true);
        LinkedMap ongoingTxns = (LinkedMap)field3.get(topicTransactionBuffer);
        return ongoingTxns.containsKey((Object)txnID);
    }
}

