package org.springframework.xd.dirt.plugins.spark.streaming;

import java.util.Iterator;
import java.util.Properties;
import org.apache.spark.storage.StorageLevel;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.EnumerablePropertySource;
import org.springframework.core.env.PropertySource;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.plugins.AbstractStreamPlugin;
import org.springframework.xd.dirt.plugins.stream.ModuleTypeConversionSupport;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.module.ModuleType;
import org.springframework.xd.module.core.Module;
import org.springframework.xd.module.spark.streaming.SparkStreamingDriverModule;
import org.springframework.xd.spark.streaming.SparkStreamingSupport;

/* loaded from: input_file:org/springframework/xd/dirt/plugins/spark/streaming/SparkStreamingPlugin.class */
/**
 * Stream plugin that applies to modules whose {@code moduleExecutionFramework}
 * property equals {@code spark}.
 *
 * <p>During post-processing it registers a {@code messageBusReceiver} singleton in the
 * module's bean factory and, for modules of type {@link ModuleType#processor}, a
 * {@code messageBusSender} singleton as well. When the {@code XD_TRANSPORT} environment
 * property is {@code local}, both are created around a {@link LocalMessageBusHolder}
 * and the Spark master URL is required to start with {@code local}.
 */
public class SparkStreamingPlugin extends AbstractStreamPlugin {
    private static final String REDIS_CONNECTION_PROPERTY_PREFIX = "spring.redis";
    private static final String RABBIT_CONNECTION_PROPERTY_PREFIX = "spring.rabbitmq";
    private static final String MESSAGE_BUS_PROPERTY_PREFIX = "xd.messagebus.";
    private static final String TRANSPORT_PROPERTY = "XD_TRANSPORT";
    private static final String LOCAL_TRANSPORT = "local";
    private static final String SPARK_MASTER_PROPERTY = "spark.master";
    private static final String MESSAGE_BUS_SENDER_BEAN = "messageBusSender";
    private static final String MESSAGE_BUS_RECEIVER_BEAN = "messageBusReceiver";

    /**
     * Creates the plugin.
     *
     * @param messageBus the message bus handed to the parent plugin
     * @param zooKeeperConnection the ZooKeeper connection handed to the parent plugin
     */
    @Autowired
    public SparkStreamingPlugin(MessageBus messageBus, ZooKeeperConnection zooKeeperConnection) {
        super(messageBus, zooKeeperConnection);
    }

    /**
     * Tells whether this plugin handles the given module.
     *
     * @param module the module to inspect
     * @return {@code true} when the module's {@code moduleExecutionFramework} property is {@code spark}
     */
    @Override
    public boolean supports(Module module) {
        return "spark".equals(module.getProperties().getProperty("moduleExecutionFramework"));
    }

    /**
     * Registers the message bus receiver (and, for processor modules, the sender) in the
     * module's bean factory.
     *
     * @param module the module being post-processed
     * @throws IllegalArgumentException if {@code XD_TRANSPORT} is not set, or (local transport)
     *         if the Spark streaming component or the Spark master URL is missing
     * @throws IllegalStateException if the transport is {@code local} but the Spark master is
     *         not, or if a required bean definition is missing from the module
     */
    @Override
    public void postProcessModule(Module module) {
        ConfigurableEnvironment environment = module.getApplicationContext().getEnvironment();
        String transport = environment.getProperty(TRANSPORT_PROPERTY);
        // Fail with a clear message instead of an anonymous NullPointerException further down.
        Assert.notNull(transport, "XD_TRANSPORT must be set.");

        Properties messageBusProperties = getMessageBusProperties(module);
        // Index 0 is passed to the receiver, index 1 to the sender; extract both with one call.
        Properties[] consumerProducerProperties = extractConsumerProducerProperties(module);
        Properties receiverProperties = consumerProducerProperties[0];
        Properties senderProperties = consumerProducerProperties[1];
        StorageLevel storageLevel = resolveStorageLevel(module, environment);

        MessageBusReceiver receiver;
        if (LOCAL_TRANSPORT.equals(transport)) {
            try {
                SparkStreamingSupport sparkStreamingSupport =
                        (SparkStreamingSupport) module.getComponent(SparkStreamingSupport.class);
                Assert.notNull(sparkStreamingSupport,
                        "Problem getting the spark streaming module. Is the module context active?");
                assertLocalSparkMaster(environment,
                        SparkStreamingDriverModule.getSparkModuleProperties(sparkStreamingSupport));

                LocalMessageBusHolder localMessageBusHolder = new LocalMessageBusHolder();
                LocalMessageBusHolder.set((MessageBus) module.getComponent(MessageBus.class));
                receiver = new MessageBusReceiver(localMessageBusHolder, storageLevel, messageBusProperties,
                        receiverProperties, ModuleTypeConversionSupport.getInputMimeType(module));
                if (module.getType().equals(ModuleType.processor)) {
                    registerMessageBusSender(module, new MessageBusSender(localMessageBusHolder,
                            getOutputChannelName(module), messageBusProperties, senderProperties,
                            ModuleTypeConversionSupport.getOutputMimeType(module)));
                }
            } catch (NoSuchBeanDefinitionException e) {
                // Keep the original exception as the cause so the missing bean can be identified.
                throw new IllegalStateException("Either java or scala module should be present.", e);
            }
        } else {
            receiver = new MessageBusReceiver(storageLevel, messageBusProperties, receiverProperties,
                    ModuleTypeConversionSupport.getInputMimeType(module));
            if (module.getType().equals(ModuleType.processor)) {
                registerMessageBusSender(module, new MessageBusSender(getOutputChannelName(module),
                        messageBusProperties, senderProperties,
                        ModuleTypeConversionSupport.getOutputMimeType(module)));
            }
        }
        registerMessageBusReceiver(receiver, module);
    }

    /**
     * Resolves the storage level: the module's {@code storageLevel} property wins when it has
     * text; otherwise the environment's {@code spark.storageLevel} is used, falling back to
     * {@code MEMORY_ONLY}.
     */
    private StorageLevel resolveStorageLevel(Module module, ConfigurableEnvironment environment) {
        // The environment-level value is always parsed first, so an invalid value there is
        // reported even when the module overrides it.
        String environmentLevel = environment.getProperty("spark.storageLevel");
        StorageLevel defaultLevel = StorageLevel.fromString(
                StringUtils.hasText(environmentLevel) ? environmentLevel : "MEMORY_ONLY");
        String moduleLevel = module.getProperties().getProperty("storageLevel");
        return StringUtils.hasText(moduleLevel) ? StorageLevel.fromString(moduleLevel) : defaultLevel;
    }

    /**
     * Verifies that the effective Spark master URL (module property when it has text, otherwise
     * the environment property) is set and starts with {@code local}.
     */
    private void assertLocalSparkMaster(ConfigurableEnvironment environment, Properties sparkModuleProperties) {
        String sparkMaster = environment.getProperty(SPARK_MASTER_PROPERTY);
        if (sparkModuleProperties != null
                && StringUtils.hasText(sparkModuleProperties.getProperty(SPARK_MASTER_PROPERTY))) {
            sparkMaster = sparkModuleProperties.getProperty(SPARK_MASTER_PROPERTY);
        }
        Assert.notNull(sparkMaster, "Spark Master URL must be set.");
        if (!sparkMaster.startsWith("local")) {
            throw new IllegalStateException("Spark cluster mode must be 'local' for 'local' XD transport.");
        }
    }

    /**
     * Collects the transport name plus every enumerable environment property whose name starts
     * with the Redis, RabbitMQ or message bus prefix.
     *
     * @param module the module whose environment is inspected
     * @return a new {@link Properties} holding the collected entries
     * @throws IllegalArgumentException if {@code XD_TRANSPORT} is not set
     */
    private Properties getMessageBusProperties(Module module) {
        ConfigurableEnvironment environment = module.getApplicationContext().getEnvironment();
        String transport = environment.getProperty(TRANSPORT_PROPERTY);
        // Properties rejects null values; surface that as a descriptive error.
        Assert.notNull(transport, "XD_TRANSPORT must be set.");
        Properties properties = new Properties();
        properties.put(TRANSPORT_PROPERTY, transport);
        for (PropertySource<?> propertySource : environment.getPropertySources()) {
            // Only enumerable sources can list their property names.
            if (!(propertySource instanceof EnumerablePropertySource)) {
                continue;
            }
            for (String name : ((EnumerablePropertySource<?>) propertySource).getPropertyNames()) {
                if (!isMessageBusRelated(name)) {
                    continue;
                }
                // Resolve through the environment so property source precedence is honoured.
                String value = environment.getProperty(name);
                if (value != null) {
                    properties.put(name, value);
                }
            }
        }
        return properties;
    }

    /** Tells whether the property name starts with one of the connection or message bus prefixes. */
    private static boolean isMessageBusRelated(String name) {
        return name.startsWith(REDIS_CONNECTION_PROPERTY_PREFIX)
                || name.startsWith(RABBIT_CONNECTION_PROPERTY_PREFIX)
                || name.startsWith(MESSAGE_BUS_PROPERTY_PREFIX);
    }

    /** Registers the sender as the {@code messageBusSender} singleton of the module's bean factory. */
    private void registerMessageBusSender(Module module, MessageBusSender messageBusSender) {
        module.getApplicationContext().getBeanFactory().registerSingleton(MESSAGE_BUS_SENDER_BEAN, messageBusSender);
    }

    /**
     * Sets the receiver's input channel name and registers it as the {@code messageBusReceiver}
     * singleton of the module's bean factory.
     */
    private void registerMessageBusReceiver(MessageBusReceiver messageBusReceiver, Module module) {
        messageBusReceiver.setInputChannelName(getInputChannelName(module));
        module.getApplicationContext().getBeanFactory().registerSingleton(MESSAGE_BUS_RECEIVER_BEAN, messageBusReceiver);
    }
}
