/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.nonpersistent;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.testng.PowerMockObjectFactory;
import org.testng.Assert;
import org.testng.IObjectFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link NonPersistentStickyKeyDispatcherMultipleConsumers#sendMessages(List)}.
 *
 * <p>The dispatcher under test is built on top of mocked {@link NonPersistentTopic},
 * {@link BrokerService}, {@link PulsarService} and {@link ServiceConfiguration} instances, and the
 * static {@code DispatchRateLimiter.isDispatchRateNeeded(..)} check is stubbed (via PowerMock) to
 * return {@code false}.
 */
@PrepareForTest({DispatchRateLimiter.class})
@PowerMockIgnore({"org.apache.logging.log4j.*"})
public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
    private PulsarService pulsarMock;
    private BrokerService brokerMock;
    private NonPersistentTopic topicMock;
    private NonPersistentSubscription subscriptionMock;
    private ServiceConfiguration configMock;
    private NonPersistentStickyKeyDispatcherMultipleConsumers nonpersistentDispatcher;
    final String topicName = "non-persistent://public/default/testTopic";

    /**
     * Supplies the PowerMock object factory so TestNG instantiates this class in a way that lets
     * {@code @PrepareForTest} take effect.
     *
     * @return a new {@link PowerMockObjectFactory}
     */
    @ObjectFactory
    public IObjectFactory getObjectFactory() {
        return new PowerMockObjectFactory();
    }

    /**
     * Builds a fresh mock hierarchy (config -> pulsar -> broker -> topic) and a new dispatcher
     * before every test method.
     *
     * @throws Exception if any of the stubbing or the dispatcher construction fails
     */
    @BeforeMethod
    public void setup() throws Exception {
        configMock = Mockito.mock(ServiceConfiguration.class);
        Mockito.doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled();
        Mockito.doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
        Mockito.doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
        Mockito.doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();

        pulsarMock = Mockito.mock(PulsarService.class);
        Mockito.doReturn(configMock).when(pulsarMock).getConfiguration();

        brokerMock = Mockito.mock(BrokerService.class);
        Mockito.doReturn(pulsarMock).when(brokerMock).pulsar();

        topicMock = Mockito.mock(NonPersistentTopic.class);
        Mockito.doReturn(brokerMock).when(topicMock).getBrokerService();
        Mockito.doReturn(topicName).when(topicMock).getName();

        subscriptionMock = Mockito.mock(NonPersistentSubscription.class);

        // Stub the static rate-limiter check so it always answers "not needed".
        PowerMockito.mockStatic(DispatchRateLimiter.class);
        PowerMockito.when(DispatchRateLimiter.isDispatchRateNeeded(
                ArgumentMatchers.any(BrokerService.class),
                ArgumentMatchers.any(Optional.class),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.any(DispatchRateLimiter.Type.class)))
                .thenReturn(false);

        nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
                topicMock, subscriptionMock, new HashRangeAutoSplitStickyKeyConsumerSelector());
    }

    /**
     * A writable consumer with available permits must receive the dispatched entries in exactly
     * one {@code sendMessages} call, with {@code null} batch-index acks.
     *
     * @throws BrokerServiceException if the consumer cannot be added to the dispatcher
     */
    @Test(timeOut = 10000L)
    public void testSendMessage() throws BrokerServiceException {
        Consumer consumerMock = Mockito.mock(Consumer.class);
        Mockito.when(consumerMock.getAvailablePermits()).thenReturn(1000);
        Mockito.when(consumerMock.isWritable()).thenReturn(true);
        nonpersistentDispatcher.addConsumer(consumerMock);

        stubSendMessages(consumerMock);
        dispatch(createTestEntries());

        Mockito.verify(consumerMock, Mockito.times(1)).sendMessages(
                ArgumentMatchers.any(List.class),
                ArgumentMatchers.any(EntryBatchSizes.class),
                (EntryBatchIndexesAcks) ArgumentMatchers.eq(null),
                ArgumentMatchers.anyInt(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.any(RedeliveryTracker.class));
    }

    /**
     * A consumer whose permits/writability are left unstubbed (so the mock reports Mockito's
     * defaults: 0 permits, not writable) must not receive anything.
     *
     * @throws BrokerServiceException if the consumer cannot be added to the dispatcher
     */
    @Test(timeOut = 10000L)
    public void testSendMessageRespectFlowControl() throws BrokerServiceException {
        Consumer consumerMock = Mockito.mock(Consumer.class);
        nonpersistentDispatcher.addConsumer(consumerMock);

        stubSendMessages(consumerMock);
        dispatch(createTestEntries());

        // Match any batch-index-acks argument: no dispatch at all is acceptable here, regardless
        // of the arguments it would have carried.
        Mockito.verify(consumerMock, Mockito.never()).sendMessages(
                ArgumentMatchers.any(List.class),
                ArgumentMatchers.any(EntryBatchSizes.class),
                (EntryBatchIndexesAcks) ArgumentMatchers.any(),
                ArgumentMatchers.anyInt(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.any(RedeliveryTracker.class));
    }

    /**
     * Creates the two-entry fixture shared by the tests: ledger 1, entry ids 1 and 2, payloads
     * {@code "message1"} and {@code "message2"}.
     */
    private List<Entry> createTestEntries() {
        List<Entry> entries = new ArrayList<>();
        entries.add(EntryImpl.create(1L, 1L, createMessage("message1", 1)));
        entries.add(EntryImpl.create(1L, 2L, createMessage("message2", 2)));
        return entries;
    }

    /**
     * Stubs {@code consumerMock.sendMessages(..)} so that every invocation validates the entries it
     * was handed and answers with a mocked {@link ChannelPromise}.
     */
    private static void stubSendMessages(Consumer consumerMock) {
        Mockito.doAnswer(invocationOnMock -> {
            @SuppressWarnings("unchecked") // the first argument of sendMessages is the entry list
            List<Entry> receivedEntries = invocationOnMock.getArgument(0, List.class);
            assertEntriesInOrder(receivedEntries);
            return Mockito.mock(ChannelPromise.class);
        }).when(consumerMock).sendMessages(
                ArgumentMatchers.any(List.class),
                ArgumentMatchers.any(EntryBatchSizes.class),
                (EntryBatchIndexesAcks) ArgumentMatchers.any(),
                ArgumentMatchers.anyInt(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.any(RedeliveryTracker.class));
    }

    /**
     * Asserts that the i-th received entry (1-based) belongs to ledger 1, has entry id {@code i}
     * and carries the payload {@code "message" + i}.
     */
    private static void assertEntriesInOrder(List<Entry> receivedEntries) {
        for (int index = 1; index <= receivedEntries.size(); ++index) {
            Entry entry = receivedEntries.get(index - 1);
            Assert.assertEquals(entry.getLedgerId(), 1L);
            Assert.assertEquals(entry.getEntryId(), (long) index);
            ByteBuf byteBuf = entry.getDataBuffer();
            // The parsed metadata itself is not inspected; the call is kept because the payload
            // assertion below expects the buffer to be positioned past the metadata.
            Commands.parseMessageMetadata(byteBuf);
            Assert.assertEquals(byteBuf.toString(StandardCharsets.UTF_8), "message" + index);
        }
    }

    /** Hands the entries to the dispatcher, turning any exception into a test failure. */
    private void dispatch(List<Entry> entries) {
        try {
            nonpersistentDispatcher.sendMessages(entries);
        } catch (Exception e) {
            Assert.fail("Failed to sendMessages.", e);
        }
    }

    /**
     * Serializes {@code message} with the default partition key {@code "testKey"}.
     *
     * @param message payload text, encoded as UTF-8
     * @param sequenceId sequence id stored in the message metadata
     * @return buffer holding the serialized metadata followed by the payload
     */
    private ByteBuf createMessage(String message, int sequenceId) {
        return createMessage(message, sequenceId, "testKey");
    }

    /**
     * Serializes {@code message} together with freshly built metadata (producer
     * {@code "testProducer"}, the given sequence id and partition key, current publish time)
     * using a CRC32C checksum.
     *
     * @param message payload text, encoded as UTF-8
     * @param sequenceId sequence id stored in the message metadata
     * @param key partition key stored in the message metadata (not base64 encoded)
     * @return buffer holding the serialized metadata followed by the payload
     */
    private ByteBuf createMessage(String message, int sequenceId, String key) {
        MessageMetadata messageMetadata = new MessageMetadata()
                .setSequenceId(sequenceId)
                .setProducerName("testProducer")
                .setPartitionKey(key)
                .setPartitionKeyB64Encoded(false)
                .setPublishTime(System.currentTimeMillis());
        ByteBuf payload = Unpooled.copiedBuffer(message.getBytes(StandardCharsets.UTF_8));
        return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, payload);
    }
}

