/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.util.Map;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class MessageDuplicationTest {
    private static final Logger log = LoggerFactory.getLogger(MessageDuplicationTest.class);
    private static final int BROKER_DEDUPLICATION_ENTRIES_INTERVAL = 10;
    private static final int BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS = 10;
    private static final String REPLICATOR_PREFIX = "foo";

    @Test
    public void testIsDuplicate() {
        PulsarService pulsarService = (PulsarService)Mockito.mock(PulsarService.class);
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setBrokerDeduplicationEntriesInterval(10);
        serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(10);
        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
        ((PulsarService)Mockito.doReturn((Object)serviceConfiguration).when((Object)pulsarService)).getConfiguration();
        PersistentTopic persistentTopic = (PersistentTopic)Mockito.mock(PersistentTopic.class);
        ManagedLedger managedLedger = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        MessageDeduplication messageDeduplication = BrokerTestUtil.spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, persistentTopic, managedLedger);
        ((MessageDeduplication)Mockito.doReturn((Object)true).when((Object)messageDeduplication)).isEnabled();
        String producerName1 = "producer1";
        ByteBuf byteBuf1 = this.getMessage(producerName1, 0L);
        Topic.PublishContext publishContext1 = this.getPublishContext(producerName1, 0L);
        String producerName2 = "producer2";
        ByteBuf byteBuf2 = this.getMessage(producerName2, 1L);
        Topic.PublishContext publishContext2 = this.getPublishContext(producerName2, 1L);
        MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
        Assert.assertEquals((Object)status, (Object)MessageDeduplication.MessageDupStatus.NotDup);
        Long lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)0L);
        status = messageDeduplication.isDuplicate(publishContext2, byteBuf2);
        Assert.assertEquals((Object)status, (Object)MessageDeduplication.MessageDupStatus.NotDup);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName2);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)1L);
        byteBuf1 = this.getMessage(producerName1, 1L);
        publishContext1 = this.getPublishContext(producerName1, 1L);
        status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
        Assert.assertEquals((Object)status, (Object)MessageDeduplication.MessageDupStatus.NotDup);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)1L);
        byteBuf1 = this.getMessage(producerName1, 5L);
        publishContext1 = this.getPublishContext(producerName1, 5L);
        status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
        Assert.assertEquals((Object)status, (Object)MessageDeduplication.MessageDupStatus.NotDup);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)5L);
        byteBuf1 = this.getMessage(producerName1, 0L);
        publishContext1 = this.getPublishContext(producerName1, 0L);
        status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
        Assert.assertEquals((Object)status, (Object)MessageDeduplication.MessageDupStatus.Unknown);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)5L);
        messageDeduplication.highestSequencedPersisted.put((Object)producerName1, (Object)0L);
        byteBuf1 = this.getMessage(producerName1, 0L);
        publishContext1 = this.getPublishContext(producerName1, 0L);
        status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
        Assert.assertEquals((Object)status, (Object)MessageDeduplication.MessageDupStatus.Dup);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)5L);
        messageDeduplication.highestSequencedPushed.put((Object)producerName1, (Object)0L);
        messageDeduplication.highestSequencedPersisted.put((Object)producerName1, (Object)0L);
        byteBuf1 = this.getMessage(producerName1, 0L);
        publishContext1 = this.getPublishContext(producerName1, 1L, 5L);
        status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
        Assert.assertEquals((Object)status, (Object)MessageDeduplication.MessageDupStatus.NotDup);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)5L);
        publishContext1 = this.getPublishContext(producerName1, 4L, 8L);
        status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
        Assert.assertEquals((Object)status, (Object)MessageDeduplication.MessageDupStatus.Unknown);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)5L);
    }

    @Test
    public void testInactiveProducerRemove() throws Exception {
        PulsarService pulsarService = (PulsarService)Mockito.mock(PulsarService.class);
        PersistentTopic topic = (PersistentTopic)Mockito.mock(PersistentTopic.class);
        ManagedLedger managedLedger = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setBrokerDeduplicationEntriesInterval(10);
        serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(10);
        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
        serviceConfiguration.setBrokerDeduplicationProducerInactivityTimeoutMinutes(1);
        ((PulsarService)Mockito.doReturn((Object)serviceConfiguration).when((Object)pulsarService)).getConfiguration();
        MessageDeduplication messageDeduplication = BrokerTestUtil.spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, topic, managedLedger);
        ((MessageDeduplication)Mockito.doReturn((Object)true).when((Object)messageDeduplication)).isEnabled();
        ManagedCursor managedCursor = (ManagedCursor)Mockito.mock(ManagedCursor.class);
        ((MessageDeduplication)Mockito.doReturn((Object)managedCursor).when((Object)messageDeduplication)).getManagedCursor();
        Topic.PublishContext publishContext = (Topic.PublishContext)Mockito.mock(Topic.PublishContext.class);
        Field field = MessageDeduplication.class.getDeclaredField("inactiveProducers");
        field.setAccessible(true);
        Map inactiveProducers = (Map)field.get(messageDeduplication);
        String producerName1 = "test1";
        Mockito.when((Object)publishContext.getHighestSequenceId()).thenReturn((Object)2L);
        Mockito.when((Object)publishContext.getSequenceId()).thenReturn((Object)1L);
        Mockito.when((Object)publishContext.getProducerName()).thenReturn((Object)producerName1);
        messageDeduplication.isDuplicate(publishContext, null);
        String producerName2 = "test2";
        Mockito.when((Object)publishContext.getProducerName()).thenReturn((Object)producerName2);
        messageDeduplication.isDuplicate(publishContext, null);
        String producerName3 = "test3";
        Mockito.when((Object)publishContext.getProducerName()).thenReturn((Object)producerName3);
        messageDeduplication.isDuplicate(publishContext, null);
        messageDeduplication.producerRemoved(producerName1);
        messageDeduplication.producerRemoved(producerName2);
        messageDeduplication.producerRemoved(producerName3);
        messageDeduplication.purgeInactiveProducers();
        Assert.assertEquals((int)inactiveProducers.size(), (int)3);
        ((MessageDeduplication)Mockito.doReturn((Object)false).when((Object)messageDeduplication)).isEnabled();
        inactiveProducers.put(producerName2, System.currentTimeMillis() - 80000L);
        inactiveProducers.put(producerName3, System.currentTimeMillis() - 80000L);
        messageDeduplication.purgeInactiveProducers();
        Assert.assertFalse((boolean)inactiveProducers.containsKey(producerName2));
        Assert.assertFalse((boolean)inactiveProducers.containsKey(producerName3));
        ((MessageDeduplication)Mockito.doReturn((Object)true).when((Object)messageDeduplication)).isEnabled();
        inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000L);
        inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000L);
        messageDeduplication.purgeInactiveProducers();
        Assert.assertFalse((boolean)inactiveProducers.containsKey(producerName2));
        Assert.assertFalse((boolean)inactiveProducers.containsKey(producerName3));
        field = MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
        field.setAccessible(true);
        ConcurrentOpenHashMap highestSequencedPushed = (ConcurrentOpenHashMap)field.get(messageDeduplication);
        Assert.assertEquals((long)((Long)highestSequencedPushed.get((Object)producerName1)), (long)2L);
        Assert.assertFalse((boolean)highestSequencedPushed.containsKey((Object)producerName2));
        Assert.assertFalse((boolean)highestSequencedPushed.containsKey((Object)producerName3));
    }

    @Test
    public void testIsDuplicateWithFailure() {
        PulsarService pulsarService = (PulsarService)Mockito.mock(PulsarService.class);
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setBrokerDeduplicationEntriesInterval(10);
        serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(10);
        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
        ((PulsarService)Mockito.doReturn((Object)serviceConfiguration).when((Object)pulsarService)).getConfiguration();
        ManagedLedger managedLedger = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        MessageDeduplication messageDeduplication = (MessageDeduplication)Mockito.spy((Object)new MessageDeduplication(pulsarService, (PersistentTopic)Mockito.mock(PersistentTopic.class), managedLedger));
        ((MessageDeduplication)Mockito.doReturn((Object)true).when((Object)messageDeduplication)).isEnabled();
        EventLoopGroup eventLoopGroup = (EventLoopGroup)Mockito.mock(EventLoopGroup.class);
        ((EventLoopGroup)Mockito.doAnswer(invocationOnMock -> {
            Object[] args = invocationOnMock.getArguments();
            Runnable test = (Runnable)args[0];
            test.run();
            return null;
        }).when((Object)eventLoopGroup)).submit((Runnable)ArgumentMatchers.any(Runnable.class));
        BrokerService brokerService = (BrokerService)Mockito.mock(BrokerService.class);
        ((BrokerService)Mockito.doReturn((Object)eventLoopGroup).when((Object)brokerService)).executor();
        ((BrokerService)Mockito.doReturn((Object)pulsarService).when((Object)brokerService)).pulsar();
        PersistentTopic persistentTopic = BrokerTestUtil.spyWithClassAndConstructorArgs(PersistentTopic.class, "topic-1", brokerService, managedLedger, messageDeduplication);
        String producerName1 = "producer1";
        ByteBuf byteBuf1 = this.getMessage(producerName1, 0L);
        Topic.PublishContext publishContext1 = this.getPublishContext(producerName1, 0L);
        String producerName2 = "producer2";
        ByteBuf byteBuf2 = this.getMessage(producerName2, 1L);
        Topic.PublishContext publishContext2 = this.getPublishContext(producerName2, 1L);
        persistentTopic.publishMessage(byteBuf1, publishContext1);
        persistentTopic.addComplete((Position)new PositionImpl(0L, 1L), null, (Object)publishContext1);
        ((ManagedLedger)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)1))).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        Long lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)0L);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPersisted.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)0L);
        persistentTopic.publishMessage(byteBuf2, publishContext2);
        persistentTopic.addComplete((Position)new PositionImpl(0L, 2L), null, (Object)publishContext2);
        ((ManagedLedger)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)2))).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName2);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)1L);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPersisted.get((Object)producerName2);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)1L);
        byteBuf1 = this.getMessage(producerName1, 1L);
        publishContext1 = this.getPublishContext(producerName1, 1L);
        persistentTopic.publishMessage(byteBuf1, publishContext1);
        persistentTopic.addComplete((Position)new PositionImpl(0L, 3L), null, (Object)publishContext1);
        ((ManagedLedger)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)3))).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)1L);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPersisted.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)1L);
        byteBuf1 = this.getMessage(producerName1, 5L);
        publishContext1 = this.getPublishContext(producerName1, 5L);
        persistentTopic.publishMessage(byteBuf1, publishContext1);
        persistentTopic.addComplete((Position)new PositionImpl(0L, 4L), null, (Object)publishContext1);
        ((ManagedLedger)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)4))).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)5L);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPersisted.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)5L);
        byteBuf1 = this.getMessage(producerName1, 0L);
        publishContext1 = this.getPublishContext(producerName1, 0L);
        persistentTopic.publishMessage(byteBuf1, publishContext1);
        ((ManagedLedger)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)4))).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)5L);
        ((Topic.PublishContext)Mockito.verify((Object)publishContext1, (VerificationMode)Mockito.times((int)1))).completed((Exception)ArgumentMatchers.eq(null), ArgumentMatchers.eq((long)-1L), ArgumentMatchers.eq((long)-1L));
        byteBuf1 = this.getMessage(producerName1, 6L);
        publishContext1 = this.getPublishContext(producerName1, 6L);
        persistentTopic.publishMessage(byteBuf1, publishContext1);
        ((ManagedLedger)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)5))).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)6L);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPersisted.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)5L);
        byteBuf1 = this.getMessage(producerName1, 6L);
        publishContext1 = this.getPublishContext(producerName1, 6L);
        persistentTopic.publishMessage(byteBuf1, publishContext1);
        ((ManagedLedger)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)5))).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        ((Topic.PublishContext)Mockito.verify((Object)publishContext1, (VerificationMode)Mockito.times((int)1))).completed((Exception)ArgumentMatchers.any(MessageDeduplication.MessageDupUnknownException.class), ArgumentMatchers.eq((long)-1L), ArgumentMatchers.eq((long)-1L));
        persistentTopic.addComplete((Position)new PositionImpl(0L, 5L), null, (Object)publishContext1);
        byteBuf1 = this.getMessage(producerName1, 7L);
        publishContext1 = this.getPublishContext(producerName1, 7L);
        persistentTopic.publishMessage(byteBuf1, publishContext1);
        ((ManagedLedger)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)6))).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        persistentTopic.addFailed(new ManagedLedgerException("test"), (Object)publishContext1);
        Assert.assertEquals((long)messageDeduplication.highestSequencedPushed.size(), (long)2L);
        Assert.assertEquals((long)messageDeduplication.highestSequencedPersisted.size(), (long)2L);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)6L);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPersisted.get((Object)producerName1);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)6L);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName2);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)1L);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPersisted.get((Object)producerName2);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)1L);
        ((MessageDeduplication)Mockito.verify((Object)messageDeduplication, (VerificationMode)Mockito.times((int)1))).resetHighestSequenceIdPushed();
        byteBuf1 = this.getMessage(producerName1, 6L);
        publishContext1 = this.getPublishContext(producerName1, 6L);
        persistentTopic.publishMessage(byteBuf1, publishContext1);
        ((ManagedLedger)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)6))).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        ((Topic.PublishContext)Mockito.verify((Object)publishContext1, (VerificationMode)Mockito.times((int)1))).completed((Exception)ArgumentMatchers.eq(null), ArgumentMatchers.eq((long)-1L), ArgumentMatchers.eq((long)-1L));
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)6L);
        byteBuf1 = this.getMessage(producerName1, 8L);
        publishContext1 = this.getPublishContext(producerName1, 8L);
        persistentTopic.publishMessage(byteBuf1, publishContext1);
        ((ManagedLedger)Mockito.verify((Object)managedLedger, (VerificationMode)Mockito.times((int)7))).asyncAddEntry((ByteBuf)ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback)ArgumentMatchers.any(), ArgumentMatchers.any());
        persistentTopic.addComplete((Position)new PositionImpl(0L, 5L), null, (Object)publishContext1);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPushed.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)8L);
        lastSequenceIdPushed = (Long)messageDeduplication.highestSequencedPersisted.get((Object)producerName1);
        Assert.assertNotNull((Object)lastSequenceIdPushed);
        Assert.assertEquals((long)lastSequenceIdPushed, (long)8L);
    }

    public ByteBuf getMessage(String producerName, long seqId) {
        MessageMetadata messageMetadata = new MessageMetadata().setProducerName(producerName).setSequenceId(seqId).setPublishTime(System.currentTimeMillis());
        return Commands.serializeMetadataAndPayload((Commands.ChecksumType)Commands.ChecksumType.Crc32c, (MessageMetadata)messageMetadata, (ByteBuf)Unpooled.copiedBuffer((byte[])new byte[0]));
    }

    public Topic.PublishContext getPublishContext(final String producerName, final long seqId) {
        return (Topic.PublishContext)Mockito.spy((Object)new Topic.PublishContext(){

            public String getProducerName() {
                return producerName;
            }

            public long getSequenceId() {
                return seqId;
            }

            public void completed(Exception e, long ledgerId, long entryId) {
            }
        });
    }

    public Topic.PublishContext getPublishContext(final String producerName, final long seqId, final long lastSequenceId) {
        return (Topic.PublishContext)Mockito.spy((Object)new Topic.PublishContext(){

            public String getProducerName() {
                return producerName;
            }

            public long getSequenceId() {
                return seqId;
            }

            public long getHighestSequenceId() {
                return lastSequenceId;
            }

            public void completed(Exception e, long ledgerId, long entryId) {
            }
        });
    }
}

