/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.gson.Gson;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BkEnsemblesTestBase;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups={"quarantine"})
public class RackAwareTest
extends BkEnsemblesTestBase {
    private static final int NUM_BOOKIES = 6;
    private final List<BookieServer> bookies = new ArrayList<BookieServer>();
    private static final Logger log = LoggerFactory.getLogger(RackAwareTest.class);

    public RackAwareTest() {
        super(0);
    }

    @Override
    protected void configurePulsar(ServiceConfiguration config) throws Exception {
        for (int i = 0; i < 6; ++i) {
            File bkDataDir = Files.createTempDirectory("bk" + Integer.toString(i) + "test", new FileAttribute[0]).toFile();
            ServerConfiguration conf = new ServerConfiguration();
            conf.setBookiePort(0);
            conf.setZkServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
            conf.setJournalDirName(bkDataDir.getPath());
            conf.setLedgerDirNames(new String[]{bkDataDir.getPath()});
            conf.setAllowLoopback(true);
            String addr = String.format("10.0.0.%d", i + 1);
            conf.setAdvertisedAddress(addr);
            BookieServer bs = new BookieServer(conf, (StatsLogger)NullStatsLogger.INSTANCE, null);
            bs.start();
            this.bookies.add(bs);
        }
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.cleanup();
        for (BookieServer bs : this.bookies) {
            bs.shutdown();
        }
        this.bookies.clear();
    }

    @Test
    public void testPlacement() throws Exception {
        String group = "default";
        for (int i = 0; i < 6; ++i) {
            String bookie = this.bookies.get(i).getLocalAddress().toString();
            int rackId = i == 0 ? 1 : 2;
            BookieInfo bi = BookieInfo.builder().rack("rack-" + rackId).hostname("bookie-" + (i + 1)).build();
            log.info("setting rack for bookie at {} -- {}", (Object)bookie, (Object)bi);
            this.admin.bookies().updateBookieRackInfo(bookie, "default", bi);
        }
        Awaitility.await().untilAsserted(() -> {
            byte[] data = this.bkEnsemble.getZkClient().getData("/bookies", false, null);
            TreeMap rackInfoMap = (TreeMap)new Gson().fromJson(new String(data), TreeMap.class);
            Assert.assertTrue((((Map)rackInfoMap.get("default")).size() == 6 ? 1 : 0) != 0);
            Set racks = rackInfoMap.values().stream().map(Map::values).flatMap(bookieId -> bookieId.stream().map(rackInfo -> (String)rackInfo.get("rack"))).collect(Collectors.toSet());
            Assert.assertTrue((boolean)racks.containsAll(Lists.newArrayList((Object[])new String[]{"rack-1", "rack-2"})));
        });
        BookKeeper bkc = this.pulsar.getBookKeeperClient();
        BookieId firstBookie = this.bookies.get(0).getBookieId();
        for (int i = 0; i < 100; ++i) {
            LedgerHandle lh = bkc.createLedger(2, 2, BookKeeper.DigestType.DUMMY, new byte[0]);
            log.info("Ledger: {} -- Ensemble: {}", (Object)i, (Object)lh.getLedgerMetadata().getEnsembleAt(0L));
            Assert.assertTrue((boolean)lh.getLedgerMetadata().getEnsembleAt(0L).contains(firstBookie), (String)"first bookie in rack 0 not included in ensemble");
            lh.close();
        }
    }
}

