package ml.shifu.guagua.worker;

import java.util.concurrent.TimeUnit;
import ml.shifu.guagua.BasicCoordinator;
import ml.shifu.guagua.io.Bytable;
import ml.shifu.guagua.util.ProgressLock;
import ml.shifu.guagua.worker.AbstractWorkerCoordinator;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ml/shifu/guagua/worker/AsyncWorkerCoordinator.class */
public class AsyncWorkerCoordinator<MASTER_RESULT extends Bytable, WORKER_RESULT extends Bytable> extends AbstractWorkerCoordinator<MASTER_RESULT, WORKER_RESULT> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncWorkerCoordinator.class);
    private int currentIteration;
    private String appId;
    protected ProgressLock masterInitLock = new ProgressLock();
    protected ProgressLock masterIterationLock = new ProgressLock();

    @Override // ml.shifu.guagua.BasicCoordinator
    public void process(WatchedEvent watchedEvent) {
        LOG.info("DEBUG: process: Got a new event, path = {}, type = {}, state = {}", new Object[]{watchedEvent.getPath(), watchedEvent.getType(), watchedEvent.getState()});
        if (watchedEvent.getPath() == null && watchedEvent.getType() == Watcher.Event.EventType.None) {
            if (watchedEvent.getState() != Watcher.Event.KeeperState.SyncConnected) {
                LOG.warn("process: Got unknown null path event " + watchedEvent);
                return;
            } else {
                LOG.info("process: Asynchronous connection complete.");
                super.getZkConnLatch().countDown();
                return;
            }
        }
        if (watchedEvent.getPath().equals(getCurrentMasterNode(getAppId(), getCurrentIteration()).toString())) {
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeCreated || watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                if (getCurrentIteration() == 0) {
                    this.masterInitLock.signal();
                } else {
                    this.masterIterationLock.signal();
                }
            }
        }
    }

    public int getCurrentIteration() {
        return this.currentIteration;
    }

    public void setCurrentIteration(int i) {
        this.currentIteration = i;
    }

    public String getAppId() {
        return this.appId;
    }

    public void setAppId(String str) {
        this.appId = str;
    }

    @Override // ml.shifu.guagua.worker.WorkerInterceptor
    public void preApplication(final WorkerContext<MASTER_RESULT, WORKER_RESULT> workerContext) {
        initialize(workerContext.getProps());
        setAppId(workerContext.getAppId());
        new AbstractWorkerCoordinator.FailOverCoordinatorCommand(workerContext).execute();
        new BasicCoordinator.BasicCoordinatorCommand() { // from class: ml.shifu.guagua.worker.AsyncWorkerCoordinator.1
            @Override // ml.shifu.guagua.BasicCoordinator.BasicCoordinatorCommand
            public void doExecute() throws KeeperException, InterruptedException {
                String appId = workerContext.getAppId();
                int currentIteration = workerContext.getCurrentIteration();
                String containerId = workerContext.getContainerId();
                String sb = AsyncWorkerCoordinator.this.getCurrentMasterNode(appId, currentIteration).toString();
                String str = null;
                try {
                    str = AsyncWorkerCoordinator.this.getRootNode().toString();
                    if (AsyncWorkerCoordinator.this.getZooKeeper().exists(str, false) == null) {
                        AsyncWorkerCoordinator.this.getZooKeeper().createExt(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
                    }
                } catch (KeeperException.NodeExistsException e) {
                    AsyncWorkerCoordinator.LOG.warn("Has such node:{}", str);
                }
                try {
                    str = AsyncWorkerCoordinator.this.getAppNode(appId).toString();
                    if (AsyncWorkerCoordinator.this.getZooKeeper().exists(str, false) == null) {
                        AsyncWorkerCoordinator.this.getZooKeeper().createExt(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
                    }
                } catch (KeeperException.NodeExistsException e2) {
                    AsyncWorkerCoordinator.LOG.warn("Has such node:{}", str);
                }
                try {
                    str = AsyncWorkerCoordinator.this.getWorkerBaseNode(appId).toString();
                    if (AsyncWorkerCoordinator.this.getZooKeeper().exists(str, false) == null) {
                        AsyncWorkerCoordinator.this.getZooKeeper().createExt(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
                    }
                } catch (KeeperException.NodeExistsException e3) {
                    AsyncWorkerCoordinator.LOG.warn("Has such node:{}", str);
                }
                try {
                    str = AsyncWorkerCoordinator.this.getWorkerBaseNode(appId, currentIteration).toString();
                    if (AsyncWorkerCoordinator.this.getZooKeeper().exists(str, false) == null) {
                        AsyncWorkerCoordinator.this.getZooKeeper().createExt(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
                    }
                } catch (KeeperException.NodeExistsException e4) {
                    AsyncWorkerCoordinator.LOG.warn("Has such node:{}", str);
                }
                try {
                    str = AsyncWorkerCoordinator.this.getCurrentWorkerNode(appId, containerId, currentIteration).toString();
                    AsyncWorkerCoordinator.this.getZooKeeper().createExt(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
                } catch (KeeperException.NodeExistsException e5) {
                    AsyncWorkerCoordinator.LOG.warn("Has such node:{}", str);
                }
                if (AsyncWorkerCoordinator.this.getZooKeeper().exists(sb, true) == null) {
                    AsyncWorkerCoordinator.LOG.info("DEBUG: wait for {}.", sb);
                    AsyncWorkerCoordinator.this.masterInitLock.waitForever();
                    AsyncWorkerCoordinator.this.masterInitLock.reset();
                }
                if (workerContext.getCurrentIteration() != 0) {
                    AsyncWorkerCoordinator.this.setMasterResult(workerContext, sb, AsyncWorkerCoordinator.this.getCurrentMasterSplitNode(appId, currentIteration).toString());
                }
                AsyncWorkerCoordinator.LOG.info("Master initilization is done.");
            }
        }.execute();
    }

    @Override // ml.shifu.guagua.worker.AbstractWorkerCoordinator, ml.shifu.guagua.worker.WorkerInterceptor
    public void preIteration(WorkerContext<MASTER_RESULT, WORKER_RESULT> workerContext) {
        setCurrentIteration(workerContext.getCurrentIteration());
        LOG.info("Start itertion {} with container id {} and app id {}.", new Object[]{Integer.valueOf(workerContext.getCurrentIteration()), workerContext.getContainerId(), workerContext.getAppId()});
    }

    @Override // ml.shifu.guagua.worker.WorkerInterceptor
    public void postIteration(final WorkerContext<MASTER_RESULT, WORKER_RESULT> workerContext) {
        new BasicCoordinator.BasicCoordinatorCommand() { // from class: ml.shifu.guagua.worker.AsyncWorkerCoordinator.2
            @Override // ml.shifu.guagua.BasicCoordinator.BasicCoordinatorCommand
            public void doExecute() throws KeeperException, InterruptedException {
                String appId = workerContext.getAppId();
                int currentIteration = workerContext.getCurrentIteration();
                String containerId = workerContext.getContainerId();
                String sb = AsyncWorkerCoordinator.this.getCurrentMasterNode(appId, currentIteration).toString();
                String sb2 = AsyncWorkerCoordinator.this.getCurrentWorkerNode(appId, containerId, currentIteration).toString();
                boolean z = false;
                try {
                    z = AsyncWorkerCoordinator.this.setBytesToZNode(sb2, AsyncWorkerCoordinator.this.getCurrentWorkerSplitNode(appId, containerId, currentIteration).toString(), AsyncWorkerCoordinator.this.getWorkerSerializer().objectToBytes(workerContext.getWorkerResult()), CreateMode.PERSISTENT);
                } catch (KeeperException.NodeExistsException e) {
                    AsyncWorkerCoordinator.LOG.warn("Has such node:{}", sb2);
                }
                if (workerContext.getCurrentIteration() >= 1) {
                    String sb3 = AsyncWorkerCoordinator.this.getWorkerNode(appId, containerId, currentIteration - 1).toString();
                    try {
                        AsyncWorkerCoordinator.this.getZooKeeper().deleteExt(sb3, -1, false);
                        if (z) {
                            sb3 = AsyncWorkerCoordinator.this.getCurrentWorkerSplitNode(appId, containerId, currentIteration - 1).toString();
                            AsyncWorkerCoordinator.this.getZooKeeper().deleteExt(sb3, -1, true);
                        }
                    } catch (KeeperException.NoNodeException e2) {
                        if (System.nanoTime() % 20 == 0) {
                            AsyncWorkerCoordinator.LOG.warn("No such node:{}", sb3);
                        }
                    }
                }
                AsyncWorkerCoordinator.LOG.info("wait to check master:{}", sb);
                long nanoTime = System.nanoTime();
                if (AsyncWorkerCoordinator.this.getZooKeeper().exists(sb, true) == null) {
                    AsyncWorkerCoordinator.this.masterIterationLock.waitForever();
                    AsyncWorkerCoordinator.this.masterIterationLock.reset();
                }
                AsyncWorkerCoordinator.LOG.info("Application {} container {} iteration {} waiting ends with {}ms execution time.", new Object[]{workerContext.getAppId(), workerContext.getContainerId(), Integer.valueOf(workerContext.getCurrentIteration()), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
                AsyncWorkerCoordinator.this.setMasterResult(workerContext, sb, AsyncWorkerCoordinator.this.getCurrentMasterSplitNode(appId, currentIteration).toString());
                AsyncWorkerCoordinator.LOG.info("Master computation is done.");
            }
        }.execute();
    }
}
