package org.springframework.xd.dirt.plugins.spark.streaming;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FilenameUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.EnumerablePropertySource;
import org.springframework.core.env.Environment;
import org.springframework.core.env.PropertySource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.util.Assert;
import org.springframework.util.SocketUtils;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.module.ResourceModuleRegistry;
import org.springframework.xd.dirt.plugins.AbstractStreamPlugin;
import org.springframework.xd.dirt.plugins.stream.ModuleTypeConversionSupport;
import org.springframework.xd.dirt.server.MessageBusClassLoaderFactory;
import org.springframework.xd.module.ModuleType;
import org.springframework.xd.module.core.Module;
import org.springframework.xd.module.core.SimpleModule;
import org.springframework.xd.spark.streaming.SparkConfig;
import org.springframework.xd.spark.streaming.SparkMessageSender;
import org.springframework.xd.spark.streaming.SparkStreamingSupport;
import org.springframework.xd.spark.streaming.java.ModuleExecutor;
import org.springframework.xd.spark.streaming.java.Processor;

/* loaded from: input_file:org/springframework/xd/dirt/plugins/spark/streaming/SparkStreamingPlugin.class */
public class SparkStreamingPlugin extends AbstractStreamPlugin {
    /** Logger shared by the plugin and its nested streaming listener. */
    private static final Logger logger = LoggerFactory.getLogger(SparkStreamingPlugin.class);

    /** Name prefix of environment properties copied into the message bus properties (Redis connection). */
    private static final String REDIS_CONNECTION_PROPERTY_PREFIX = "spring.redis";

    /** Name prefix of environment properties copied into the message bus properties (RabbitMQ connection). */
    private static final String RABBIT_CONNECTION_PROPERTY_PREFIX = "spring.rabbitmq";

    /** Name prefix of environment properties copied into the message bus properties (message bus settings). */
    private static final String MESSAGE_BUS_PROPERTY_PREFIX = "xd.messagebus.";

    /** Resolves the message bus jar location pattern into concrete resources. */
    private final PathMatchingResourcePatternResolver resolver;

    /**
     * Streaming context started for each deployed module. Entries are added by the thread that runs
     * the Spark driver and looked up when the module is shut down, so the reference is final to
     * guarantee the map itself is safely published to both threads.
     */
    private final Map<Module, JavaStreamingContext> streamingContexts;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/xd/dirt/plugins/spark/streaming/SparkStreamingPlugin$SparkStreamingListener.class */
    public static class SparkStreamingListener implements StreamingListener {
        private final CountDownLatch receiverStartLatch;
        private final AtomicBoolean receiverStartSuccess;

        private SparkStreamingListener() {
            this.receiverStartLatch = new CountDownLatch(1);
            this.receiverStartSuccess = new AtomicBoolean();
        }

        public void onReceiverStarted(StreamingListenerReceiverStarted streamingListenerReceiverStarted) {
            SparkStreamingPlugin.logger.info("Spark streaming receiver started " + streamingListenerReceiverStarted.receiverInfo());
            this.receiverStartSuccess.set(true);
            this.receiverStartLatch.countDown();
        }

        public void onReceiverError(StreamingListenerReceiverError streamingListenerReceiverError) {
            SparkStreamingPlugin.logger.info("Error starting spark streaming receiver " + streamingListenerReceiverError.receiverInfo());
            this.receiverStartSuccess.set(false);
            this.receiverStartLatch.countDown();
        }

        public void onReceiverStopped(StreamingListenerReceiverStopped streamingListenerReceiverStopped) {
            SparkStreamingPlugin.logger.info("Spark streaming receiver stopped " + streamingListenerReceiverStopped.receiverInfo());
        }

        public void onBatchSubmitted(StreamingListenerBatchSubmitted streamingListenerBatchSubmitted) {
        }

        public void onBatchStarted(StreamingListenerBatchStarted streamingListenerBatchStarted) {
        }

        public void onBatchCompleted(StreamingListenerBatchCompleted streamingListenerBatchCompleted) {
        }
    }

    /**
     * Creates the plugin.
     *
     * @param messageBus the message bus handed to the parent stream plugin
     */
    @Autowired
    public SparkStreamingPlugin(MessageBus messageBus) {
        super(messageBus);
        this.resolver = new PathMatchingResourcePatternResolver();
        // The map is written by the thread that runs the Spark driver and read by the thread that
        // shuts a module down, so a plain HashMap would be a data race.
        this.streamingContexts = new ConcurrentHashMap<Module, JavaStreamingContext>();
    }

    /**
     * Tells whether this plugin handles the given module.
     *
     * @param module the module being deployed
     * @return {@code true} when the module's {@code moduleExecutionFramework} property is {@code spark}
     */
    @Override
    public boolean supports(Module module) {
        String executionFramework = module.getProperties().getProperty("moduleExecutionFramework");
        return "spark".equals(executionFramework);
    }

    /**
     * Wires the message bus receiver (and, for processor modules, the message bus sender) into the
     * module's bean factory and then starts the Spark streaming context for the module.
     *
     * <p>With the {@code local} transport the Spark master must itself be a {@code local} master, and
     * the receiver and sender are given a {@link LocalMessageBusHolder} that carries the module's
     * message bus.
     *
     * @param module the module being deployed
     * @throws IllegalStateException if no Spark streaming bean can be found in the module, or if the
     *         {@code local} transport is combined with a non-local Spark master
     */
    @Override
    public void postProcessModule(Module module) {
        ConfigurableEnvironment environment = module.getApplicationContext().getEnvironment();
        String transport = environment.getProperty("XD_TRANSPORT");
        Properties messageBusProperties = getMessageBusProperties(module);

        // Extract once instead of once per index. Element 0 is handed to the receiver and element 1
        // to the sender (presumably consumer and producer properties, going by the helper's name).
        Properties[] consumerProducerProperties = extractConsumerProducerProperties(module);
        Properties receiverProperties = consumerProducerProperties[0];
        Properties senderProperties = consumerProducerProperties[1];

        // The environment-wide storage level is always parsed, so an invalid value fails even when
        // the module overrides it.
        String defaultStorageLevelName = environment.getProperty("spark.storageLevel");
        StorageLevel defaultStorageLevel = StorageLevel.fromString(
                StringUtils.hasText(defaultStorageLevelName) ? defaultStorageLevelName : "MEMORY_ONLY");
        String moduleStorageLevelName = module.getProperties().getProperty("storageLevel");
        StorageLevel storageLevel = StringUtils.hasText(moduleStorageLevelName)
                ? StorageLevel.fromString(moduleStorageLevelName)
                : defaultStorageLevel;

        boolean processorModule = module.getType().equals(ModuleType.processor);
        MessageBusReceiver messageBusReceiver;
        // Constant-first comparison: a missing XD_TRANSPORT is treated as "not local" instead of
        // failing with a NullPointerException.
        if ("local".equals(transport)) {
            try {
                SparkStreamingSupport localSupport = (SparkStreamingSupport) module.getComponent(SparkStreamingSupport.class);
                Assert.notNull(localSupport, "Problem getting the spark streaming module. Is the module context active?");
                Properties sparkModuleProperties = getSparkModuleProperties(localSupport);

                // A master declared by the module itself wins over the environment setting.
                String sparkMaster = environment.getProperty("spark.master");
                if (sparkModuleProperties != null && StringUtils.hasText(sparkModuleProperties.getProperty("spark.master"))) {
                    sparkMaster = sparkModuleProperties.getProperty("spark.master");
                }
                Assert.notNull(sparkMaster, "Spark Master URL must be set.");
                if (!sparkMaster.startsWith("local")) {
                    throw new IllegalStateException("Spark cluster mode must be 'local' for 'local' XD transport.");
                }

                LocalMessageBusHolder localMessageBusHolder = new LocalMessageBusHolder();
                LocalMessageBusHolder.set((MessageBus) module.getComponent(MessageBus.class));
                messageBusReceiver = new MessageBusReceiver(localMessageBusHolder, storageLevel, messageBusProperties,
                        receiverProperties, ModuleTypeConversionSupport.getInputMimeType(module));
                if (processorModule) {
                    module.getApplicationContext().getBeanFactory().registerSingleton("messageBusSender",
                            new MessageBusSender(localMessageBusHolder, getOutputChannelName(module),
                                    buildTapChannelName(module), messageBusProperties, senderProperties,
                                    ModuleTypeConversionSupport.getOutputMimeType(module), module.getProperties()));
                }
            } catch (NoSuchBeanDefinitionException e) {
                // Keep the cause so the missing bean can be identified from the stack trace.
                throw new IllegalStateException("Either java or scala module should be present.", e);
            }
        } else {
            messageBusReceiver = new MessageBusReceiver(storageLevel, messageBusProperties, receiverProperties,
                    ModuleTypeConversionSupport.getInputMimeType(module));
            if (processorModule) {
                module.getApplicationContext().getBeanFactory().registerSingleton("messageBusSender",
                        new MessageBusSender(getOutputChannelName(module), buildTapChannelName(module),
                                messageBusProperties, senderProperties,
                                ModuleTypeConversionSupport.getOutputMimeType(module), module.getProperties()));
            }
        }
        registerMessageBusReceiver(messageBusReceiver, module);

        try {
            SparkStreamingSupport sparkStreamingSupport = (SparkStreamingSupport) module.getComponent(SparkStreamingSupport.class);
            Assert.notNull(sparkStreamingSupport, "Problem getting the spark streaming module. Is the module context active?");
            startSparkStreamingContext(getSparkModuleProperties(sparkStreamingSupport), sparkStreamingSupport, module);
        } catch (NoSuchBeanDefinitionException e) {
            throw new IllegalStateException("Either java or scala module should be present.", e);
        }
    }

    /**
     * Stops the streaming context that was started for the module, if any, and forgets it.
     *
     * <p>Failures while stopping are logged and not propagated, so shutdown of the module continues.
     *
     * @param module the module being shut down
     */
    @Override
    public void beforeShutdown(Module module) {
        super.beforeShutdown(module);
        logger.info("stopping SparkDriver");
        try {
            // Remove rather than get, so the stopped context (and the module key) can be collected.
            JavaStreamingContext streamingContext = this.streamingContexts.remove(module);
            if (streamingContext == null) {
                // Nothing was registered, e.g. the driver thread failed before creating the context.
                logger.warn("No streaming context registered for module {}; nothing to stop", module);
                return;
            }
            // Arguments are (stopSparkContext, stopGracefully): also stop the underlying
            // SparkContext and do not wait for received data to be processed.
            streamingContext.stop(true, false);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.warn("Error while stopping streaming context", e);
        }
    }

    /**
     * Collects the properties that are handed to the message bus receiver and sender.
     *
     * <p>The result holds {@code XD_TRANSPORT} plus every property, from any enumerable property
     * source of the module's environment, whose name starts with one of the Redis, RabbitMQ or
     * message bus prefixes. Values are read through the environment, not the individual source.
     * Properties that resolve to {@code null} are left out.
     *
     * @param module the module whose environment is inspected
     * @return a new {@link Properties} instance, never {@code null}
     */
    private Properties getMessageBusProperties(Module module) {
        ConfigurableEnvironment environment = module.getApplicationContext().getEnvironment();
        Properties busProperties = new Properties();

        // Properties is a Hashtable and rejects null values, so only store what actually resolves.
        String transport = environment.getProperty("XD_TRANSPORT");
        if (transport != null) {
            busProperties.put("XD_TRANSPORT", transport);
        }

        for (PropertySource<?> propertySource : environment.getPropertySources()) {
            if (!(propertySource instanceof EnumerablePropertySource)) {
                // Names cannot be listed for this source, so it cannot be scanned by prefix.
                continue;
            }
            for (String name : ((EnumerablePropertySource<?>) propertySource).getPropertyNames()) {
                boolean busRelated = name.startsWith(REDIS_CONNECTION_PROPERTY_PREFIX)
                        || name.startsWith(RABBIT_CONNECTION_PROPERTY_PREFIX)
                        || name.startsWith(MESSAGE_BUS_PROPERTY_PREFIX);
                if (!busRelated) {
                    continue;
                }
                String value = environment.getProperty(name);
                if (value != null) {
                    busProperties.put(name, value);
                }
            }
        }
        return busProperties;
    }

    /**
     * Points the receiver at the module's input channel and publishes it in the module's bean
     * factory under the name {@code messageBusReceiver}.
     *
     * @param messageBusReceiver the receiver to configure and register
     * @param module the module that owns the bean factory
     */
    private void registerMessageBusReceiver(MessageBusReceiver messageBusReceiver, Module module) {
        String inputChannelName = getInputChannelName(module);
        messageBusReceiver.setInputChannelName(inputChannelName);
        module.getApplicationContext()
                .getBeanFactory()
                .registerSingleton("messageBusReceiver", messageBusReceiver);
    }

    /**
     * Gathers the Spark configuration contributed by the module through methods annotated with
     * {@link SparkConfig}.
     *
     * <p>Only methods declared directly on the module's class are considered. An annotated method
     * that takes parameters, does not return {@link Properties}, cannot be accessed, or throws is
     * skipped with a warning; a {@code null} result is ignored.
     *
     * @param sparkStreamingSupport the module instance to inspect
     * @return the merged properties, never {@code null} (empty when nothing is contributed)
     */
    private Properties getSparkModuleProperties(SparkStreamingSupport sparkStreamingSupport) {
        Properties sparkProperties = new Properties();
        for (Method method : sparkStreamingSupport.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(SparkConfig.class)) {
                continue;
            }
            if (!Properties.class.equals(method.getReturnType())) {
                logger.warn("@SparkConfig annotated method should return java.util.Properties type. Ignoring the method " + method.getName());
                continue;
            }
            if (method.getParameterTypes().length != 0) {
                // Invoking with no arguments would fail with an IllegalArgumentException.
                logger.warn("@SparkConfig annotated method should not take parameters. Ignoring the method " + method.getName());
                continue;
            }
            try {
                Properties contributed = (Properties) method.invoke(sparkStreamingSupport);
                if (contributed != null) {
                    sparkProperties.putAll(contributed);
                }
            } catch (IllegalAccessException e) {
                // Best effort: the configuration is optional, but the reason must be visible.
                logger.warn("Unable to access @SparkConfig annotated method " + method.getName() + ". Ignoring the method", e);
            } catch (InvocationTargetException e) {
                logger.warn("@SparkConfig annotated method " + method.getName() + " threw an exception. Ignoring the method", e);
            }
        }
        return sparkProperties;
    }

    /**
     * Builds the streaming context for the module, runs the Spark driver on a dedicated thread and
     * waits up to 30 seconds for the receiver to report that it started.
     *
     * <p>The master URL and batch interval are read from this plugin's application context
     * environment ({@code spark.master}, default {@code spark://localhost:7077}; {@code batchInterval},
     * falling back to {@code spark.streaming.batchInterval}, default {@code 2000}).
     *
     * @param properties Spark properties contributed by the module, applied on top of the defaults
     * @param sparkStreamingSupport the module's Spark streaming bean (Java or Scala processor)
     * @param module the module being deployed
     * @throws IllegalStateException if the receiver did not start successfully within the timeout
     * @throws RuntimeException if the waiting thread is interrupted
     */
    private void startSparkStreamingContext(Properties properties, final SparkStreamingSupport sparkStreamingSupport, final Module module) {
        final Receiver receiver = (Receiver) module.getComponent(Receiver.class);
        Environment environment = getApplicationContext().getEnvironment();
        SparkConf sparkConf = setupSparkConf(module, environment.getProperty("spark.master", "spark://localhost:7077"), properties);
        String batchInterval = environment.getProperty("batchInterval", environment.getProperty("spark.streaming.batchInterval", "2000"));
        final SparkStreamingListener streamingListener = new SparkStreamingListener();
        // Only processor modules send results back to the message bus.
        final SparkMessageSender sparkMessageSender = module.getType() == ModuleType.processor ? (SparkMessageSender) module.getComponent(SparkMessageSender.class) : null;
        final StreamingContext streamingContext = new StreamingContext(sparkConf, new Duration(Long.parseLong(batchInterval)));
        streamingContext.addStreamingListener(streamingListener);

        ExecutorService driverExecutor = Executors.newSingleThreadExecutor();
        driverExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(streamingContext);
                    SparkStreamingPlugin.this.streamingContexts.put(module, javaStreamingContext);
                    JavaReceiverInputDStream receiverStream = javaStreamingContext.receiverStream(receiver);
                    if (sparkStreamingSupport instanceof Processor) {
                        new ModuleExecutor().execute(receiverStream, sparkStreamingSupport, sparkMessageSender);
                    }
                    if (sparkStreamingSupport instanceof org.springframework.xd.spark.streaming.scala.Processor) {
                        new org.springframework.xd.spark.streaming.scala.ModuleExecutor().execute(receiverStream.receiverInputDStream(), sparkStreamingSupport, sparkMessageSender);
                    }
                    javaStreamingContext.start();
                    // Blocks this thread for as long as the streaming application runs.
                    javaStreamingContext.awaitTermination();
                } catch (Exception e) {
                    logger.error("Exception when running Spark Streaming application.", e);
                    // If the driver fails before the receiver reports in, release the deploying
                    // thread now instead of letting it wait for the whole timeout. The success flag
                    // is left alone, so a failure after a successful start does not change the
                    // deployment result.
                    streamingListener.receiverStartLatch.countDown();
                    throw new IllegalStateException("Exception when running Spark Streaming application.", e);
                }
            }
        });
        // No more tasks will be submitted. shutdown() lets the driver task run to completion and
        // then allows the worker thread to exit instead of leaking it.
        driverExecutor.shutdown();

        try {
            if (!streamingListener.receiverStartLatch.await(30L, TimeUnit.SECONDS)) {
                logger.warn("Deployment timed out when deploying Spark Streaming module " + sparkStreamingSupport);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can see the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        // Still false after a timeout, a receiver error, or an early driver failure.
        if (!streamingListener.receiverStartSuccess.get()) {
            throw new IllegalStateException("Failed to start Spark Streaming Receiver");
        }
    }

    /**
     * Creates the Spark configuration for a module.
     *
     * <p>Defaults are set first (a free TCP port for the Spark UI, {@code spark.cores.max=3}, the
     * given master, and an application name of {@code <group>-<moduleLabel>}); the supplied
     * properties are applied afterwards and therefore override the defaults. The final jar list is
     * any {@code spark.jars} value already configured (comma separated) followed by the
     * application jars of the module.
     *
     * @param module the module being deployed
     * @param str the Spark master URL
     * @param properties extra Spark properties to apply; may be {@code null}
     * @return the populated configuration
     */
    private SparkConf setupSparkConf(Module module, String str, Properties properties) {
        String applicationName = module.getDescriptor().getGroup() + "-" + module.getDescriptor().getModuleLabel();
        SparkConf sparkConf = new SparkConf()
                .set("spark.ui.port", String.valueOf(SocketUtils.findAvailableTcpPort()))
                .set("spark.cores.max", "3")
                .setMaster(str)
                .setAppName(applicationName);
        if (properties != null) {
            for (String name : properties.stringPropertyNames()) {
                sparkConf.set(name, properties.getProperty(name));
            }
        }

        List<String> jars = new ArrayList<String>();
        // Ask for a default instead of catching NoSuchElementException when the key is absent.
        String configuredJars = sparkConf.get("spark.jars", "");
        if (StringUtils.hasText(configuredJars)) {
            // Trim first: the split pattern only removes whitespace around the commas, not at the
            // very start or end of the value.
            jars.addAll(Arrays.asList(configuredJars.trim().split("\\s*,\\s*")));
        }
        jars.addAll(getApplicationJars(module));
        sparkConf.setJars(jars.toArray(new String[jars.size()]));
        return sparkConf;
    }

    /** File-name fragments that select which archives of the parent class loader are shipped. */
    private static final String[] PARENT_JAR_NAME_FRAGMENTS = {
        "spark", "spring-xd-", "spring-core", "spring-integration-core", "spring-beans", "spring-context",
        "spring-boot", "spring-aop", "spring-expression", "spring-messaging", "spring-retry", "spring-tx",
        "spring-data-commons", "spring-data-redis", "commons-pool", "jedis", "kryo", "gs-collections"
    };

    /**
     * Lists the archive paths to hand to Spark for a module, in this order: archives on the
     * module's own class loader, the message bus jars resolved from
     * {@link MessageBusClassLoaderFactory#MESSAGE_BUS_JARS_LOCATION}, and the archives of the parent
     * class loader whose file name contains one of {@link #PARENT_JAR_NAME_FRAGMENTS}.
     *
     * @param module the module being deployed; must be a {@link SimpleModule} backed by a
     *        {@link URLClassLoader}
     * @return the archive paths, never {@code null}
     * @throws RuntimeException if the message bus jar location cannot be resolved
     */
    private List<String> getApplicationJars(Module module) {
        URLClassLoader moduleClassLoader = (URLClassLoader) ((SimpleModule) module).getClassLoader();
        List<String> jars = new ArrayList<String>();
        for (URL url : moduleClassLoader.getURLs()) {
            // Drop anything after '!' (nested archive entry) to keep only the archive's own path.
            String archivePath = url.getFile().split("\\!", 2)[0];
            if (archivePath.endsWith(ResourceModuleRegistry.ARCHIVE_AS_FILE_EXTENSION)) {
                jars.add(archivePath);
            }
        }

        try {
            String messageBusJarsLocation = getApplicationContext().getEnvironment()
                    .resolvePlaceholders(MessageBusClassLoaderFactory.MESSAGE_BUS_JARS_LOCATION);
            for (Resource resource : this.resolver.getResources(messageBusJarsLocation)) {
                jars.add(resource.getURL().getFile());
            }
        } catch (IOException e) {
            throw new RuntimeException("Unable to resolve the message bus jars for the Spark application", e);
        }

        ClassLoader parentClassLoader = moduleClassLoader.getParent();
        if (parentClassLoader instanceof URLClassLoader) {
            for (URL url : ((URLClassLoader) parentClassLoader).getURLs()) {
                String fileName = FilenameUtils.getName(url.getFile());
                if (fileName.endsWith(ResourceModuleRegistry.ARCHIVE_AS_FILE_EXTENSION) && isShippedParentJar(fileName)) {
                    jars.add(url.getFile().split("\\!", 2)[0]);
                }
            }
        } else {
            // Its URLs cannot be listed; report that instead of failing on a blind cast.
            logger.warn("Parent class loader {} is not a URLClassLoader; its jars are not added to the Spark application", parentClassLoader);
        }
        return jars;
    }

    /** Tells whether an archive file name contains one of the fragments selected for shipping. */
    private static boolean isShippedParentJar(String fileName) {
        for (String fragment : PARENT_JAR_NAME_FRAGMENTS) {
            if (fileName.contains(fragment)) {
                return true;
            }
        }
        return false;
    }
}
