package co.cask.cdap.spark.app;

import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.api.stream.GenericStreamEventData;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;

/* loaded from: input_file:co/cask/cdap/spark/app/StreamFormatSpecSpark.class */
public class StreamFormatSpecSpark extends AbstractSpark implements JavaSparkMain {

    /* loaded from: input_file:co/cask/cdap/spark/app/StreamFormatSpecSpark$Person.class */
    public static class Person implements Serializable {
        private String name;
        private int age;

        public Person() {
        }

        public Person(String str, int i) {
            this.name = str;
            this.age = i;
        }

        public String getName() {
            return this.name;
        }

        public int getAge() {
            return this.age;
        }
    }

    protected void configure() {
        setMainClass(StreamFormatSpecSpark.class);
    }

    public void run(JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
        SQLContext sQLContext = new SQLContext(new JavaSparkContext());
        sQLContext.createDataFrame(javaSparkExecutionContext.fromStream((String) javaSparkExecutionContext.getRuntimeArguments().get("stream.name"), new FormatSpecification("csv", Schema.recordOf("record", ImmutableList.of(Schema.Field.of("name", Schema.of(Schema.Type.STRING)), Schema.Field.of("age", Schema.of(Schema.Type.INT))))), StructuredRecord.class).values().map(new Function<GenericStreamEventData<StructuredRecord>, Person>() { // from class: co.cask.cdap.spark.app.StreamFormatSpecSpark.1
            public Person call(GenericStreamEventData<StructuredRecord> genericStreamEventData) throws Exception {
                StructuredRecord structuredRecord = (StructuredRecord) genericStreamEventData.getBody();
                return new Person((String) structuredRecord.get("name"), ((Integer) structuredRecord.get("age")).intValue());
            }
        }), Person.class).registerTempTable("people");
        javaSparkExecutionContext.saveAsDataset(sQLContext.sql((String) javaSparkExecutionContext.getRuntimeArguments().get("sql.statement")).toJavaRDD().mapToPair(new PairFunction<Row, String, Integer>() { // from class: co.cask.cdap.spark.app.StreamFormatSpecSpark.2
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<>(row.getString(0), Integer.valueOf(row.getInt(1)));
            }
        }), (String) javaSparkExecutionContext.getRuntimeArguments().get("output.dataset"));
    }
}
