package org.springframework.xd.spark.streaming.java;

import java.io.Serializable;
import java.util.Iterator;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.xd.spark.streaming.SparkMessageSender;
import org.springframework.xd.spark.streaming.SparkStreamingModuleExecutor;

/* loaded from: input_file:org/springframework/xd/spark/streaming/java/ModuleExecutor.class */
public class ModuleExecutor implements SparkStreamingModuleExecutor<JavaReceiverInputDStream, Processor>, Serializable {
    private static SparkMessageSender messageSender;

    @Override // org.springframework.xd.spark.streaming.SparkStreamingModuleExecutor
    public void execute(JavaReceiverInputDStream javaReceiverInputDStream, Processor processor, final SparkMessageSender sparkMessageSender) {
        JavaDStreamLike process = processor.process(javaReceiverInputDStream);
        if (process != null) {
            process.foreachRDD(new Function<JavaRDDLike, Void>() { // from class: org.springframework.xd.spark.streaming.java.ModuleExecutor.1
                public Void call(JavaRDDLike javaRDDLike) {
                    javaRDDLike.foreachPartition(new VoidFunction<Iterator<?>>() { // from class: org.springframework.xd.spark.streaming.java.ModuleExecutor.1.1
                        public void call(Iterator<?> it) throws Exception {
                            if (ModuleExecutor.messageSender == null) {
                                SparkMessageSender unused = ModuleExecutor.messageSender = sparkMessageSender;
                                ModuleExecutor.messageSender.start();
                            }
                            while (it.hasNext()) {
                                Object next = it.next();
                                ModuleExecutor.messageSender.send(next instanceof Message ? (Message) next : MessageBuilder.withPayload(next).build());
                            }
                        }
                    });
                    return null;
                }
            });
            if (messageSender != null) {
                messageSender.stop();
                messageSender = null;
            }
        }
    }
}
