/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.compaction;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-compaction"})
public class CompactedTopicTest
extends MockedPulsarServiceBaseTest {
    private final Random r = new Random(0L);

    @DataProvider(name="batchEnabledProvider")
    public Object[][] batchEnabledProvider() {
        return new Object[][]{{Boolean.FALSE}, {Boolean.TRUE}};
    }

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("my-property", (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2"}), (Set)Sets.newHashSet((Object[])new String[]{"use"})));
        this.admin.namespaces().createNamespace("my-property/use/my-ns");
    }

    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    private Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> buildCompactedLedger(BookKeeper bk, int count) throws Exception {
        LedgerHandle lh = bk.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        ArrayList positions = new ArrayList();
        ArrayList idsInGaps = new ArrayList();
        AtomicLong ledgerIds = new AtomicLong(10L);
        AtomicLong entryIds = new AtomicLong(0L);
        CompletableFuture.allOf((CompletableFuture[])IntStream.range(0, count).mapToObj(i -> {
            long delta;
            ArrayList<MessageIdData> idsInGap = new ArrayList<MessageIdData>();
            if (this.r.nextInt(10) == 1) {
                delta = this.r.nextInt(10) + 1;
                idsInGap.add(new MessageIdData().setLedgerId(ledgerIds.get()).setEntryId(entryIds.get() + 1L));
                ledgerIds.addAndGet(delta);
                entryIds.set(0L);
            }
            if ((delta = (long)this.r.nextInt(5)) != 0L) {
                idsInGap.add(new MessageIdData().setLedgerId(ledgerIds.get()).setEntryId(entryIds.get() + 1L));
            }
            MessageIdData id = new MessageIdData().setLedgerId(ledgerIds.get()).setEntryId(entryIds.addAndGet(delta + 1L));
            RawMessageImpl m = new RawMessageImpl(id, Unpooled.EMPTY_BUFFER);
            try {
                CompletableFuture f = new CompletableFuture();
                ByteBuf buffer = m.serialize();
                lh.asyncAddEntry(buffer, (rc, ledger, eid, ctx) -> {
                    if (rc != 0) {
                        f.completeExceptionally(BKException.create((int)rc));
                    } else {
                        positions.add(Pair.of((Object)id, (Object)eid));
                        idsInGap.forEach(gid -> idsInGaps.add(Pair.of((Object)gid, (Object)eid)));
                        f.complete(null);
                    }
                }, null);
                CompletableFuture completableFuture = f;
                return completableFuture;
            }
            finally {
                if (Collections.singletonList(m).get(0) != null) {
                    m.close();
                }
            }
        }).toArray(CompletableFuture[]::new)).get();
        lh.close();
        return Triple.of((Object)lh.getId(), positions, idsInGaps);
    }

    @Test
    public void testEntryLookup() throws Exception {
        PositionImpl pos;
        BookKeeper bk = this.pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
        Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData = this.buildCompactedLedger(bk, 500);
        List positions = (List)compactedLedgerData.getMiddle();
        List idsInGaps = (List)compactedLedgerData.getRight();
        LedgerHandle lh = bk.openLedger(((Long)compactedLedgerData.getLeft()).longValue(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        long lastEntryId = lh.getLastAddConfirmed();
        AsyncLoadingCache cache = CompactedTopicImpl.createCache((LedgerHandle)lh, (long)50L);
        MessageIdData firstPositionId = (MessageIdData)((Pair)positions.get(0)).getLeft();
        Pair lastPosition = (Pair)positions.get(positions.size() - 1);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(0L, 0L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)0L);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(Long.MAX_VALUE, 0L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)-4276948922L);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(firstPositionId.getLedgerId(), 0L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)0L);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(((MessageIdData)lastPosition.getLeft()).getLedgerId(), ((MessageIdData)lastPosition.getLeft()).getEntryId() + 1L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)-4276948922L);
        Collections.shuffle(positions, this.r);
        Collections.shuffle(idsInGaps, this.r);
        for (Pair p : positions) {
            pos = new PositionImpl(((MessageIdData)p.getLeft()).getLedgerId(), ((MessageIdData)p.getLeft()).getEntryId());
            Long got = (Long)CompactedTopicImpl.findStartPoint((PositionImpl)pos, (long)lastEntryId, (AsyncLoadingCache)cache).get();
            Assert.assertEquals((Object)got, (Object)p.getRight());
        }
        for (Pair gap : idsInGaps) {
            pos = new PositionImpl(((MessageIdData)gap.getLeft()).getLedgerId(), ((MessageIdData)gap.getLeft()).getEntryId());
            Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)pos, (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)gap.getRight());
        }
    }

    @Test
    public void testCleanupOldCompactedTopicLedger() throws Exception {
        BookKeeper bk = this.pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
        LedgerHandle oldCompactedLedger = bk.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        oldCompactedLedger.close();
        LedgerHandle newCompactedLedger = bk.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        newCompactedLedger.close();
        CompactedTopicImpl compactedTopic = new CompactedTopicImpl(bk);
        compactedTopic.newCompactedLedger((Position)new PositionImpl(1L, 2L), oldCompactedLedger.getId()).get();
        bk.openLedger(oldCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
        bk.openLedger(newCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
        PositionImpl newHorizon = new PositionImpl(1L, 3L);
        compactedTopic.newCompactedLedger((Position)newHorizon, newCompactedLedger.getId()).get();
        bk.openLedger(oldCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
        Assert.assertTrue((boolean)compactedTopic.getCompactedTopicContext().isPresent());
        Assert.assertEquals((long)((CompactedTopicContext)compactedTopic.getCompactedTopicContext().get()).getLedger().getId(), (long)newCompactedLedger.getId());
        Assert.assertTrue((boolean)compactedTopic.getCompactionHorizon().isPresent());
        Assert.assertEquals(compactedTopic.getCompactionHorizon().get(), (Object)newHorizon);
        compactedTopic.deleteCompactedLedger(oldCompactedLedger.getId()).join();
        try {
            bk.openLedger(oldCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
            Assert.fail((String)"Should have failed to open old ledger");
        }
        catch (BKException.BKNoSuchLedgerExistsException | BKException.BKNoSuchLedgerExistsOnMetadataServerException throwable) {
            // empty catch block
        }
        bk.openLedger(newCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
    }

    @Test(dataProvider="batchEnabledProvider")
    public void testCompactWithEmptyMessage(boolean batchEnabled) throws Exception {
        String key = "1";
        byte[] msgBytes = "".getBytes();
        String topic = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 1);
        int messages = 10;
        ProducerBuilder builder = this.pulsarClient.newProducer().topic(topic);
        if (!batchEnabled) {
            builder.enableBatching(false);
        } else {
            builder.batchingMaxMessages(5);
        }
        Producer producer = builder.create();
        ArrayList<CompletableFuture> list = new ArrayList<CompletableFuture>(10);
        for (int i = 0; i < 10; ++i) {
            list.add(producer.newMessage().keyBytes("1".getBytes(Charset.defaultCharset())).value((Object)msgBytes).sendAsync());
        }
        FutureUtil.waitForAll(list).get();
        this.admin.topics().triggerCompaction(topic);
        boolean succeed = CompactedTopicTest.retryStrategically(test -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals((Object)this.admin.topics().compactionStatus((String)topic).status);
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L);
        Assert.assertTrue((boolean)succeed);
        list.clear();
        for (int i = 0; i < 10; ++i) {
            list.add(producer.newMessage().key("1").value((Object)msgBytes).sendAsync());
        }
        FutureUtil.waitForAll(list).get();
        this.admin.topics().triggerCompaction(topic);
        succeed = CompactedTopicTest.retryStrategically(test -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals((Object)this.admin.topics().compactionStatus((String)topic).status);
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L);
        Assert.assertTrue((boolean)succeed);
        producer.close();
    }

    @Test(timeOut=30000L)
    public void testReadMessageFromCompactedLedger() throws Exception {
        String key = "1";
        String msg = "test compaction msg";
        String topic = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 1);
        int numMessages = 10;
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
        for (int i = 0; i < 10; ++i) {
            producer.newMessage().key("1").value((Object)msg).send();
        }
        this.admin.topics().triggerCompaction(topic);
        boolean succeed = CompactedTopicTest.retryStrategically(test -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals((Object)this.admin.topics().compactionStatus((String)topic).status);
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L);
        Assert.assertTrue((boolean)succeed);
        String newKey = "2";
        String newMsg = "test compaction msg v2";
        for (int i = 0; i < 10; ++i) {
            producer.newMessage().key("2").value((Object)newMsg).send();
        }
        Reader reader = this.pulsarClient.newReader(Schema.STRING).topic(topic).subscriptionName("test").readCompacted(true).startMessageId(MessageId.earliest).create();
        int compactedMsgCount = 0;
        int nonCompactedMsgCount = 0;
        while (reader.hasMessageAvailable()) {
            Message message = reader.readNext();
            if ("1".equals(message.getKey()) && msg.equals(message.getValue())) {
                ++compactedMsgCount;
                continue;
            }
            if (!"2".equals(message.getKey()) || !newMsg.equals(message.getValue())) continue;
            ++nonCompactedMsgCount;
        }
        Assert.assertEquals((int)compactedMsgCount, (int)1);
        Assert.assertEquals((int)nonCompactedMsgCount, (int)10);
    }

    @Test
    public void testLastMessageIdForCompactedLedger() throws Exception {
        String topic = "persistent://my-property/use/my-ns/testLastMessageIdForCompactedLedger-" + UUID.randomUUID();
        String key = "1";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
        int numMessages = 10;
        String msg = "test compaction msg";
        for (int i = 0; i < 10; ++i) {
            producer.newMessage().key("1").value((Object)"test compaction msg").send();
        }
        this.admin.topics().triggerCompaction(topic);
        boolean succeed = CompactedTopicTest.retryStrategically(test -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals((Object)this.admin.topics().compactionStatus((String)topic).status);
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L);
        Assert.assertTrue((boolean)succeed);
        PersistentTopicInternalStats stats0 = this.admin.topics().getInternalStats(topic);
        this.admin.topics().unload(topic);
        PersistentTopicInternalStats stats1 = this.admin.topics().getInternalStats(topic);
        Assert.assertTrue((stats0.currentLedgerSize != stats1.currentLedgerSize ? 1 : 0) != 0);
        Optional topicRef = (Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get();
        Assert.assertTrue((boolean)topicRef.isPresent());
        PersistentTopic persistentTopic = (PersistentTopic)topicRef.get();
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
        managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((long)managedLedger.getCurrentLedgerEntries(), (long)0L);
            Assert.assertTrue((managedLedger.getLastConfirmedEntry().getEntryId() != -1L ? 1 : 0) != 0);
            Assert.assertEquals((int)managedLedger.getLedgersInfoAsList().size(), (int)1);
        });
        Reader reader = this.pulsarClient.newReader(Schema.STRING).topic(topic).subscriptionName("test").readCompacted(true).startMessageId(MessageId.earliest).create();
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        Message received = reader.readNext();
        Assert.assertEquals((String)"test compaction msg", (String)((String)received.getValue()));
        MessageId messageId = ((ReaderImpl)reader).getConsumer().getLastMessageId();
        Assert.assertEquals((Object)messageId, (Object)received.getMessageId());
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        reader.close();
        this.admin.topics().unload(topic);
        PersistentTopicInternalStats stats2 = this.admin.topics().getInternalStats(topic);
        Assert.assertTrue((boolean)stats2.lastConfirmedEntry.endsWith(":-1"));
        Assert.assertTrue((stats2.compactedLedger.ledgerId > 0L ? 1 : 0) != 0);
        reader = this.pulsarClient.newReader(Schema.STRING).topic(topic).subscriptionName("test").readCompacted(true).startMessageId(MessageId.earliest).create();
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        reader.readNext();
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
    }

    @Test
    public void testDoNotLossTheLastCompactedLedgerData() throws Exception {
        String topic = "persistent://my-property/use/my-ns/testDoNotLossTheLastCompactedLedgerData-" + UUID.randomUUID();
        int numMessages = 2000;
        int keys = 200;
        String msg = "Test";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).blockIfQueueFull(true).maxPendingMessages(2000).enableBatching(false).create();
        CompletableFuture lastMessage = null;
        for (int i = 0; i < 2000; ++i) {
            lastMessage = producer.newMessage().key(i % 200 + "").value((Object)"Test").sendAsync();
        }
        producer.flush();
        lastMessage.join();
        this.admin.topics().triggerCompaction(topic);
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topic);
            Assert.assertNotEquals((Object)stats.compactedLedger.ledgerId, (Object)-1);
            Assert.assertEquals((long)stats.compactedLedger.entries, (long)200L);
            Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats(topic).getSubscriptions().get("__compaction")).getConsumers().size(), (int)0);
        });
        this.admin.topics().unload(topic);
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topic);
            Assert.assertEquals((int)stats.ledgers.size(), (int)1);
            Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats(topic).getSubscriptions().get("__compaction")).getConsumers().size(), (int)0);
        });
        this.admin.topics().unload(topic);
        producer.newMessage().key("200").value((Object)"Test").send();
        this.admin.topics().triggerCompaction(topic);
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topic);
            Assert.assertEquals((long)stats.compactedLedger.entries, (long)201L);
        });
        Reader reader = this.pulsarClient.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.earliest).readCompacted(true).create();
        int received = 0;
        while (reader.hasMessageAvailable()) {
            reader.readNext();
            ++received;
        }
        Assert.assertEquals((int)received, (int)201);
        reader.close();
        producer.close();
    }

    @Test
    public void testReadCompactedDataWhenLedgerRolloverKickIn() throws Exception {
        int i;
        String topic = "persistent://my-property/use/my-ns/testReadCompactedDataWhenLedgerRolloverKickIn-" + UUID.randomUUID();
        int numMessages = 2000;
        int keys = 200;
        String msg = "Test";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).blockIfQueueFull(true).maxPendingMessages(2000).enableBatching(false).create();
        CompletableFuture lastMessage = null;
        for (i = 0; i < 2000; ++i) {
            lastMessage = producer.newMessage().key(i % 200 + "").value((Object)"Test").sendAsync();
        }
        producer.flush();
        lastMessage.join();
        this.admin.topics().triggerCompaction(topic);
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topic);
            Assert.assertNotEquals((Object)stats.compactedLedger.ledgerId, (Object)-1);
            Assert.assertEquals((long)stats.compactedLedger.entries, (long)200L);
            Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats(topic).getSubscriptions().get("__compaction")).getConsumers().size(), (int)0);
        });
        for (i = 0; i < 2000; ++i) {
            lastMessage = producer.newMessage().key(i % 200 + 200 + "").value((Object)"Test").sendAsync();
        }
        producer.flush();
        lastMessage.join();
        this.admin.topics().unload(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)this.admin.topics().getInternalStats((String)topic).ledgers.size(), (int)2));
        Reader reader = this.pulsarClient.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.earliest).readCompacted(true).receiverQueueSize(10).create();
        for (int i2 = 0; i2 < 2000; ++i2) {
            lastMessage = producer.newMessage().key(i2 % 200 + 400 + "").value((Object)"Test").sendAsync();
        }
        producer.flush();
        lastMessage.join();
        this.admin.topics().triggerCompaction(topic);
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topic);
            Assert.assertNotEquals((Object)stats.compactedLedger.ledgerId, (Object)-1);
            Assert.assertEquals((long)stats.compactedLedger.entries, (long)600L);
            Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats(topic).getSubscriptions().get("__compaction")).getConsumers().size(), (int)0);
        });
        int received = 0;
        while (reader.hasMessageAvailable()) {
            reader.readNext();
            ++received;
        }
        Assert.assertEquals((int)received, (int)600);
        reader.close();
        producer.close();
    }

    @Test(timeOut=120000L)
    public void testCompactionWithTopicUnloading() throws Exception {
        int i;
        String topic = "persistent://my-property/use/my-ns/testCompactionWithTopicUnloading-" + UUID.randomUUID();
        int numMessages = 2000;
        int keys = 500;
        String msg = "Test";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).blockIfQueueFull(true).maxPendingMessages(2000).enableBatching(false).create();
        CompletableFuture lastMessage = null;
        for (i = 0; i < 2000; ++i) {
            lastMessage = producer.newMessage().key(i % 500 + "").value((Object)"Test").sendAsync();
        }
        producer.flush();
        lastMessage.join();
        this.admin.topics().triggerCompaction(topic);
        Awaitility.await().pollInterval(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topic);
            Assert.assertNotEquals((Object)stats.compactedLedger.ledgerId, (Object)-1);
            Assert.assertEquals((long)stats.compactedLedger.entries, (long)500L);
            Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats(topic).getSubscriptions().get("__compaction")).getConsumers().size(), (int)0);
        });
        this.admin.topics().unload(topic);
        for (i = 0; i < 2000; ++i) {
            lastMessage = producer.newMessage().key(i % 500 + 500 + "").value((Object)"Test").sendAsync();
        }
        producer.flush();
        lastMessage.join();
        this.admin.topics().triggerCompaction(topic);
        Thread.sleep(100L);
        this.admin.topics().unload(topic);
        this.admin.topics().triggerCompaction(topic);
        Awaitility.await().pollInterval(3L, TimeUnit.SECONDS).atMost(30L, TimeUnit.SECONDS).untilAsserted(() -> {
            PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topic);
            Assert.assertNotEquals((Object)stats.compactedLedger.ledgerId, (Object)-1);
            Assert.assertEquals((long)stats.compactedLedger.entries, (long)1000L);
            Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats(topic).getSubscriptions().get("__compaction")).getConsumers().size(), (int)0);
        });
        Reader reader = this.pulsarClient.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.earliest).readCompacted(true).receiverQueueSize(10).create();
        int received = 0;
        while (reader.hasMessageAvailable()) {
            reader.readNext();
            ++received;
        }
        Assert.assertEquals((int)received, (int)1000);
        reader.close();
        producer.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testReader() throws Exception {
        String ns = "my-property/use/my-ns";
        String topic = "persistent://my-property/use/my-ns/t1";
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        try {
            producer.newMessage().key("k").value((Object)"value".getBytes()).send();
            producer.newMessage().key("k").value(null).send();
            this.pulsar.getCompactor().compact(topic).get();
            Awaitility.await().pollInterval(3L, TimeUnit.SECONDS).atMost(30L, TimeUnit.SECONDS).untilAsserted(() -> {
                this.admin.topics().unload(topic);
                Thread.sleep(100L);
                Assert.assertTrue((boolean)this.admin.topics().getInternalStats((String)topic).lastConfirmedEntry.endsWith("-1"));
            });
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(topic);
            Assert.assertTrue((boolean)internalStats.lastConfirmedEntry.endsWith("-1"));
            Assert.assertEquals((long)internalStats.compactedLedger.size, (long)0L);
            Reader reader = this.pulsarClient.newReader().topic(topic).startMessageIdInclusive().startMessageId(MessageId.earliest).readCompacted(true).create();
            try {
                Assert.assertFalse((boolean)reader.hasMessageAvailable());
            }
            finally {
                if (Collections.singletonList(reader).get(0) != null) {
                    reader.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHasMessageAvailableWithNullValueMessage() throws Exception {
        String topic = "persistent://my-property/use/my-ns/testHasMessageAvailable-" + UUID.randomUUID();
        int numMessages = 10;
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).blockIfQueueFull(true).enableBatching(false).create();
        try {
            int i;
            CompletableFuture lastMessage = null;
            for (i = 0; i < 10; ++i) {
                lastMessage = producer.newMessage().key(i + "").value((Object)String.format("msg [%d]", i)).sendAsync();
            }
            for (i = 5; i < 10; ++i) {
                lastMessage = producer.newMessage().key(i + "").value(null).sendAsync();
            }
            producer.flush();
            lastMessage.join();
            this.admin.topics().triggerCompaction(topic);
            Awaitility.await().untilAsserted(() -> {
                PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topic);
                Assert.assertNotEquals((Object)stats.compactedLedger.ledgerId, (Object)-1);
                Assert.assertEquals((long)stats.compactedLedger.entries, (long)5L);
                Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats(topic).getSubscriptions().get("__compaction")).getConsumers().size(), (int)0);
                Assert.assertEquals((String)stats.lastConfirmedEntry, (String)((ManagedLedgerInternalStats.CursorStats)stats.cursors.get((Object)"__compaction")).markDeletePosition);
            });
            Reader reader = this.pulsarClient.newReader().topic(topic).startMessageIdInclusive().startMessageId(MessageId.earliest).readCompacted(true).create();
            try {
                for (int i2 = 5; i2 < 10; ++i2) {
                    reader.readNext();
                }
                Assert.assertFalse((boolean)reader.hasMessageAvailable());
                Assert.assertNull((Object)reader.readNext(3, TimeUnit.SECONDS));
            }
            finally {
                if (Collections.singletonList(reader).get(0) != null) {
                    reader.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReadCompleteMessagesDuringTopicUnloading() throws Exception {
        String topic = "persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-" + UUID.randomUUID();
        int numMessages = 1000;
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).blockIfQueueFull(true).enableBatching(false).create();
        try {
            int i;
            CompletableFuture lastMessage = null;
            for (i = 0; i < 1000; ++i) {
                lastMessage = producer.newMessage().key(i + "").value((Object)String.format("msg [%d]", i)).sendAsync();
            }
            producer.flush();
            lastMessage.join();
            this.admin.topics().triggerCompaction(topic);
            Awaitility.await().untilAsserted(() -> {
                PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topic);
                Assert.assertNotEquals((Object)stats.compactedLedger.ledgerId, (Object)-1);
                Assert.assertEquals((long)stats.compactedLedger.entries, (long)1000L);
                Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats(topic).getSubscriptions().get("__compaction")).getConsumers().size(), (int)0);
                Assert.assertEquals((String)stats.lastConfirmedEntry, (String)((ManagedLedgerInternalStats.CursorStats)stats.cursors.get((Object)"__compaction")).markDeletePosition);
            });
            this.admin.topics().unload(topic);
            for (i = 0; i < 1000; ++i) {
                lastMessage = producer.newMessage().key(i + 1000 + "").value((Object)String.format("msg [%d]", i + 1000)).sendAsync();
            }
            producer.flush();
            lastMessage.join();
            Reader reader = this.pulsarClient.newReader(Schema.STRING).topic(topic).startMessageIdInclusive().startMessageId(MessageId.earliest).readCompacted(true).create();
            try {
                int i2;
                for (i2 = 0; i2 < 500; ++i2) {
                    Assert.assertEquals((String)((String)reader.readNext().getValue()), (String)String.format("msg [%d]", i2));
                }
                this.admin.topics().unload(topic);
                for (i2 = 0; i2 < 500; ++i2) {
                    Assert.assertEquals((String)((String)reader.readNext().getValue()), (String)String.format("msg [%d]", i2 + 500));
                }
                this.admin.topics().unload(topic);
                for (i2 = 0; i2 < 1000; ++i2) {
                    Assert.assertEquals((String)((String)reader.readNext().getValue()), (String)String.format("msg [%d]", i2 + 1000));
                }
            }
            finally {
                if (Collections.singletonList(reader).get(0) != null) {
                    reader.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReadCompactedLatestMessageWithInclusive() throws Exception {
        String topic = "persistent://my-property/use/my-ns/testLedgerRollover-" + UUID.randomUUID();
        boolean numMessages = true;
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).blockIfQueueFull(true).enableBatching(false).create();
        try {
            CompletableFuture lastMessage = null;
            for (int i = 0; i < 1; ++i) {
                lastMessage = producer.newMessage().key(i + "").value((Object)String.format("msg [%d]", i)).sendAsync();
            }
            producer.flush();
            lastMessage.join();
            this.admin.topics().unload(topic);
            this.admin.topics().triggerCompaction(topic);
            Awaitility.await().untilAsserted(() -> {
                PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(topic);
                Assert.assertNotEquals((Object)stats.compactedLedger.ledgerId, (Object)-1);
                Assert.assertEquals((long)stats.compactedLedger.entries, (long)1L);
                Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats(topic).getSubscriptions().get("__compaction")).getConsumers().size(), (int)0);
                Assert.assertEquals((String)stats.lastConfirmedEntry, (String)((ManagedLedgerInternalStats.CursorStats)stats.cursors.get((Object)"__compaction")).markDeletePosition);
            });
            Awaitility.await().pollInterval(3L, TimeUnit.SECONDS).atMost(30L, TimeUnit.SECONDS).untilAsserted(() -> {
                this.admin.topics().unload(topic);
                Assert.assertTrue((boolean)this.admin.topics().getInternalStats((String)topic).lastConfirmedEntry.endsWith("-1"));
            });
            Reader reader = this.pulsarClient.newReader().topic(topic).startMessageIdInclusive().startMessageId(MessageId.latest).readCompacted(true).create();
            try {
                Assert.assertTrue((boolean)reader.hasMessageAvailable());
                Assert.assertEquals((Object)reader.readNext().getMessageId(), lastMessage.get());
            }
            finally {
                if (Collections.singletonList(reader).get(0) != null) {
                    reader.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }
}

