/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.metadata.TestZKServer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class MultiBrokerLeaderElectionTest
extends MultiBrokerBaseTest {
    private static final Logger log = LoggerFactory.getLogger(MultiBrokerLeaderElectionTest.class);
    TestZKServer testZKServer;

    @Override
    protected int numberOfAdditionalBrokers() {
        return 9;
    }

    @Override
    protected void doInitConf() throws Exception {
        super.doInitConf();
        this.testZKServer = new TestZKServer();
    }

    @Override
    protected void onCleanup() {
        super.onCleanup();
        if (this.testZKServer != null) {
            try {
                this.testZKServer.close();
            }
            catch (Exception e) {
                log.error("Error in stopping ZK server", (Throwable)e);
            }
        }
    }

    @Override
    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
        return MetadataStoreExtended.create((String)this.testZKServer.getConnectionString(), (MetadataStoreConfig)MetadataStoreConfig.builder().build());
    }

    @Override
    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
        return MetadataStoreExtended.create((String)this.testZKServer.getConnectionString(), (MetadataStoreConfig)MetadataStoreConfig.builder().build());
    }

    @Test
    public void shouldElectOneLeader() {
        int leaders = 0;
        for (PulsarService broker : this.getAllBrokers()) {
            if (!broker.getLeaderElectionService().isLeader()) continue;
            ++leaders;
        }
        Assert.assertEquals((int)leaders, (int)1);
    }

    @Test
    public void shouldAllBrokersKnowTheLeader() {
        Awaitility.await().untilAsserted(() -> {
            for (PulsarService broker : this.getAllBrokers()) {
                Optional currentLeader = broker.getLeaderElectionService().getCurrentLeader();
                Assert.assertTrue((boolean)currentLeader.isPresent(), (String)("Leader wasn't known on broker " + broker.getBrokerServiceUrl()));
            }
        });
    }

    @Test
    public void shouldAllBrokersBeAbleToGetTheLeader() {
        Awaitility.await().untilAsserted(() -> {
            LeaderBroker leader = null;
            for (PulsarService broker : this.getAllBrokers()) {
                Optional currentLeader = (Optional)broker.getLeaderElectionService().readCurrentLeader().get(1L, TimeUnit.SECONDS);
                Assert.assertTrue((boolean)currentLeader.isPresent(), (String)("Leader wasn't known on broker " + broker.getBrokerServiceUrl()));
                if (leader != null) {
                    Assert.assertEquals(currentLeader.get(), (Object)leader, (String)("Different leader on broker " + broker.getBrokerServiceUrl()));
                    continue;
                }
                leader = (LeaderBroker)currentLeader.get();
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldProvideConsistentAnswerToTopicLookups() throws PulsarAdminException, ExecutionException, InterruptedException {
        String topicNameBase = "persistent://public/default/lookuptest" + UUID.randomUUID() + "-";
        List topicNames = IntStream.range(0, 500).mapToObj(i -> topicNameBase + i).collect(Collectors.toList());
        List<PulsarAdmin> allAdmins = this.getAllAdmins();
        ExecutorService executorService = Executors.newFixedThreadPool(allAdmins.size());
        try {
            ArrayList<Future<List>> resultFutures = new ArrayList<Future<List>>();
            String leaderBrokerUrl = this.admin.brokers().getLeaderBroker().getServiceUrl();
            log.info("LEADER is {}", (Object)leaderBrokerUrl);
            Phaser phaser = new Phaser(1);
            for (PulsarAdmin brokerAdmin : allAdmins) {
                if (leaderBrokerUrl.equals(brokerAdmin.getServiceUrl())) continue;
                phaser.register();
                log.info("Doing lookup to broker {}", (Object)brokerAdmin.getServiceUrl());
                resultFutures.add(executorService.submit(() -> {
                    phaser.arriveAndAwaitAdvance();
                    return topicNames.stream().map(topicName -> {
                        try {
                            return brokerAdmin.lookups().lookupTopic(topicName);
                        }
                        catch (PulsarAdminException e) {
                            log.error("Error looking up topic {} in {}", topicName, (Object)brokerAdmin.getServiceUrl());
                            throw new RuntimeException(e);
                        }
                    }).collect(Collectors.toList());
                }));
            }
            phaser.arriveAndAwaitAdvance();
            List firstResult = null;
            for (Future future : resultFutures) {
                List result = (List)future.get();
                if (firstResult == null) {
                    firstResult = result;
                    continue;
                }
                Assert.assertEquals((Collection)result, (Collection)firstResult, (String)"The lookup results weren't consistent.");
            }
        }
        finally {
            if (Collections.singletonList(executorService).get(0) != null) {
                executorService.shutdown();
            }
        }
    }
}

