package org.springframework.xd.dirt.server.admin.deployment.zk;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.embedded.EmbeddedServletContainerInitializedEvent;
import org.springframework.boot.context.embedded.EmbeddedWebApplicationContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.cluster.Admin;
import org.springframework.xd.dirt.cluster.AdminAttributes;
import org.springframework.xd.dirt.container.store.AdminRepository;
import org.springframework.xd.dirt.container.store.ContainerRepository;
import org.springframework.xd.dirt.job.JobFactory;
import org.springframework.xd.dirt.server.admin.deployment.ContainerMatcher;
import org.springframework.xd.dirt.server.admin.deployment.DeploymentUnitStateCalculator;
import org.springframework.xd.dirt.stream.StreamFactory;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;

/* loaded from: input_file:org/springframework/xd/dirt/server/admin/deployment/zk/DeploymentSupervisor.class */
public class DeploymentSupervisor implements ApplicationListener<ApplicationEvent>, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(DeploymentSupervisor.class);

    @Autowired
    private ZooKeeperConnection zkConnection;

    @Autowired
    private AdminRepository adminRepository;

    @Autowired
    private DeploymentMessageConsumer deploymentMessageConsumer;

    @Autowired
    private StreamFactory streamFactory;

    @Autowired
    private JobFactory jobFactory;

    @Autowired
    private ContainerMatcher containerMatcher;

    @Autowired
    private ContainerRepository containerRepository;

    @Autowired
    private ModuleDeploymentWriter moduleDeploymentWriter;

    @Autowired
    private DeploymentUnitStateCalculator stateCalculator;
    private final AdminAttributes adminAttributes;
    private volatile ApplicationContext applicationContext;
    private volatile LeaderSelector leaderSelector;
    private static final String MGMT_CONTEXT_NAMESPACE = "management";
    public static final String QUIET_PERIOD_PROPERTY = "xd.admin.quietPeriod";
    private volatile DeploymentQueue deploymentQueueForConsumer = null;
    private final LeaderSelectorListener leaderListener = new LeaderListener();
    private final ConnectionListener connectionListener = new ConnectionListener();
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("DeploymentSupervisor"));
    private final AtomicLong quietPeriod = new AtomicLong(15000);

    /* loaded from: input_file:org/springframework/xd/dirt/server/admin/deployment/zk/DeploymentSupervisor$ConnectionListener.class */
    private class ConnectionListener implements ZooKeeperConnectionListener {
        private ConnectionListener() {
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onConnect(CuratorFramework curatorFramework) {
            DeploymentSupervisor.logger.info("Admin {} connection established", DeploymentSupervisor.this.getId());
            DeploymentSupervisor.this.registerWithZooKeeper(curatorFramework);
            DeploymentSupervisor.this.requestLeadership(curatorFramework);
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onResume(CuratorFramework curatorFramework) {
            DeploymentSupervisor.logger.info("Admin {} connection resumed, client state: {}", DeploymentSupervisor.this.getId(), curatorFramework.getState());
            DeploymentSupervisor.this.registerWithZooKeeper(curatorFramework);
            DeploymentSupervisor.this.requestLeadership(curatorFramework);
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onDisconnect(CuratorFramework curatorFramework) {
            DeploymentSupervisor.logger.info("Admin {} connection terminated", DeploymentSupervisor.this.getId());
            try {
                DeploymentSupervisor.this.destroy();
            } catch (Exception e) {
                DeploymentSupervisor.logger.warn("exception occurred while closing leader selector", e);
            }
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onSuspend(CuratorFramework curatorFramework) {
            DeploymentSupervisor.logger.info("Admin {} connection suspended", DeploymentSupervisor.this.getId());
            try {
                DeploymentSupervisor.this.destroy();
            } catch (Exception e) {
                DeploymentSupervisor.logger.warn("exception occurred while closing leader selector", e);
            }
        }
    }

    /* loaded from: input_file:org/springframework/xd/dirt/server/admin/deployment/zk/DeploymentSupervisor$LeaderListener.class */
    class LeaderListener extends LeaderSelectorListenerAdapter {
        LeaderListener() {
        }

        public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
            DeploymentSupervisor.logger.info("Leader Admin {} is watching for stream/job deployment requests.", DeploymentSupervisor.this.getId());
            cleanupDeployments(curatorFramework);
            PathChildrenCache pathChildrenCache = null;
            PathChildrenCache pathChildrenCache2 = null;
            PathChildrenCache pathChildrenCache3 = null;
            PathChildrenCache pathChildrenCache4 = null;
            try {
                try {
                    String build = Paths.build(Paths.MODULE_DEPLOYMENTS, Paths.REQUESTED);
                    Paths.ensurePath(curatorFramework, build);
                    Paths.ensurePath(curatorFramework, Paths.build(Paths.MODULE_DEPLOYMENTS, Paths.ALLOCATED));
                    pathChildrenCache4 = DeploymentSupervisor.this.instantiatePathChildrenCache(curatorFramework, build);
                    pathChildrenCache4.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                    pathChildrenCache2 = DeploymentSupervisor.this.instantiatePathChildrenCache(curatorFramework, Paths.STREAM_DEPLOYMENTS);
                    pathChildrenCache2.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                    pathChildrenCache3 = DeploymentSupervisor.this.instantiatePathChildrenCache(curatorFramework, Paths.JOB_DEPLOYMENTS);
                    pathChildrenCache3.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                    SupervisorElectedEvent supervisorElectedEvent = new SupervisorElectedEvent(pathChildrenCache4, pathChildrenCache2, pathChildrenCache3);
                    Iterator it = DeploymentSupervisor.this.applicationContext.getBeansOfType(SupervisorElectionListener.class).entrySet().iterator();
                    while (it.hasNext()) {
                        ((SupervisorElectionListener) ((Map.Entry) it.next()).getValue()).onSupervisorElected(supervisorElectedEvent);
                    }
                    ContainerListener containerListener = new ContainerListener(DeploymentSupervisor.this.zkConnection, DeploymentSupervisor.this.containerRepository, DeploymentSupervisor.this.streamFactory, DeploymentSupervisor.this.jobFactory, pathChildrenCache2, pathChildrenCache3, pathChildrenCache4, DeploymentSupervisor.this.containerMatcher, DeploymentSupervisor.this.moduleDeploymentWriter, DeploymentSupervisor.this.stateCalculator, DeploymentSupervisor.this.executorService, DeploymentSupervisor.this.quietPeriod);
                    pathChildrenCache = DeploymentSupervisor.this.instantiatePathChildrenCache(curatorFramework, Paths.CONTAINERS);
                    pathChildrenCache.getListenable().addListener(containerListener);
                    pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
                    DeploymentSupervisor.this.deploymentQueueForConsumer = new DeploymentQueue(curatorFramework, DeploymentSupervisor.this.deploymentMessageConsumer, Paths.DEPLOYMENT_QUEUE, DeploymentSupervisor.this.executorService);
                    DeploymentSupervisor.this.deploymentQueueForConsumer.start();
                    Thread.sleep(Long.MAX_VALUE);
                    if (pathChildrenCache != null) {
                        pathChildrenCache.close();
                    }
                    if (pathChildrenCache2 != null) {
                        pathChildrenCache2.close();
                    }
                    if (pathChildrenCache3 != null) {
                        pathChildrenCache3.close();
                    }
                    if (pathChildrenCache4 != null) {
                        pathChildrenCache4.close();
                    }
                    if (DeploymentSupervisor.this.deploymentQueueForConsumer != null) {
                        try {
                            DeploymentSupervisor.this.deploymentQueueForConsumer.destroy();
                        } catch (IOException e) {
                            DeploymentSupervisor.logger.warn("Exception closing the distributed queue producer " + e);
                        }
                    }
                } catch (Throwable th) {
                    if (pathChildrenCache != null) {
                        pathChildrenCache.close();
                    }
                    if (pathChildrenCache2 != null) {
                        pathChildrenCache2.close();
                    }
                    if (pathChildrenCache3 != null) {
                        pathChildrenCache3.close();
                    }
                    if (pathChildrenCache4 != null) {
                        pathChildrenCache4.close();
                    }
                    if (DeploymentSupervisor.this.deploymentQueueForConsumer != null) {
                        try {
                            DeploymentSupervisor.this.deploymentQueueForConsumer.destroy();
                        } catch (IOException e2) {
                            DeploymentSupervisor.logger.warn("Exception closing the distributed queue producer " + e2);
                        }
                    }
                    throw th;
                }
            } catch (InterruptedException e3) {
                DeploymentSupervisor.logger.info("Leadership canceled due to thread interrupt");
                Thread.currentThread().interrupt();
                if (pathChildrenCache != null) {
                    pathChildrenCache.close();
                }
                if (pathChildrenCache2 != null) {
                    pathChildrenCache2.close();
                }
                if (pathChildrenCache3 != null) {
                    pathChildrenCache3.close();
                }
                if (pathChildrenCache4 != null) {
                    pathChildrenCache4.close();
                }
                if (DeploymentSupervisor.this.deploymentQueueForConsumer != null) {
                    try {
                        DeploymentSupervisor.this.deploymentQueueForConsumer.destroy();
                    } catch (IOException e4) {
                        DeploymentSupervisor.logger.warn("Exception closing the distributed queue producer " + e4);
                    }
                }
            }
        }

        private void cleanupDeployments(CuratorFramework curatorFramework) throws Exception {
            HashSet hashSet = new HashSet();
            try {
                hashSet.addAll((Collection) curatorFramework.getChildren().forPath(Paths.build(Paths.MODULE_DEPLOYMENTS, Paths.ALLOCATED)));
                hashSet.removeAll((Collection) curatorFramework.getChildren().forPath(Paths.build(Paths.CONTAINERS)));
            } catch (KeeperException.NoNodeException e) {
            }
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                try {
                    curatorFramework.delete().deletingChildrenIfNeeded().forPath(Paths.build(Paths.MODULE_DEPLOYMENTS, Paths.ALLOCATED, (String) it.next()));
                } catch (KeeperException.NoNodeException e2) {
                }
            }
        }
    }

    public DeploymentSupervisor(AdminAttributes adminAttributes) {
        this.adminAttributes = adminAttributes;
    }

    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        if (applicationEvent instanceof ContextRefreshedEvent) {
            if (MGMT_CONTEXT_NAMESPACE.equals(((EmbeddedWebApplicationContext) applicationEvent.getSource()).getNamespace())) {
                return;
            }
            this.applicationContext = ((ContextRefreshedEvent) applicationEvent).getApplicationContext();
            String property = this.applicationContext.getEnvironment().getProperty(QUIET_PERIOD_PROPERTY);
            if (StringUtils.hasText(property)) {
                this.quietPeriod.set(Long.parseLong(property));
                logger.info("Set container quiet period to {} ms", property);
            }
            if (this.zkConnection.isConnected()) {
                registerWithZooKeeper(this.zkConnection.getClient());
                requestLeadership(this.zkConnection.getClient());
            }
            this.zkConnection.addListener(this.connectionListener);
            return;
        }
        if (applicationEvent instanceof ContextStoppedEvent) {
            if (this.leaderSelector != null) {
                this.leaderSelector.close();
            }
        } else if (applicationEvent instanceof EmbeddedServletContainerInitializedEvent) {
            String namespace = ((EmbeddedServletContainerInitializedEvent) applicationEvent).getApplicationContext().getNamespace();
            int port = ((EmbeddedServletContainerInitializedEvent) applicationEvent).getEmbeddedServletContainer().getPort();
            synchronized (this.adminAttributes) {
                if (MGMT_CONTEXT_NAMESPACE.equals(namespace)) {
                    this.adminAttributes.setManagementPort(Integer.valueOf(port));
                } else {
                    this.adminAttributes.setPort(Integer.valueOf(port));
                }
                if (this.zkConnection.isConnected() && this.adminRepository.exists(this.adminAttributes.getId())) {
                    this.adminRepository.update(new Admin(this.adminAttributes.getId(), this.adminAttributes));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getId() {
        return this.applicationContext.getId();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void requestLeadership(CuratorFramework curatorFramework) {
        try {
            Paths.ensurePath(curatorFramework, Paths.MODULE_DEPLOYMENTS);
            Paths.ensurePath(curatorFramework, Paths.STREAM_DEPLOYMENTS);
            Paths.ensurePath(curatorFramework, Paths.JOB_DEPLOYMENTS);
            Paths.ensurePath(curatorFramework, Paths.ADMINS);
            Paths.ensurePath(curatorFramework, Paths.CONTAINERS);
            Paths.ensurePath(curatorFramework, Paths.STREAMS);
            Paths.ensurePath(curatorFramework, Paths.JOBS);
            if (this.leaderSelector == null) {
                this.leaderSelector = new LeaderSelector(curatorFramework, Paths.build(Paths.ADMINELECTION), this.leaderListener);
                this.leaderSelector.setId(getId());
                this.leaderSelector.start();
            }
        } catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e);
        }
    }

    public void destroy() {
        if (this.leaderSelector != null) {
            this.leaderSelector.close();
            this.leaderSelector = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PathChildrenCache instantiatePathChildrenCache(CuratorFramework curatorFramework, String str) {
        return new PathChildrenCache(curatorFramework, str, true, false, this.executorService);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerWithZooKeeper(CuratorFramework curatorFramework) {
        try {
            String id = this.adminAttributes.getId();
            String build = Paths.build(Paths.ADMINS, id);
            Stat stat = (Stat) curatorFramework.checkExists().forPath(build);
            if (stat != null) {
                long ephemeralOwner = stat.getEphemeralOwner();
                long sessionId = curatorFramework.getZookeeperClient().getZooKeeper().getSessionId();
                if (ephemeralOwner == sessionId) {
                    logger.info(String.format("Existing registration for admin runtime %s with session 0x%x detected", id, Long.valueOf(sessionId)));
                    return;
                }
                logger.info(String.format("Trying to delete previous registration for admin runtime %s with session %x detected; current session: 0x%x; path: %s", id, Long.valueOf(ephemeralOwner), Long.valueOf(sessionId), build));
                try {
                    curatorFramework.delete().forPath(build);
                } catch (Exception e) {
                    ZooKeeperUtils.wrapAndThrowIgnoring(e, KeeperException.NoNodeException.class);
                }
            }
            synchronized (this.adminAttributes) {
                this.adminRepository.save(new Admin(id, this.adminAttributes));
            }
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw ZooKeeperUtils.wrapThrowable(e2);
        } catch (Exception e3) {
            throw ZooKeeperUtils.wrapThrowable(e3);
        }
    }
}
