package org.springframework.xd.dirt.server;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.cluster.Container;
import org.springframework.xd.dirt.cluster.ContainerMatcher;
import org.springframework.xd.dirt.cluster.ContainerRepository;
import org.springframework.xd.dirt.job.JobFactory;
import org.springframework.xd.dirt.module.ModuleDefinitionRepository;
import org.springframework.xd.dirt.stream.JobDefinitionRepository;
import org.springframework.xd.dirt.stream.StreamDefinitionRepository;
import org.springframework.xd.dirt.stream.StreamFactory;
import org.springframework.xd.dirt.util.MapBytesUtility;
import org.springframework.xd.dirt.zookeeper.ChildPathIterator;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;
import org.springframework.xd.module.options.ModuleOptionsMetadataResolver;

/**
 * Supervises stream and job deployments on behalf of an admin server.
 *
 * <p>Once the application context has been refreshed and a ZooKeeper connection is available, the
 * supervisor enters leader election under {@code Paths.ADMINS}. While it holds leadership,
 * {@link LeaderListener} keeps three {@link PathChildrenCache} instances open (stream deployments,
 * job deployments and containers) and attaches the matching listeners to them. The containers
 * cache also backs {@link #getContainerIterator()}.
 *
 * <p>The {@link LeaderSelector} reference is only created and released while holding this object's
 * monitor (see {@link #requestLeadership(CuratorFramework)} and {@link #destroy()}).
 */
public class DeploymentSupervisor implements ContainerRepository, ApplicationListener<ApplicationEvent>, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(DeploymentSupervisor.class);

    private final ZooKeeperConnection zkConnection;
    private final StreamDefinitionRepository streamDefinitionRepository;
    private final JobDefinitionRepository jobDefinitionRepository;
    private final ModuleDefinitionRepository moduleDefinitionRepository;
    private final ModuleOptionsMetadataResolver moduleOptionsMetadataResolver;

    /** Set when the first {@link ContextRefreshedEvent} arrives; {@link #getId()} reads it. */
    private volatile ApplicationContext applicationContext;

    private final ContainerMatcher containerMatcher;

    /** Current leader selector, or {@code null} when this admin is not taking part in the election. */
    private volatile LeaderSelector leaderSelector;

    /** Containers cache; only populated while this admin holds leadership. */
    private final AtomicReference<PathChildrenCache> containers = new AtomicReference<>();

    private final ContainerConverter containerConverter = new ContainerConverter();
    private final MapBytesUtility mapBytesUtility = new MapBytesUtility();
    private final LeaderSelectorListener leaderListener = new LeaderListener();
    private final ConnectionListener connectionListener = new ConnectionListener();

    /**
     * Creates a supervisor. All collaborators are mandatory.
     *
     * @param zooKeeperConnection the ZooKeeper connection used for election and deployment paths
     * @param streamDefinitionRepository repository handed to the {@link StreamFactory}
     * @param jobDefinitionRepository repository handed to the {@link JobFactory}
     * @param moduleDefinitionRepository repository handed to both factories
     * @param moduleOptionsMetadataResolver resolver handed to both factories
     * @param containerMatcher matcher handed to the deployment and container listeners
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public DeploymentSupervisor(ZooKeeperConnection zooKeeperConnection, StreamDefinitionRepository streamDefinitionRepository, JobDefinitionRepository jobDefinitionRepository, ModuleDefinitionRepository moduleDefinitionRepository, ModuleOptionsMetadataResolver moduleOptionsMetadataResolver, ContainerMatcher containerMatcher) {
        Assert.notNull(zooKeeperConnection, "ZooKeeperConnection must not be null");
        Assert.notNull(streamDefinitionRepository, "StreamDefinitionRepository must not be null");
        // Previously unchecked: a null here only surfaced later, on the leadership thread.
        Assert.notNull(jobDefinitionRepository, "JobDefinitionRepository must not be null");
        Assert.notNull(moduleDefinitionRepository, "ModuleDefinitionRepository must not be null");
        Assert.notNull(moduleOptionsMetadataResolver, "moduleOptionsMetadataResolver must not be null");
        Assert.notNull(containerMatcher, "containerMatcher must not be null");
        this.zkConnection = zooKeeperConnection;
        this.streamDefinitionRepository = streamDefinitionRepository;
        this.jobDefinitionRepository = jobDefinitionRepository;
        this.moduleDefinitionRepository = moduleDefinitionRepository;
        this.moduleOptionsMetadataResolver = moduleOptionsMetadataResolver;
        this.containerMatcher = containerMatcher;
    }

    /**
     * Reacts to application context lifecycle events.
     *
     * <p>On {@link ContextRefreshedEvent} the context is remembered, leadership is requested right
     * away if ZooKeeper is already connected, and the connection listener is registered. On
     * {@link ContextStoppedEvent} the leader selector is released through {@link #destroy()}.
     *
     * @param applicationEvent the event published by the application context
     */
    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        if (applicationEvent instanceof ContextRefreshedEvent) {
            this.applicationContext = ((ContextRefreshedEvent) applicationEvent).getApplicationContext();
            if (this.zkConnection.isConnected()) {
                requestLeadership(this.zkConnection.getClient());
            }
            this.zkConnection.addListener(this.connectionListener);
        } else if (applicationEvent instanceof ContextStoppedEvent) {
            // Go through destroy() so the reference is cleared as well; closing the selector here
            // while leaving the field set made the next destroy() close it a second time.
            destroy();
        }
    }

    /**
     * Returns an iterator over the containers currently held in the containers cache.
     *
     * @return an empty iterator when no containers cache is installed (that is, outside of
     *         leadership); otherwise an iterator converting each cached child to a {@link Container}
     */
    @Override
    public Iterator<Container> getContainerIterator() {
        PathChildrenCache cache = this.containers.get();
        if (cache == null) {
            return Collections.emptyIterator();
        }
        return new ChildPathIterator<Container>(this.containerConverter, cache);
    }

    /**
     * Returns the id of the application context captured on {@link ContextRefreshedEvent}.
     *
     * @return the application context id
     */
    public String getId() {
        return this.applicationContext.getId();
    }

    /**
     * Ensures the deployment-related ZooKeeper paths exist and, unless a leader selector is already
     * present, creates and starts one under {@code Paths.ADMINS} using {@link #getId()} as its id.
     *
     * @param curatorFramework the connected Curator client
     * @throws RuntimeException whatever {@link ZooKeeperUtils#wrapThrowable} produces for a failure
     */
    public synchronized void requestLeadership(CuratorFramework curatorFramework) {
        try {
            Paths.ensurePath(curatorFramework, Paths.MODULE_DEPLOYMENTS);
            Paths.ensurePath(curatorFramework, Paths.STREAM_DEPLOYMENTS);
            Paths.ensurePath(curatorFramework, Paths.JOB_DEPLOYMENTS);
            Paths.ensurePath(curatorFramework, Paths.CONTAINERS);
            Paths.ensurePath(curatorFramework, Paths.STREAMS);
            Paths.ensurePath(curatorFramework, Paths.JOBS);

            if (this.leaderSelector != null) {
                return;
            }
            LeaderSelector selector = new LeaderSelector(curatorFramework, Paths.build(Paths.ADMINS), this.leaderListener);
            this.leaderSelector = selector;
            selector.setId(getId());
            selector.start();
        } catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e);
        }
    }

    /**
     * Closes the current leader selector, if any, and forgets it so that a later
     * {@link #requestLeadership(CuratorFramework)} creates a fresh one.
     *
     * <p>Synchronized on the same monitor as {@code requestLeadership}, the other writer of the
     * field. Safe to call repeatedly.
     */
    @Override
    public synchronized void destroy() {
        LeaderSelector selector = this.leaderSelector;
        if (selector == null) {
            return;
        }
        // Clear first: if close() throws, a dead selector must not stay behind and block re-election.
        this.leaderSelector = null;
        selector.close();
    }

    /**
     * Bridges ZooKeeper connection state changes to leadership requests and releases.
     */
    private class ConnectionListener implements ZooKeeperConnectionListener {

        /** Logs the connection and (re-)requests leadership with the connected client. */
        @Override
        public void onConnect(CuratorFramework curatorFramework) {
            logger.info("Admin {} CONNECTED", getId());
            requestLeadership(curatorFramework);
        }

        /** Releases the leader selector; failures are logged rather than propagated. */
        @Override
        public void onDisconnect(CuratorFramework curatorFramework) {
            try {
                destroy();
            } catch (Exception e) {
                logger.warn("exception occurred while closing leader selector", e);
            }
        }
    }

    /**
     * Converts a cached ZooKeeper child node into a {@link Container}: the name is the last path
     * element and the attributes are the node data decoded by {@link MapBytesUtility}.
     */
    public class ContainerConverter implements Converter<ChildData, Container> {

        @Override
        public Container convert(ChildData childData) {
            String containerName = Paths.stripPath(childData.getPath());
            return new Container(containerName, mapBytesUtility.toMap(childData.getData()));
        }
    }

    /**
     * Leadership callback: sets up the deployment and container caches and holds leadership until
     * the thread is interrupted.
     */
    class LeaderListener extends LeaderSelectorListenerAdapter {

        /**
         * Runs for as long as this admin is the leader.
         *
         * <p>Removes orphaned module deployment entries, then starts the stream deployment, job
         * deployment and containers caches with their listeners and blocks. All caches are closed
         * when the method exits, whether by interrupt or by failure.
         *
         * @param curatorFramework the Curator client supplied by the leader selector
         * @throws Exception if cleanup, cache setup or cache shutdown fails
         */
        @Override
        public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
            logger.info("Leader Admin {} is watching for stream/job deployment requests.", getId());
            cleanupDeployments(curatorFramework);

            PathChildrenCache streamDeployments = null;
            PathChildrenCache jobDeployments = null;
            try {
                StreamFactory streamFactory = new StreamFactory(streamDefinitionRepository, moduleDefinitionRepository, moduleOptionsMetadataResolver);
                JobFactory jobFactory = new JobFactory(jobDefinitionRepository, moduleDefinitionRepository, moduleOptionsMetadataResolver);

                StreamDeploymentListener streamListener = new StreamDeploymentListener(zkConnection, DeploymentSupervisor.this, streamFactory, containerMatcher);
                streamDeployments = new PathChildrenCache(curatorFramework, Paths.STREAM_DEPLOYMENTS, true, ThreadUtils.newThreadFactory("StreamDeploymentsPathChildrenCache"));
                streamDeployments.getListenable().addListener(streamListener);
                streamDeployments.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

                JobDeploymentListener jobListener = new JobDeploymentListener(zkConnection, DeploymentSupervisor.this, jobFactory, containerMatcher);
                jobDeployments = new PathChildrenCache(curatorFramework, Paths.JOB_DEPLOYMENTS, true, ThreadUtils.newThreadFactory("JobDeploymentsPathChildrenCache"));
                jobDeployments.getListenable().addListener(jobListener);
                jobDeployments.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

                ContainerListener containerListener = new ContainerListener(zkConnection, DeploymentSupervisor.this, streamFactory, jobFactory, streamDeployments, jobDeployments, containerMatcher);
                PathChildrenCache containersCache = new PathChildrenCache(curatorFramework, Paths.CONTAINERS, true, ThreadUtils.newThreadFactory("ContainersPathChildrenCache"));
                containersCache.getListenable().addListener(containerListener);
                containersCache.start();
                // Publish only after start() so getContainerIterator() never sees an unstarted cache.
                containers.set(containersCache);

                // Returning from this method gives up leadership, so park until interrupted.
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                logger.info("Leadership canceled due to thread interrupt");
                Thread.currentThread().interrupt();
            } finally {
                // Nested finally blocks: every cache gets a close attempt even if an earlier close throws.
                PathChildrenCache containersCache = containers.getAndSet(null);
                try {
                    if (containersCache != null) {
                        containersCache.close();
                    }
                } finally {
                    try {
                        if (streamDeployments != null) {
                            streamDeployments.close();
                        }
                    } finally {
                        if (jobDeployments != null) {
                            jobDeployments.close();
                        }
                    }
                }
            }
        }

        /**
         * Deletes every child of the module deployments path whose name has no matching child under
         * the containers path.
         *
         * <p>If the containers path is missing, nothing is subtracted and every module deployment
         * child that was read is deleted. If the module deployments path is missing, there is
         * nothing to delete.
         *
         * @param curatorFramework the Curator client to read and delete with
         * @throws Exception on any ZooKeeper failure other than a missing node
         */
        private void cleanupDeployments(CuratorFramework curatorFramework) throws Exception {
            Collection<String> orphaned = new HashSet<>();
            try {
                orphaned.addAll(curatorFramework.getChildren().forPath(Paths.build(Paths.MODULE_DEPLOYMENTS)));
                orphaned.removeAll(curatorFramework.getChildren().forPath(Paths.build(Paths.CONTAINERS)));
            } catch (KeeperException.NoNodeException ignored) {
                // A missing parent node is not an error here; continue with whatever was collected.
            }
            for (String name : orphaned) {
                try {
                    curatorFramework.delete().deletingChildrenIfNeeded().forPath(Paths.build(Paths.MODULE_DEPLOYMENTS, name));
                } catch (KeeperException.NoNodeException ignored) {
                    // Already gone, which is the state this cleanup is after.
                }
            }
        }
    }
}
