/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.HashMap;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTask;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class BundleSplitterTaskTest {
    private static final Logger log = LoggerFactory.getLogger(BundleSplitterTaskTest.class);
    private LocalBookkeeperEnsemble bkEnsemble;
    private PulsarService pulsar;

    @BeforeMethod
    void setup() throws Exception {
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();
        ServiceConfiguration config = new ServiceConfiguration();
        config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        config.setClusterName("use");
        config.setWebServicePort(Optional.of(0));
        config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        config.setAdvertisedAddress("localhost");
        config.setBrokerShutdownTimeoutMs(0L);
        config.setBrokerServicePort(Optional.of(0));
        config.setBrokerServicePortTls(Optional.of(0));
        config.setWebServicePortTls(Optional.of(0));
        this.pulsar = new PulsarService(config);
        this.pulsar.start();
    }

    @Test
    public void testSplitTaskWhenTopicJustOne() {
        BundleSplitterTask bundleSplitterTask = new BundleSplitterTask();
        LoadData loadData = new LoadData();
        LocalBrokerData brokerData = new LocalBrokerData();
        HashMap<String, NamespaceBundleStats> lastStats = new HashMap<String, NamespaceBundleStats>();
        NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
        namespaceBundleStats.topics = 1L;
        lastStats.put("ten/ns/0x00000000_0x80000000", namespaceBundleStats);
        brokerData.setLastStats(lastStats);
        loadData.getBrokerData().put("broker", new BrokerData(brokerData));
        BundleData bundleData = new BundleData();
        TimeAverageMessageData averageMessageData = new TimeAverageMessageData();
        averageMessageData.setMsgRateIn((double)this.pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate());
        averageMessageData.setMsgRateOut(1.0);
        bundleData.setLongTermData(averageMessageData);
        loadData.getBundleData().put("ten/ns/0x00000000_0x80000000", bundleData);
        Set bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, this.pulsar);
        Assert.assertEquals((int)bundlesToSplit.size(), (int)0);
    }

    @Test
    public void testLoadBalancerNamespaceMaximumBundles() throws Exception {
        this.pulsar.getConfiguration().setLoadBalancerNamespaceMaximumBundles(3);
        BundleSplitterTask bundleSplitterTask = new BundleSplitterTask();
        LoadData loadData = new LoadData();
        LocalBrokerData brokerData = new LocalBrokerData();
        HashMap<String, NamespaceBundleStats> lastStats = new HashMap<String, NamespaceBundleStats>();
        NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
        namespaceBundleStats.topics = 5L;
        lastStats.put("ten/ns/0x00000000_0x20000000", namespaceBundleStats);
        NamespaceBundleStats namespaceBundleStats2 = new NamespaceBundleStats();
        namespaceBundleStats2.topics = 5L;
        lastStats.put("ten/ns/0x20000000_0x40000000", namespaceBundleStats2);
        NamespaceBundleStats namespaceBundleStats3 = new NamespaceBundleStats();
        namespaceBundleStats3.topics = 5L;
        lastStats.put("ten/ns/0x40000000_0x60000000", namespaceBundleStats3);
        brokerData.setLastStats(lastStats);
        loadData.getBrokerData().put("broker", new BrokerData(brokerData));
        BundleData bundleData1 = new BundleData();
        TimeAverageMessageData averageMessageData1 = new TimeAverageMessageData();
        averageMessageData1.setMsgRateIn((double)(this.pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2));
        averageMessageData1.setMsgRateOut(1.0);
        bundleData1.setLongTermData(averageMessageData1);
        loadData.getBundleData().put("ten/ns/0x00000000_0x20000000", bundleData1);
        BundleData bundleData2 = new BundleData();
        TimeAverageMessageData averageMessageData2 = new TimeAverageMessageData();
        averageMessageData2.setMsgRateIn((double)(this.pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2));
        averageMessageData2.setMsgRateOut(1.0);
        bundleData2.setLongTermData(averageMessageData2);
        loadData.getBundleData().put("ten/ns/0x20000000_0x40000000", bundleData2);
        BundleData bundleData3 = new BundleData();
        TimeAverageMessageData averageMessageData3 = new TimeAverageMessageData();
        averageMessageData3.setMsgRateIn((double)(this.pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2));
        averageMessageData3.setMsgRateOut(1.0);
        bundleData3.setLongTermData(averageMessageData3);
        loadData.getBundleData().put("ten/ns/0x40000000_0x60000000", bundleData3);
        int currentBundleCount = this.pulsar.getNamespaceService().getBundleCount(NamespaceName.get((String)"ten/ns"));
        Set bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, this.pulsar);
        Assert.assertEquals((int)(bundlesToSplit.size() + currentBundleCount), (int)this.pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles());
    }

    @AfterMethod(alwaysRun=true)
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        this.pulsar.close();
        this.bkEnsemble.stop();
    }
}

