package ml.shifu.guagua.master;

import java.util.List;
import java.util.concurrent.TimeUnit;
import ml.shifu.guagua.BasicCoordinator;
import ml.shifu.guagua.GuaguaConstants;
import ml.shifu.guagua.io.Bytable;
import ml.shifu.guagua.master.AbstractMasterCoordinator;
import ml.shifu.guagua.util.NumberFormatUtils;
import ml.shifu.guagua.util.ProgressLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ml/shifu/guagua/master/AsyncMasterCoordinator.class */
public class AsyncMasterCoordinator<MASTER_RESULT extends Bytable, WORKER_RESULT extends Bytable> extends AbstractMasterCoordinator<MASTER_RESULT, WORKER_RESULT> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncMasterCoordinator.class);
    private int currentIteration;
    private String appId;
    protected ProgressLock workerInitLock = new ProgressLock();
    protected ProgressLock workerIterationLock = new ProgressLock();

    @Override // ml.shifu.guagua.BasicCoordinator
    public void process(WatchedEvent watchedEvent) {
        LOG.debug("DEBUG: process: Got a new event, path = {}, type = {}, state = {}", new Object[]{watchedEvent.getPath(), watchedEvent.getType(), watchedEvent.getState()});
        if (watchedEvent.getPath() == null && watchedEvent.getType() == Watcher.Event.EventType.None) {
            if (watchedEvent.getState() != Watcher.Event.KeeperState.SyncConnected) {
                LOG.warn("process: Got unknown null path event " + watchedEvent);
                return;
            } else {
                LOG.info("process: Asynchronous connection complete.");
                super.getZkConnLatch().countDown();
                return;
            }
        }
        if (watchedEvent.getPath().equals(getWorkerBaseNode(getAppId(), getCurrentIteration()).toString()) && watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
            if (getCurrentIteration() == 0) {
                this.workerInitLock.signal();
            } else {
                this.workerIterationLock.signal();
            }
        }
    }

    public int getCurrentIteration() {
        return this.currentIteration;
    }

    public void setCurrentIteration(int i) {
        this.currentIteration = i;
    }

    public String getAppId() {
        return this.appId;
    }

    public void setAppId(String str) {
        this.appId = str;
    }

    @Override // ml.shifu.guagua.master.MasterInterceptor
    public void preApplication(final MasterContext<MASTER_RESULT, WORKER_RESULT> masterContext) {
        initialize(masterContext.getProps());
        setAppId(masterContext.getAppId());
        if (NumberFormatUtils.getInt(masterContext.getProps().getProperty(GuaguaConstants.GUAGUA_MASTER_NUMBER), 1) > 1) {
            new AbstractMasterCoordinator.MasterElectionCommand(this, masterContext.getAppId()).execute();
        }
        new AbstractMasterCoordinator.FailOverCommand(this, masterContext).execute();
        if (masterContext.getCurrentIteration() != 0) {
            return;
        }
        new BasicCoordinator.BasicCoordinatorCommand() { // from class: ml.shifu.guagua.master.AsyncMasterCoordinator.1
            @Override // ml.shifu.guagua.BasicCoordinator.BasicCoordinatorCommand
            public void doExecute() throws KeeperException, InterruptedException {
                final String sb = AsyncMasterCoordinator.this.getWorkerBaseNode(masterContext.getAppId(), masterContext.getCurrentIteration()).toString();
                new BasicCoordinator.RetryCoordinatorCommand(AsyncMasterCoordinator.this.isFixedTime(), AsyncMasterCoordinator.this.getSleepTime()) { // from class: ml.shifu.guagua.master.AsyncMasterCoordinator.1.1
                    @Override // ml.shifu.guagua.BasicCoordinator.RetryCoordinatorCommand
                    public boolean retryExecution() throws KeeperException, InterruptedException {
                        try {
                            List<String> childrenExt = AsyncMasterCoordinator.this.getZooKeeper().getChildrenExt(sb, false, false, false);
                            if (isTerminated(childrenExt == null ? 0 : childrenExt.size(), masterContext.getWorkers(), masterContext.getMinWorkersRatio(), masterContext.getMinWorkersTimeOut())) {
                                return true;
                            }
                            List<String> childrenExt2 = AsyncMasterCoordinator.this.getZooKeeper().getChildrenExt(sb, true, false, false);
                            int size = childrenExt2 == null ? 0 : childrenExt2.size();
                            if (isTerminated(size, masterContext.getWorkers(), masterContext.getMinWorkersRatio(), masterContext.getMinWorkersTimeOut())) {
                                return true;
                            }
                            if (System.nanoTime() % 20 == 0) {
                                AsyncMasterCoordinator.LOG.info("workers already initialized: {}, still {} workers are not synced.", Integer.valueOf(size), Integer.valueOf(masterContext.getWorkers() - size));
                            }
                            AsyncMasterCoordinator.this.workerInitLock.waitForever();
                            AsyncMasterCoordinator.this.workerInitLock.reset();
                            return false;
                        } catch (KeeperException.NoNodeException e) {
                            if (System.nanoTime() % 10 != 0) {
                                return false;
                            }
                            AsyncMasterCoordinator.LOG.warn("No such node:{}", sb);
                            return false;
                        }
                    }
                }.execute();
                AsyncMasterCoordinator.LOG.info("All workers are initiliazed successfully.");
                String str = null;
                try {
                    AsyncMasterCoordinator.this.getZooKeeper().createExt(AsyncMasterCoordinator.this.getWorkerBaseNode(masterContext.getAppId(), masterContext.getCurrentIteration() + 1).toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
                    AsyncMasterCoordinator.this.getZooKeeper().createExt(AsyncMasterCoordinator.this.getMasterBaseNode(masterContext.getAppId()).toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
                    str = AsyncMasterCoordinator.this.getCurrentMasterNode(masterContext.getAppId(), masterContext.getCurrentIteration()).toString();
                    AsyncMasterCoordinator.this.getZooKeeper().createExt(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
                } catch (KeeperException.NodeExistsException e) {
                    AsyncMasterCoordinator.LOG.warn("Node exists: {}", str);
                }
            }
        }.execute();
    }

    @Override // ml.shifu.guagua.master.MasterInterceptor
    public void preIteration(final MasterContext<MASTER_RESULT, WORKER_RESULT> masterContext) {
        setCurrentIteration(masterContext.getCurrentIteration());
        new BasicCoordinator.BasicCoordinatorCommand() { // from class: ml.shifu.guagua.master.AsyncMasterCoordinator.2
            @Override // ml.shifu.guagua.BasicCoordinator.BasicCoordinatorCommand
            public void doExecute() throws KeeperException, InterruptedException {
                final int currentIteration = masterContext.getCurrentIteration();
                final int workers = masterContext.getWorkers();
                final String sb = AsyncMasterCoordinator.this.getWorkerBaseNode(masterContext.getAppId(), currentIteration).toString();
                long nanoTime = System.nanoTime();
                new BasicCoordinator.RetryCoordinatorCommand(AsyncMasterCoordinator.this.isFixedTime(), AsyncMasterCoordinator.this.getSleepTime()) { // from class: ml.shifu.guagua.master.AsyncMasterCoordinator.2.1
                    @Override // ml.shifu.guagua.BasicCoordinator.RetryCoordinatorCommand
                    public boolean retryExecution() throws KeeperException, InterruptedException {
                        try {
                            List<String> childrenExt = AsyncMasterCoordinator.this.getZooKeeper().getChildrenExt(sb, false, false, false);
                            if (isTerminated(childrenExt == null ? 0 : childrenExt.size(), masterContext.getWorkers(), masterContext.getMinWorkersRatio(), masterContext.getMinWorkersTimeOut())) {
                                return true;
                            }
                            List<String> childrenExt2 = AsyncMasterCoordinator.this.getZooKeeper().getChildrenExt(sb, true, false, false);
                            int size = childrenExt2 == null ? 0 : childrenExt2.size();
                            if (isTerminated(size, masterContext.getWorkers(), masterContext.getMinWorkersRatio(), masterContext.getMinWorkersTimeOut())) {
                                return true;
                            }
                            if (System.nanoTime() % 20 == 0) {
                                AsyncMasterCoordinator.LOG.info("iteration {}, workers compelted: {}, still {} workers are not synced.", new Object[]{Integer.valueOf(currentIteration), Integer.valueOf(size), Integer.valueOf(workers - size)});
                            }
                            AsyncMasterCoordinator.this.workerIterationLock.waitForever();
                            AsyncMasterCoordinator.this.workerIterationLock.reset();
                            return false;
                        } catch (KeeperException.NoNodeException e) {
                            if (System.nanoTime() % 10 != 0) {
                                return false;
                            }
                            AsyncMasterCoordinator.LOG.warn("No such node:{}", sb);
                            return false;
                        }
                    }
                }.execute();
                AsyncMasterCoordinator.LOG.info("Application {} container {} iteration {} waiting ends with {}ms execution time.", new Object[]{masterContext.getAppId(), masterContext.getContainerId(), Integer.valueOf(masterContext.getCurrentIteration()), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
                AsyncMasterCoordinator.this.setWorkerResults(masterContext, sb, masterContext.getAppId(), currentIteration);
            }
        }.execute();
    }
}
