package org.springframework.xd.spark.streaming;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;

/* loaded from: input_file:org/springframework/xd/spark/streaming/SparkStreamingModuleExecutor.class */
public class SparkStreamingModuleExecutor implements Serializable {
    private static SparkMessageSender messageSender;
    private static final String HEADER_PREFIX = "spark.";
    private final MessageConverter converter = new DefaultSparkStreamingMessageConverter();

    /* loaded from: input_file:org/springframework/xd/spark/streaming/SparkStreamingModuleExecutor$DefaultSparkStreamingMessageConverter.class */
    private static class DefaultSparkStreamingMessageConverter implements MessageConverter, Serializable {
        private DefaultSparkStreamingMessageConverter() {
        }

        public Object fromMessage(Message<?> message, Class<?> cls) {
            throw new UnsupportedOperationException("converter only used for creating Messages");
        }

        public Message<?> toMessage(Object obj, MessageHeaders messageHeaders) {
            Class<?> cls = obj.getClass();
            if (!cls.isPrimitive() && !cls.getName().startsWith("java.")) {
                obj = obj.toString();
            }
            return MessageBuilder.withPayload(obj).build();
        }
    }

    public void execute(JavaDStream javaDStream, Processor processor, final SparkMessageSender sparkMessageSender) {
        JavaDStreamLike process = processor.process(javaDStream);
        if (process != null) {
            process.foreachRDD(new Function<JavaRDDLike, Void>() { // from class: org.springframework.xd.spark.streaming.SparkStreamingModuleExecutor.1
                public Void call(final JavaRDDLike javaRDDLike) {
                    javaRDDLike.foreachPartition(new VoidFunction<Iterator<?>>() { // from class: org.springframework.xd.spark.streaming.SparkStreamingModuleExecutor.1.1
                        public void call(Iterator<?> it) throws Exception {
                            if (it.hasNext()) {
                                if (SparkStreamingModuleExecutor.messageSender == null) {
                                    SparkMessageSender unused = SparkStreamingModuleExecutor.messageSender = sparkMessageSender;
                                }
                                try {
                                    SparkStreamingModuleExecutor.messageSender.start();
                                    if (sparkMessageSender != null && !sparkMessageSender.isRunning()) {
                                        SparkStreamingModuleExecutor.messageSender.stop();
                                        SparkMessageSender unused2 = SparkStreamingModuleExecutor.messageSender = sparkMessageSender;
                                        SparkStreamingModuleExecutor.messageSender.start();
                                    }
                                } catch (NoSuchBeanDefinitionException e) {
                                    if (sparkMessageSender != null && !sparkMessageSender.isRunning()) {
                                        SparkStreamingModuleExecutor.messageSender.stop();
                                        SparkMessageSender unused3 = SparkStreamingModuleExecutor.messageSender = sparkMessageSender;
                                        SparkStreamingModuleExecutor.messageSender.start();
                                    }
                                } catch (Throwable th) {
                                    if (sparkMessageSender != null && !sparkMessageSender.isRunning()) {
                                        SparkStreamingModuleExecutor.messageSender.stop();
                                        SparkMessageSender unused4 = SparkStreamingModuleExecutor.messageSender = sparkMessageSender;
                                        SparkStreamingModuleExecutor.messageSender.start();
                                    }
                                    throw th;
                                }
                                while (it.hasNext()) {
                                    SparkStreamingModuleExecutor.messageSender.send(SparkStreamingModuleExecutor.this.converter.toMessage(it.next(), SparkStreamingModuleExecutor.this.populateHeaders(javaRDDLike)));
                                }
                            }
                            sparkMessageSender.stop();
                        }
                    });
                    return null;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MessageHeaders populateHeaders(JavaRDDLike javaRDDLike) {
        HashMap hashMap = new HashMap();
        hashMap.put("spark.rdd.id", Integer.valueOf(javaRDDLike.id()));
        hashMap.put("spark.rdd.name", javaRDDLike.name());
        SparkContext context = javaRDDLike.context();
        if (context != null) {
            hashMap.put("spark.app.id", context.applicationId());
            hashMap.put("spark.app.name", context.appName());
        }
        return new MessageHeaders(hashMap);
    }
}
