/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.PulsarCommandSenderImpl;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Verifies when {@link Consumer#messageAcked(CommandAck)} forwards a cumulative
 * acknowledgement to the underlying {@link PersistentSubscription}.
 *
 * <p>The subscription is a Mockito spy whose {@code acknowledgeMessage} is stubbed to do
 * nothing, so each test only counts how often the consumer invoked it:
 * <ul>
 *   <li>{@code Shared} / {@code Key_Shared} consumers: never;</li>
 *   <li>{@code Exclusive} / {@code Failover} consumers with one message id: exactly once;</li>
 *   <li>a cumulative ack carrying more than one message id: never.</li>
 * </ul>
 */
@Test(groups={"broker"})
public class MessageCumulativeAckTest {
    /** Consumer id used both for the {@link Consumer} under test and for the ack commands. */
    private final int consumerId = 1;
    private BrokerService brokerService;
    private ServerCnx serverCnx;
    private MetadataStore store;
    protected PulsarService pulsar;
    private OrderedExecutor executor;
    private EventLoopGroup eventLoopGroup;
    private PersistentSubscription sub;

    /**
     * Builds a spied {@link PulsarService}, {@link ServerCnx} and {@link BrokerService} backed by
     * mocks and an in-memory metadata store, then creates the spied subscription used by the tests.
     *
     * @throws Exception if any of the fixtures cannot be created
     */
    @BeforeMethod
    public void setup() throws Exception {
        this.executor = OrderedExecutor.newBuilder()
                .numThreads(1)
                .name("persistent-dispatcher-cumulative-ack-test")
                .build();

        ServiceConfiguration svcConfig = Mockito.spy(ServiceConfiguration.class);
        svcConfig.setBrokerShutdownTimeoutMs(0L);
        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Double.valueOf(1.0));
        svcConfig.setClusterName("pulsar-cluster");

        // Spy on a real PulsarService and replace its collaborators with mocks / in-memory stores.
        this.pulsar = BrokerTestUtil.spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
        Mockito.doReturn(svcConfig).when(this.pulsar).getConfiguration();
        ManagedLedgerFactory mlFactoryMock = Mockito.mock(ManagedLedgerFactory.class);
        Mockito.doReturn(mlFactoryMock).when(this.pulsar).getManagedLedgerFactory();
        Mockito.doReturn(TransactionTestBase.createMockBookKeeper(this.executor))
                .when(this.pulsar).getBookKeeperClient();

        this.store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
        Mockito.doReturn(this.store).when(this.pulsar).getLocalMetadataStore();
        Mockito.doReturn(this.store).when(this.pulsar).getConfigurationMetadataStore();
        PulsarResources pulsarResources = new PulsarResources(this.store, this.store);
        Mockito.doReturn(pulsarResources).when(this.pulsar).getPulsarResources();

        // The connection is a spy that always reports itself as active and writable.
        this.serverCnx = BrokerTestUtil.spyWithClassAndConstructorArgs(ServerCnx.class, this.pulsar);
        Mockito.doReturn(true).when(this.serverCnx).isActive();
        Mockito.doReturn(true).when(this.serverCnx).isWritable();
        Mockito.doReturn(new InetSocketAddress("localhost", 1234)).when(this.serverCnx).clientAddress();
        Mockito.when(this.serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
        Mockito.when(this.serverCnx.ctx()).thenReturn(Mockito.mock(ChannelHandlerContext.class));
        Mockito.doReturn(new PulsarCommandSenderImpl(null, this.serverCnx)).when(this.serverCnx).getCommandSender();

        this.eventLoopGroup = new NioEventLoopGroup();
        this.brokerService = BrokerTestUtil.spyWithClassAndConstructorArgs(
                BrokerService.class, this.pulsar, this.eventLoopGroup);
        Mockito.doReturn(this.brokerService).when(this.pulsar).getBrokerService();

        String topicName = TopicName.get("MessageCumulativeAckTest").toString();
        PersistentTopic persistentTopic =
                new PersistentTopic(topicName, Mockito.mock(ManagedLedger.class), this.brokerService);
        this.sub = Mockito.spy(
                new PersistentSubscription(persistentTopic, "sub-1", Mockito.mock(ManagedCursorImpl.class), false));
        // Stub out the real acknowledgement so the tests only observe whether it was invoked.
        Mockito.doNothing().when(this.sub)
                .acknowledgeMessage(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * Releases everything created by {@link #setup()}.
     *
     * <p>Because of {@code alwaysRun=true} this also runs after a failed {@code setup()}, in which
     * case any of the fields may still be {@code null}; each resource is therefore guarded so that
     * teardown does not raise a {@link NullPointerException} that would hide the original failure.
     *
     * @throws Exception if closing one of the resources fails
     */
    @AfterMethod(alwaysRun=true)
    public void shutdown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.close();
            this.brokerService = null;
        }
        if (this.pulsar != null) {
            this.pulsar.close();
            this.pulsar = null;
        }
        if (this.executor != null) {
            this.executor.shutdown();
            this.executor = null;
        }
        if (this.eventLoopGroup != null) {
            // Block until the event loop threads have actually terminated.
            this.eventLoopGroup.shutdownGracefully().get();
            this.eventLoopGroup = null;
        }
        if (this.store != null) {
            this.store.close();
            this.store = null;
        }
        this.sub = null;
    }

    /** Subscription types for which the tests expect a cumulative ack not to reach the subscription. */
    @DataProvider(name="individualAckModes")
    public static Object[][] individualAckModes() {
        return new Object[][]{{CommandSubscribe.SubType.Shared}, {CommandSubscribe.SubType.Key_Shared}};
    }

    /** Subscription types for which the tests expect a cumulative ack to reach the subscription. */
    @DataProvider(name="notIndividualAckModes")
    public static Object[][] notIndividualAckModes() {
        return new Object[][]{{CommandSubscribe.SubType.Exclusive}, {CommandSubscribe.SubType.Failover}};
    }

    /**
     * A cumulative ack sent by a {@code Shared} or {@code Key_Shared} consumer must not be
     * forwarded to the subscription.
     */
    @Test(timeOut=5000L, dataProvider="individualAckModes")
    public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
        Consumer consumer = newConsumer(subType);

        consumer.messageAcked(newCumulativeAck(1L)).get();

        Mockito.verify(this.sub, Mockito.never())
                .acknowledgeMessage(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * A cumulative ack with a single message id sent by an {@code Exclusive} or {@code Failover}
     * consumer must be forwarded to the subscription exactly once.
     */
    @Test(timeOut=5000L, dataProvider="notIndividualAckModes")
    public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
        Consumer consumer = newConsumer(subType);

        consumer.messageAcked(newCumulativeAck(1L)).get();

        Mockito.verify(this.sub, Mockito.times(1))
                .acknowledgeMessage(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * A cumulative ack carrying two message ids must not be forwarded to the subscription,
     * even for a {@code Failover} consumer.
     */
    @Test(timeOut=5000L)
    public void testAckWithMoreThanNoneMessageIds() throws Exception {
        Consumer consumer = newConsumer(CommandSubscribe.SubType.Failover);

        consumer.messageAcked(newCumulativeAck(1L, 2L)).get();

        Mockito.verify(this.sub, Mockito.never())
                .acknowledgeMessage(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /** Creates a consumer of the given subscription type attached to the spied subscription and connection. */
    private Consumer newConsumer(CommandSubscribe.SubType subType) throws Exception {
        return new Consumer(this.sub, subType, "topic-1", this.consumerId, 0, "Cons1", 50000,
                this.serverCnx, "myrole-1", Collections.emptyMap(), false,
                CommandSubscribe.InitialPosition.Latest, null, MessageId.latest);
    }

    /** Builds a cumulative ack command with one message id (entry 0) per supplied ledger id, in order. */
    private CommandAck newCumulativeAck(long... ledgerIds) {
        CommandAck commandAck = new CommandAck();
        commandAck.setAckType(CommandAck.AckType.Cumulative);
        commandAck.setConsumerId(this.consumerId);
        for (long ledgerId : ledgerIds) {
            commandAck.addMessageId().setEntryId(0L).setLedgerId(ledgerId);
        }
        return commandAck;
    }
}

