package co.cask.cdap.watchdog.election;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.zookeeper.LeaderElection;
import org.apache.twill.zookeeper.ZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/watchdog/election/MultiLeaderElection.class */
public class MultiLeaderElection extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(MultiLeaderElection.class);
    private static final Random RANDOM = new Random(System.nanoTime());
    private final ZKClient zkClient;
    private final String name;
    private final int partitionSize;
    private final PartitionChangeHandler handler;
    private int leaderElectionSleepMs = 8000;
    private final Runnable runHandler = new Runnable() { // from class: co.cask.cdap.watchdog.election.MultiLeaderElection.3
        @Override // java.lang.Runnable
        public void run() {
            Set<Integer> copyOf = ImmutableSet.copyOf(MultiLeaderElection.this.leaderPartitions);
            if (copyOf.equals(MultiLeaderElection.this.prevLeaderPartitions)) {
                return;
            }
            MultiLeaderElection.LOG.info("Leader partitions changed - {}", copyOf);
            MultiLeaderElection.this.prevLeaderPartitions = copyOf;
            MultiLeaderElection.this.handler.partitionsChanged(copyOf);
        }
    };
    private final ExecutorService executor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("multi-leader-election"));
    private final Set<Integer> leaderPartitions = Sets.newCopyOnWriteArraySet();
    private final List<LeaderElection> electionCancels = Lists.newArrayList();
    private Set<Integer> prevLeaderPartitions = ImmutableSet.of();
    private final CountDownLatch stopLatch = new CountDownLatch(1);

    public MultiLeaderElection(ZKClient zKClient, String str, int i, PartitionChangeHandler partitionChangeHandler) {
        this.zkClient = zKClient;
        this.name = str;
        this.partitionSize = i;
        this.handler = partitionChangeHandler;
    }

    protected Executor executor() {
        return new Executor() { // from class: co.cask.cdap.watchdog.election.MultiLeaderElection.1
            @Override // java.util.concurrent.Executor
            public void execute(Runnable runnable) {
                Thread thread = new Thread(runnable, MultiLeaderElection.this.getServiceName());
                thread.setDaemon(true);
                thread.start();
            }
        };
    }

    protected void run() throws Exception {
        LOG.info("Starting multi leader election...");
        HashSet newHashSet = Sets.newHashSet();
        HashSet newHashSet2 = Sets.newHashSet();
        for (int i = 0; i < this.partitionSize; i++) {
            if (RANDOM.nextBoolean()) {
                newHashSet.add(Integer.valueOf(i));
            } else {
                newHashSet2.add(Integer.valueOf(i));
            }
        }
        int nextInt = RANDOM.nextInt(this.leaderElectionSleepMs) + 1;
        LOG.debug("Sleeping for {} ms for partition {} before leader election", Integer.valueOf(nextInt), newHashSet);
        TimeUnit.MILLISECONDS.sleep(nextInt);
        runElection(newHashSet);
        int nextInt2 = RANDOM.nextInt(this.leaderElectionSleepMs) + 1;
        LOG.debug("Sleeping for {} ms for partition {} before leader election", Integer.valueOf(nextInt2), newHashSet2);
        TimeUnit.MILLISECONDS.sleep(nextInt2);
        runElection(newHashSet2);
        LOG.info("Multi leader election started.");
        this.stopLatch.await();
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping leader election.");
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<LeaderElection> it = this.electionCancels.iterator();
        while (it.hasNext()) {
            newArrayList.add(it.next().stop());
        }
        try {
            Futures.successfulAsList(newArrayList).get(10L, TimeUnit.SECONDS);
            this.executor.shutdown();
            this.executor.awaitTermination(10L, TimeUnit.SECONDS);
            LOG.info("Leader election stopped.");
        } catch (Throwable th) {
            this.executor.shutdown();
            this.executor.awaitTermination(10L, TimeUnit.SECONDS);
            LOG.info("Leader election stopped.");
            throw th;
        }
    }

    protected void triggerShutdown() {
        this.stopLatch.countDown();
    }

    public void setLeaderElectionSleepMs(int i) {
        this.leaderElectionSleepMs = i;
    }

    private void runElection(Set<Integer> set) throws Exception {
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            final int intValue = it.next().intValue();
            LOG.info("Start leader election for partition {}", Integer.valueOf(intValue));
            LeaderElection leaderElection = new LeaderElection(this.zkClient, String.format("/election/%s/part-%d", this.name, Integer.valueOf(intValue)), new ElectionHandler() { // from class: co.cask.cdap.watchdog.election.MultiLeaderElection.2
                public void leader() {
                    MultiLeaderElection.this.leaderPartitions.add(Integer.valueOf(intValue));
                    MultiLeaderElection.this.executor.submit(MultiLeaderElection.this.runHandler);
                }

                public void follower() {
                    MultiLeaderElection.this.leaderPartitions.remove(Integer.valueOf(intValue));
                    MultiLeaderElection.this.executor.submit(MultiLeaderElection.this.runHandler);
                }
            });
            leaderElection.start();
            this.electionCancels.add(leaderElection);
        }
    }
}
