package co.cask.cdap.spark.stream;

import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.test.app.WorkflowAppWithLocalDatasets;
import com.google.common.base.Strings;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/* loaded from: input_file:co/cask/cdap/spark/stream/TestSparkStreamIntegrationApp.class */
public class TestSparkStreamIntegrationApp extends AbstractApplication {

    /* loaded from: input_file:co/cask/cdap/spark/stream/TestSparkStreamIntegrationApp$SparkStreamProgram.class */
    public static class SparkStreamProgram implements JavaSparkMain {
        static final String INPUT_STREAM_NAMESPACE = "stream.namespace";
        static final String INPUT_STREAM_NAME = "stream.name";

        public void run(JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
            new JavaSparkContext();
            Map runtimeArguments = javaSparkExecutionContext.getRuntimeArguments();
            javaSparkExecutionContext.saveAsDataset(javaSparkExecutionContext.fromStream(Strings.isNullOrEmpty((String) runtimeArguments.get(INPUT_STREAM_NAMESPACE)) ? javaSparkExecutionContext.getNamespace() : (String) runtimeArguments.get(INPUT_STREAM_NAMESPACE), Strings.isNullOrEmpty((String) runtimeArguments.get(INPUT_STREAM_NAME)) ? "testStream" : (String) runtimeArguments.get(INPUT_STREAM_NAME), String.class).mapToPair(new PairFunction<Tuple2<Long, String>, byte[], byte[]>() { // from class: co.cask.cdap.spark.stream.TestSparkStreamIntegrationApp.SparkStreamProgram.1
                public Tuple2<byte[], byte[]> call(Tuple2<Long, String> tuple2) throws Exception {
                    return new Tuple2<>(Bytes.toBytes((String) tuple2._2()), Bytes.toBytes((String) tuple2._2()));
                }
            }), WorkflowAppWithLocalDatasets.RESULT_DATASET);
        }
    }

    /* loaded from: input_file:co/cask/cdap/spark/stream/TestSparkStreamIntegrationApp$SparkStreamProgramSpec.class */
    public static class SparkStreamProgramSpec extends AbstractSpark {
        public void configure() {
            setName("SparkStreamProgram");
            setDescription("Test Spark with Streams");
            setMainClass(SparkStreamProgram.class);
        }
    }

    public void configure() {
        setName("TestSparkStreamIntegrationApp");
        setDescription("App to test Spark with Streams");
        addStream(new Stream("testStream"));
        createDataset(WorkflowAppWithLocalDatasets.RESULT_DATASET, KeyValueTable.class);
        addSpark(new SparkStreamProgramSpec());
    }
}
