package org.springframework.xd.dirt.server;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.cluster.Container;
import org.springframework.xd.dirt.cluster.ContainerMatcher;
import org.springframework.xd.dirt.cluster.ContainerRepository;
import org.springframework.xd.dirt.cluster.NoContainerException;
import org.springframework.xd.dirt.core.Stream;
import org.springframework.xd.dirt.core.StreamDeploymentsPath;
import org.springframework.xd.dirt.server.ModuleDeploymentWriter;
import org.springframework.xd.dirt.stream.StreamFactory;
import org.springframework.xd.dirt.util.DeploymentPropertiesUtility;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;
import org.springframework.xd.module.ModuleDeploymentProperties;
import org.springframework.xd.module.ModuleDescriptor;

/* loaded from: input_file:org/springframework/xd/dirt/server/StreamDeploymentListener.class */
public class StreamDeploymentListener implements PathChildrenCacheListener {
    private final ModuleDeploymentWriter moduleDeploymentWriter;
    private final StreamFactory streamFactory;
    private final Logger logger = LoggerFactory.getLogger(StreamDeploymentListener.class);
    private final DeploymentLoader deploymentLoader = new DeploymentLoader();
    private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() { // from class: org.springframework.xd.dirt.server.StreamDeploymentListener.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "Stream Deployer");
            thread.setDaemon(true);
            return thread;
        }
    });

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.springframework.xd.dirt.server.StreamDeploymentListener$2, reason: invalid class name */
    /* loaded from: input_file:org/springframework/xd/dirt/server/StreamDeploymentListener$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
        }
    }

    /* loaded from: input_file:org/springframework/xd/dirt/server/StreamDeploymentListener$EventHandler.class */
    class EventHandler implements Callable<Void> {
        private final CuratorFramework client;
        private final PathChildrenCacheEvent event;

        EventHandler(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) {
            this.client = curatorFramework;
            this.event = pathChildrenCacheEvent;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            try {
                switch (AnonymousClass2.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[this.event.getType().ordinal()]) {
                    case 1:
                        StreamDeploymentListener.this.onChildAdded(this.client, this.event.getData());
                        return null;
                    default:
                        return null;
                }
            } catch (Exception e) {
                StreamDeploymentListener.this.logger.error("Exception caught while handling event", e);
                throw e;
            }
            StreamDeploymentListener.this.logger.error("Exception caught while handling event", e);
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/xd/dirt/server/StreamDeploymentListener$StreamModuleDeploymentPropertiesProvider.class */
    public class StreamModuleDeploymentPropertiesProvider implements ModuleDeploymentWriter.ContainerAwareModuleDeploymentPropertiesProvider {
        private final Map<ModuleDescriptor.Key, Integer> mapModuleCount = new HashMap();
        private final Map<ModuleDescriptor.Key, ModuleDeploymentProperties> mapDeploymentProperties = new HashMap();
        private final Stream stream;

        StreamModuleDeploymentPropertiesProvider(Stream stream) {
            this.stream = stream;
        }

        @Override // org.springframework.xd.dirt.server.ModuleDeploymentWriter.ModuleDeploymentPropertiesProvider
        public ModuleDeploymentProperties propertiesForDescriptor(ModuleDescriptor moduleDescriptor) {
            ModuleDescriptor.Key createKey = moduleDescriptor.createKey();
            ModuleDeploymentProperties moduleDeploymentProperties = this.mapDeploymentProperties.get(createKey);
            if (moduleDeploymentProperties == null) {
                moduleDeploymentProperties = DeploymentPropertiesUtility.createModuleDeploymentProperties(this.stream.getDeploymentProperties(), moduleDescriptor);
                this.mapDeploymentProperties.put(createKey, moduleDeploymentProperties);
            }
            return moduleDeploymentProperties;
        }

        @Override // org.springframework.xd.dirt.server.ModuleDeploymentWriter.ContainerAwareModuleDeploymentPropertiesProvider
        public ModuleDeploymentProperties propertiesForDescriptor(ModuleDescriptor moduleDescriptor, Container container) {
            List<ModuleDescriptor> descriptorsAsList = this.stream.getDescriptorsAsList();
            ModuleDeploymentProperties propertiesForDescriptor = propertiesForDescriptor(moduleDescriptor);
            int index = moduleDescriptor.getIndex();
            if (index > 0 && hasPartitionKeyProperty(propertiesForDescriptor(descriptorsAsList.get(index - 1)))) {
                ModuleDescriptor.Key createKey = moduleDescriptor.createKey();
                Integer num = this.mapModuleCount.get(createKey);
                if (num == null) {
                    num = 0;
                }
                Integer valueOf = Integer.valueOf(num.intValue() + 1);
                propertiesForDescriptor.put("consumer.partitionIndex", String.valueOf(num));
                this.mapModuleCount.put(createKey, valueOf);
            }
            if (hasPartitionKeyProperty(propertiesForDescriptor)) {
                try {
                    String str = propertiesForDescriptor(descriptorsAsList.get(index + 1)).get("count");
                    validateCountProperty(str, moduleDescriptor);
                    propertiesForDescriptor.put("producer.partitionCount", str);
                } catch (IndexOutOfBoundsException e) {
                    StreamDeploymentListener.this.logger.warn("Module '{}' is a sink module which contains a property of '{}' used for data partitioning; this feature is only supported for modules that produce data", moduleDescriptor, "producer.partitionKeyExpression");
                }
            }
            this.mapDeploymentProperties.put(moduleDescriptor.createKey(), propertiesForDescriptor);
            return propertiesForDescriptor;
        }

        private boolean hasPartitionKeyProperty(ModuleDeploymentProperties moduleDeploymentProperties) {
            return moduleDeploymentProperties.containsKey("producer.partitionKeyExpression") || moduleDeploymentProperties.containsKey("producer.partitionKeyExtractorClass");
        }

        private void validateCountProperty(String str, ModuleDescriptor moduleDescriptor) {
            Assert.hasText(str, String.format("'count' property is required in properties for module '%s' in order to support partitioning", moduleDescriptor));
            try {
                Assert.isTrue(Integer.parseInt(str) > 1, String.format("'count' property for module '%s' must contain an integer > 1, current value is '%s'", moduleDescriptor, str));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(String.format("'count' property for module %s does not contain a valid integer, current value is '%s'", moduleDescriptor, str));
            }
        }
    }

    public StreamDeploymentListener(ZooKeeperConnection zooKeeperConnection, ContainerRepository containerRepository, StreamFactory streamFactory, ContainerMatcher containerMatcher) {
        this.moduleDeploymentWriter = new ModuleDeploymentWriter(zooKeeperConnection, containerRepository, containerMatcher);
        this.streamFactory = streamFactory;
    }

    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        ZooKeeperUtils.logCacheEvent(this.logger, pathChildrenCacheEvent);
        this.executorService.submit(new EventHandler(curatorFramework, pathChildrenCacheEvent));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onChildAdded(CuratorFramework curatorFramework, ChildData childData) throws Exception {
        Stream loadStream = this.deploymentLoader.loadStream(curatorFramework, Paths.stripPath(childData.getPath()), this.streamFactory);
        if (loadStream != null) {
            this.logger.info("Deploying stream {}", loadStream);
            prepareStream(curatorFramework, loadStream);
            deployStream(loadStream);
            this.logger.info("Stream {} deployment attempt complete", loadStream);
        }
    }

    private void prepareStream(CuratorFramework curatorFramework, Stream stream) throws Exception {
        Iterator<ModuleDescriptor> deploymentOrderIterator = stream.getDeploymentOrderIterator();
        while (deploymentOrderIterator.hasNext()) {
            ModuleDescriptor next = deploymentOrderIterator.next();
            String name = stream.getName();
            String moduleType = next.getModuleDefinition().getType().toString();
            String build = new StreamDeploymentsPath().setStreamName(name).setModuleType(moduleType).setModuleLabel(next.getModuleLabel()).build();
            try {
                curatorFramework.create().creatingParentsIfNeeded().forPath(build);
            } catch (KeeperException.NodeExistsException e) {
                this.logger.info("Path {} already exists", build);
            }
        }
    }

    private void deployStream(Stream stream) throws InterruptedException {
        try {
            this.moduleDeploymentWriter.validateResults(this.moduleDeploymentWriter.writeDeployment(stream.getDeploymentOrderIterator(), new StreamModuleDeploymentPropertiesProvider(stream)));
        } catch (NoContainerException e) {
            this.logger.warn("No containers available for deployment of stream {}", stream.getName());
        }
    }
}
