package org.springframework.xd.dirt.server;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.validation.BindException;
import org.springframework.xd.dirt.container.ContainerAttributes;
import org.springframework.xd.dirt.container.store.ContainerAttributesRepository;
import org.springframework.xd.dirt.core.JobsPath;
import org.springframework.xd.dirt.core.ModuleDeploymentsPath;
import org.springframework.xd.dirt.core.ModuleDescriptor;
import org.springframework.xd.dirt.core.StreamsPath;
import org.springframework.xd.dirt.module.ModuleDefinitionRepository;
import org.springframework.xd.dirt.module.ModuleDeployer;
import org.springframework.xd.dirt.module.ModuleDeploymentRequest;
import org.springframework.xd.dirt.stream.ParsingContext;
import org.springframework.xd.dirt.stream.StreamDefinitionRepository;
import org.springframework.xd.dirt.stream.StreamFactory;
import org.springframework.xd.dirt.stream.XDParser;
import org.springframework.xd.dirt.stream.XDStreamParser;
import org.springframework.xd.dirt.util.MapBytesUtility;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener;
import org.springframework.xd.module.DeploymentMetadata;
import org.springframework.xd.module.ModuleDefinition;
import org.springframework.xd.module.ModuleType;
import org.springframework.xd.module.core.CompositeModule;
import org.springframework.xd.module.core.Module;
import org.springframework.xd.module.core.SimpleModule;
import org.springframework.xd.module.options.ModuleOptions;
import org.springframework.xd.module.options.ModuleOptionsMetadataResolver;
import org.springframework.xd.module.options.PrefixNarrowingModuleOptions;
import org.springframework.xd.module.support.ParentLastURLClassLoader;

/* loaded from: input_file:org/springframework/xd/dirt/server/ContainerRegistrar.class */
public class ContainerRegistrar implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware, BeanClassLoaderAware {
    private static final Logger LOG = LoggerFactory.getLogger(ContainerRegistrar.class);
    private final ContainerAttributes containerAttributes;
    private final ContainerAttributesRepository containerAttributesRepository;
    private final ZooKeeperConnection zkConnection;
    private volatile PathChildrenCache deployments;
    private final ModuleDefinitionRepository moduleDefinitionRepository;
    private final ModuleOptionsMetadataResolver moduleOptionsMetadataResolver;
    private final StreamFactory streamFactory;
    private final ModuleDeployer moduleDeployer;
    private final XDParser parser;
    private volatile ApplicationContext context;
    private volatile ClassLoader parentClassLoader;
    private final DeploymentListener deploymentListener = new DeploymentListener();
    private final StreamModuleWatcher streamModuleWatcher = new StreamModuleWatcher();
    private final JobModuleWatcher jobModuleWatcher = new JobModuleWatcher();
    private final MapBytesUtility mapBytesUtility = new MapBytesUtility();
    private final Map<ModuleDescriptor.Key, ModuleDescriptor> mapDeployedModules = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.springframework.xd.dirt.server.ContainerRegistrar$1, reason: invalid class name */
    /* loaded from: input_file:org/springframework/xd/dirt/server/ContainerRegistrar$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.INITIALIZED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/xd/dirt/server/ContainerRegistrar$ContainerAttributesRegisteringZooKeeperConnectionListener.class */
    public class ContainerAttributesRegisteringZooKeeperConnectionListener implements ZooKeeperConnectionListener {
        private ContainerAttributesRegisteringZooKeeperConnectionListener() {
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onConnect(CuratorFramework curatorFramework) {
            ContainerRegistrar.this.registerWithZooKeeper(curatorFramework);
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onDisconnect(CuratorFramework curatorFramework) {
            try {
                ContainerRegistrar.LOG.warn(">>> disconnected container: {}", ContainerRegistrar.this.containerAttributes.getId());
                ContainerRegistrar.this.deployments.getListenable().removeListener(ContainerRegistrar.this.deploymentListener);
                ContainerRegistrar.this.deployments.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /* synthetic */ ContainerAttributesRegisteringZooKeeperConnectionListener(ContainerRegistrar containerRegistrar, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/xd/dirt/server/ContainerRegistrar$DeploymentListener.class */
    public class DeploymentListener implements PathChildrenCacheListener {
        DeploymentListener() {
        }

        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            ContainerRegistrar.LOG.debug("Path cache event: {}", pathChildrenCacheEvent);
            switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                case 1:
                default:
                    return;
                case 2:
                    ContainerRegistrar.this.onChildAdded(curatorFramework, pathChildrenCacheEvent.getData());
                    return;
                case 3:
                    ContainerRegistrar.this.onChildRemoved(curatorFramework, pathChildrenCacheEvent.getData());
                    return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/xd/dirt/server/ContainerRegistrar$JobModuleWatcher.class */
    public class JobModuleWatcher implements CuratorWatcher {
        JobModuleWatcher() {
        }

        public void process(WatchedEvent watchedEvent) throws Exception {
            if (watchedEvent.getType() != Watcher.Event.EventType.NodeDeleted) {
                ((BackgroundPathable) ContainerRegistrar.this.zkConnection.getClient().getData().usingWatcher(this)).forPath(watchedEvent.getPath());
                return;
            }
            JobsPath jobsPath = new JobsPath(watchedEvent.getPath());
            String jobName = jobsPath.getJobName();
            String moduleLabel = jobsPath.getModuleLabel();
            ContainerRegistrar.this.undeployModule(jobName, ModuleType.job.toString(), moduleLabel);
            String build = new ModuleDeploymentsPath().setContainer(ContainerRegistrar.this.containerAttributes.getId()).setStreamName(jobName).setModuleType(ModuleType.job.toString()).setModuleLabel(moduleLabel).build();
            CuratorFramework client = ContainerRegistrar.this.zkConnection.getClient();
            try {
                if (client.checkExists().forPath(build) != null) {
                    ContainerRegistrar.LOG.trace("Deleting path: {}", build);
                    client.delete().deletingChildrenIfNeeded().forPath(build);
                }
            } catch (KeeperException e) {
                if (client.getState() == CuratorFrameworkState.STARTED) {
                    throw new RuntimeException((Throwable) e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/xd/dirt/server/ContainerRegistrar$StreamModuleWatcher.class */
    public class StreamModuleWatcher implements CuratorWatcher {
        StreamModuleWatcher() {
        }

        public void process(WatchedEvent watchedEvent) throws Exception {
            if (watchedEvent.getType() != Watcher.Event.EventType.NodeDeleted) {
                ((BackgroundPathable) ContainerRegistrar.this.zkConnection.getClient().getData().usingWatcher(this)).forPath(watchedEvent.getPath());
                return;
            }
            StreamsPath streamsPath = new StreamsPath(watchedEvent.getPath());
            String streamName = streamsPath.getStreamName();
            String moduleType = streamsPath.getModuleType();
            String moduleLabel = streamsPath.getModuleLabel();
            ContainerRegistrar.this.undeployModule(streamName, moduleType, moduleLabel);
            String build = new ModuleDeploymentsPath().setContainer(ContainerRegistrar.this.containerAttributes.getId()).setStreamName(streamName).setModuleType(moduleType).setModuleLabel(moduleLabel).build();
            CuratorFramework client = ContainerRegistrar.this.zkConnection.getClient();
            try {
                if (client.checkExists().forPath(build) != null) {
                    ContainerRegistrar.LOG.trace("Deleting path: {}", build);
                    client.delete().deletingChildrenIfNeeded().forPath(build);
                }
            } catch (Exception e) {
                if (client.getState() == CuratorFrameworkState.STARTED) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public ContainerRegistrar(ContainerAttributes containerAttributes, ContainerAttributesRepository containerAttributesRepository, StreamDefinitionRepository streamDefinitionRepository, ModuleDefinitionRepository moduleDefinitionRepository, ModuleOptionsMetadataResolver moduleOptionsMetadataResolver, ModuleDeployer moduleDeployer, ZooKeeperConnection zooKeeperConnection) {
        this.containerAttributes = containerAttributes;
        this.containerAttributesRepository = containerAttributesRepository;
        this.zkConnection = zooKeeperConnection;
        this.moduleDefinitionRepository = moduleDefinitionRepository;
        this.moduleOptionsMetadataResolver = moduleOptionsMetadataResolver;
        this.moduleDeployer = moduleDeployer;
        this.streamFactory = new StreamFactory(streamDefinitionRepository, moduleDefinitionRepository, moduleOptionsMetadataResolver);
        this.parser = new XDStreamParser(moduleDefinitionRepository, moduleOptionsMetadataResolver);
    }

    private Module deployModule(ModuleDescriptor moduleDescriptor) {
        LOG.info("Deploying module {}", moduleDescriptor);
        this.mapDeployedModules.put(moduleDescriptor.newKey(), moduleDescriptor);
        ModuleOptions safeModuleOptionsInterpolate = safeModuleOptionsInterpolate(moduleDescriptor);
        Module createComposedModule = moduleDescriptor.isComposed() ? createComposedModule(moduleDescriptor, safeModuleOptionsInterpolate) : createSimpleModule(moduleDescriptor, safeModuleOptionsInterpolate);
        this.moduleDeployer.deployAndStore(createComposedModule, moduleDescriptor);
        return createComposedModule;
    }

    protected void undeployModule(String str, String str2, String str3) {
        ModuleDescriptor.Key key = new ModuleDescriptor.Key(str, ModuleType.valueOf(str2), str3);
        ModuleDescriptor moduleDescriptor = this.mapDeployedModules.get(key);
        if (moduleDescriptor == null) {
            LOG.trace("Module {} already undeployed", str3);
            return;
        }
        LOG.info("Undeploying module {}", moduleDescriptor);
        this.mapDeployedModules.remove(key);
        this.moduleDeployer.undeploy(moduleDescriptor);
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (this.context.equals(contextRefreshedEvent.getApplicationContext())) {
            if (this.zkConnection.isConnected()) {
                registerWithZooKeeper(this.zkConnection.getClient());
            }
            this.zkConnection.addListener(new ContainerAttributesRegisteringZooKeeperConnectionListener(this, null));
        }
    }

    public void setBeanClassLoader(ClassLoader classLoader) {
        this.parentClassLoader = classLoader;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerWithZooKeeper(CuratorFramework curatorFramework) {
        try {
            Paths.ensurePath(curatorFramework, Paths.MODULE_DEPLOYMENTS);
            this.deployments = new PathChildrenCache(curatorFramework, Paths.build(Paths.MODULE_DEPLOYMENTS, this.containerAttributes.getId()), true, ThreadUtils.newThreadFactory("DeploymentsPathChildrenCache"));
            this.deployments.getListenable().addListener(this.deploymentListener);
            this.containerAttributesRepository.save(this.containerAttributes);
            this.deployments.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            LOG.info("Started container {}", this.containerAttributes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onChildAdded(CuratorFramework curatorFramework, ChildData childData) {
        ModuleDeploymentsPath moduleDeploymentsPath = new ModuleDeploymentsPath(childData.getPath());
        String streamName = moduleDeploymentsPath.getStreamName();
        String moduleType = moduleDeploymentsPath.getModuleType();
        String moduleLabel = moduleDeploymentsPath.getModuleLabel();
        Module deployJob = ModuleType.job.toString().equals(moduleType) ? deployJob(curatorFramework, streamName, moduleLabel) : deployStreamModule(curatorFramework, streamName, moduleType, moduleLabel);
        if (deployJob != null) {
            HashMap hashMap = new HashMap();
            CollectionUtils.mergePropertiesIntoMap(deployJob.getProperties(), hashMap);
            try {
                ((ACLBackgroundPathAndBytesable) curatorFramework.create().withMode(CreateMode.EPHEMERAL)).forPath(childData.getPath() + "/metadata", this.mapBytesUtility.toByteArray(hashMap));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private Module deployJob(CuratorFramework curatorFramework, String str, String str2) {
        LOG.info("Deploying job '{}'", str);
        String build = new JobsPath().setJobName(str).setModuleLabel(str2).setContainer(this.containerAttributes.getId()).build();
        try {
            Map<String, String> map = this.mapBytesUtility.toMap((byte[]) curatorFramework.getData().forPath(Paths.build(Paths.JOBS, str)));
            ModuleDefinition findByNameAndType = this.moduleDefinitionRepository.findByNameAndType(str2.substring(0, str2.lastIndexOf(45)), ModuleType.job);
            ModuleDeploymentRequest moduleDeploymentRequest = this.parser.parse(str, map.get("definition"), ParsingContext.job).get(0);
            ModuleDescriptor moduleDescriptor = new ModuleDescriptor(findByNameAndType, moduleDeploymentRequest.getGroup(), str2, moduleDeploymentRequest.getIndex(), null);
            moduleDescriptor.addParameters(moduleDeploymentRequest.getParameters());
            Module deployModule = deployModule(moduleDescriptor);
            ((ACLBackgroundPathAndBytesable) curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(build, this.mapBytesUtility.toByteArray(Collections.singletonMap("state", "deployed")));
            ((BackgroundPathable) curatorFramework.getData().usingWatcher(this.jobModuleWatcher)).forPath(build);
            return deployModule;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Module deployStreamModule(CuratorFramework curatorFramework, String str, String str2, String str3) {
        LOG.info("Deploying module '{}' for stream '{}'", str3, str);
        String build = new StreamsPath().setStreamName(str).setModuleType(str2).setModuleLabel(str3).setContainer(this.containerAttributes.getId()).build();
        Module module = null;
        try {
            module = deployModule(this.streamFactory.createStream(str, this.mapBytesUtility.toMap((byte[]) curatorFramework.getData().forPath(Paths.build(Paths.STREAMS, str)))).getModuleDescriptor(str3, str2));
            ((ACLBackgroundPathAndBytesable) curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(build, this.mapBytesUtility.toByteArray(Collections.singletonMap("state", "deployed")));
            ((BackgroundPathable) curatorFramework.getData().usingWatcher(this.streamModuleWatcher)).forPath(build);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (KeeperException.NodeExistsException e2) {
            LOG.info("Module for stream {} already deployed", str3, str);
        } catch (Exception e3) {
            throw new RuntimeException(e3);
        }
        return module;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onChildRemoved(CuratorFramework curatorFramework, ChildData childData) throws Exception {
        ModuleDeploymentsPath moduleDeploymentsPath = new ModuleDeploymentsPath(childData.getPath());
        String streamName = moduleDeploymentsPath.getStreamName();
        String moduleType = moduleDeploymentsPath.getModuleType();
        String moduleLabel = moduleDeploymentsPath.getModuleLabel();
        undeployModule(streamName, moduleType, moduleLabel);
        String build = ModuleType.job.toString().equals(moduleType) ? new JobsPath().setJobName(streamName).setModuleLabel(moduleLabel).setContainer(this.containerAttributes.getId()).build() : new StreamsPath().setStreamName(streamName).setModuleType(moduleType).setModuleLabel(moduleLabel).setContainer(this.containerAttributes.getId()).build();
        if (curatorFramework.checkExists().forPath(build) != null) {
            LOG.trace("Deleting path: {}", build);
            curatorFramework.delete().forPath(build);
        }
    }

    private Module createComposedModule(ModuleDescriptor moduleDescriptor, ModuleOptions moduleOptions) {
        String streamName = moduleDescriptor.getStreamName();
        int index = moduleDescriptor.getIndex();
        String sourceChannelName = moduleDescriptor.getSourceChannelName();
        String sinkChannelName = moduleDescriptor.getSinkChannelName();
        ModuleDefinition moduleDefinition = moduleDescriptor.getModuleDefinition();
        List<ModuleDeploymentRequest> parse = this.parser.parse(moduleDefinition.getName(), moduleDefinition.getDefinition(), ParsingContext.module);
        Assert.notEmpty(parse, "child module list must not be empty");
        ArrayList arrayList = new ArrayList(parse.size());
        for (ModuleDeploymentRequest moduleDeploymentRequest : parse) {
            arrayList.add(0, createSimpleModule(new ModuleDescriptor(this.moduleDefinitionRepository.findByNameAndType(moduleDeploymentRequest.getModule(), moduleDeploymentRequest.getType()), moduleDeploymentRequest.getGroup(), "", moduleDeploymentRequest.getIndex(), moduleDescriptor.getDeploymentProperties()), new PrefixNarrowingModuleOptions(moduleOptions, moduleDeploymentRequest.getModule())));
        }
        return new CompositeModule(moduleDefinition.getName(), moduleDefinition.getType(), arrayList, new DeploymentMetadata(streamName, index, sourceChannelName, sinkChannelName));
    }

    private Module createSimpleModule(ModuleDescriptor moduleDescriptor, ModuleOptions moduleOptions) {
        DeploymentMetadata deploymentMetadata = new DeploymentMetadata(moduleDescriptor.getStreamName(), moduleDescriptor.getIndex(), moduleDescriptor.getSourceChannelName(), moduleDescriptor.getSinkChannelName());
        ModuleDefinition moduleDefinition = moduleDescriptor.getModuleDefinition();
        return new SimpleModule(moduleDefinition, deploymentMetadata, moduleDefinition.getClasspath() == null ? null : new ParentLastURLClassLoader(moduleDefinition.getClasspath(), this.parentClassLoader), moduleOptions);
    }

    private ModuleOptions safeModuleOptionsInterpolate(ModuleDescriptor moduleDescriptor) {
        try {
            return this.moduleOptionsMetadataResolver.resolve(moduleDescriptor.getModuleDefinition()).interpolate(moduleDescriptor.getParameters());
        } catch (BindException e) {
            throw new IllegalStateException((Throwable) e);
        }
    }
}
