package co.cask.hydrator.plugin.spark.dynamic;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.api.spark.dynamic.CompilationFailureException;
import co.cask.cdap.api.spark.dynamic.SparkInterpreter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.StageConfigurer;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import javax.annotation.Nullable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import scala.reflect.ClassTag$;

@Name("ScalaSparkCompute")
@Description("Executes user-provided Spark code written in Scala that performs RDD to RDD transformation")
@Plugin(type = "sparkcompute")
/* loaded from: input_file:co/cask/hydrator/plugin/spark/dynamic/ScalaSparkCompute.class */
public class ScalaSparkCompute extends SparkCompute<StructuredRecord, StructuredRecord> {
    private static final String PACKAGE_NAME = "co.cask.hydrator.plugin.spark.dynamic.generated";
    private static final String CLASS_NAME = "UserSparkCompute";
    private static final String FULL_CLASS_NAME = "co.cask.hydrator.plugin.spark.dynamic.generated.UserSparkCompute";
    private final Config config;
    private SparkInterpreter interpreter;
    private Method method;
    private boolean takeContext;

    /* loaded from: input_file:co/cask/hydrator/plugin/spark/dynamic/ScalaSparkCompute$Config.class */
    public static final class Config extends PluginConfig {

        @Description("Spark code in Scala defining how to transform RDD to RDD. The code must implement a function called 'transform', which has signature as either \n  def transform(rdd: RDD[StructuredRecord]) : RDD[StructuredRecord]\n  or\n  def transform(rdd: RDD[StructuredRecord], context: SparkExecutionPluginContext) : RDD[StructuredRecord]\nFor example:\n'def transform(rdd: RDD[StructuredRecord]) : RDD[StructuredRecord] = {\n   rdd.filter(_.get(\"gender\") == null)\n }'\nwill filter out incoming records that does not have the 'gender' field.")
        @Macro
        private final String scalaCode;

        @Description("The schema of output objects. If no schema is given, it is assumed that the output schema is the same as the input schema.")
        @Macro
        @Nullable
        private final String schema;

        public Config(String str, @Nullable String str2) {
            this.scalaCode = str;
            this.schema = str2;
        }

        public String getScalaCode() {
            return this.scalaCode;
        }

        @Nullable
        public String getSchema() {
            return this.schema;
        }
    }

    public ScalaSparkCompute(Config config) {
        this.config = config;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
        SparkInterpreter createInterpreter;
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        try {
            if (!this.config.containsMacro("schema")) {
                stageConfigurer.setOutputSchema(this.config.getSchema() == null ? stageConfigurer.getInputSchema() : Schema.parseJson(this.config.getSchema()));
            }
            if (this.config.containsMacro("scalaCode") || (createInterpreter = SparkCompilers.createInterpreter()) == null) {
                return;
            }
            try {
                createInterpreter.compile(generateSourceClass());
            } catch (CompilationFailureException e) {
                throw new IllegalArgumentException(e.getMessage(), e);
            }
        } catch (IOException e2) {
            throw new IllegalArgumentException("Unable to parse output schema " + this.config.getSchema(), e2);
        }
    }

    public void initialize(SparkExecutionPluginContext sparkExecutionPluginContext) throws Exception {
        this.interpreter = sparkExecutionPluginContext.createSparkInterpreter();
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter((Writer) stringWriter, false);
        Throwable th = null;
        try {
            try {
                printWriter.println("package co.cask.hydrator.plugin.spark.dynamic.generated");
                printWriter.println("import co.cask.cdap.api.data.format._");
                printWriter.println("import co.cask.cdap.api.data.schema._");
                printWriter.println("import co.cask.cdap.etl.api.batch._");
                printWriter.println("import org.apache.spark._");
                printWriter.println("import org.apache.spark.api.java._");
                printWriter.println("import org.apache.spark.rdd._");
                printWriter.println("import org.apache.spark.sql._");
                printWriter.println("import org.apache.spark.SparkContext._");
                printWriter.println("import scala.collection.JavaConversions._");
                printWriter.println("object UserSparkCompute {");
                printWriter.println(this.config.getScalaCode());
                printWriter.println("}");
                if (printWriter != null) {
                    if (0 != 0) {
                        try {
                            printWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        printWriter.close();
                    }
                }
                this.interpreter.compile(stringWriter.toString());
                try {
                    Class<?> loadClass = this.interpreter.getClassLoader().loadClass(FULL_CLASS_NAME);
                    try {
                        this.method = loadClass.getDeclaredMethod("transform", RDD.class, SparkExecutionPluginContext.class);
                        this.takeContext = true;
                    } catch (NoSuchMethodException e) {
                        this.method = loadClass.getDeclaredMethod("transform", RDD.class);
                        this.takeContext = false;
                    }
                    Type[] genericParameterTypes = this.method.getGenericParameterTypes();
                    validateRDDType(genericParameterTypes[0], "The first parameter of the 'transform' method should have type as 'RDD[StructuredRecord]'");
                    if (this.takeContext && !SparkExecutionPluginContext.class.equals(genericParameterTypes[1])) {
                        throw new IllegalArgumentException("The second parameter of the 'transform' method should have type as SparkExecutionPluginContext");
                    }
                    validateRDDType(this.method.getGenericReturnType(), "The return type of the 'transform' method should be 'RDD[StructuredRecord]'");
                    this.method.setAccessible(true);
                } catch (NoSuchMethodException e2) {
                    throw new IllegalArgumentException("Missing a `transform` method that has signature either as 'def transform(rdd: RDD[StructuredRecord]) : RDD[StructuredRecord]' or 'def transform(rdd: RDD[StructuredRecord], context: SparkExecutionPluginContext) : RDD[StructuredRecord]'", e2);
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (printWriter != null) {
                if (th != null) {
                    try {
                        printWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    printWriter.close();
                }
            }
            throw th3;
        }
    }

    public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext sparkExecutionPluginContext, JavaRDD<StructuredRecord> javaRDD) throws Exception {
        return JavaRDD.fromRDD(this.takeContext ? (RDD) this.method.invoke(null, javaRDD.rdd(), sparkExecutionPluginContext) : (RDD) this.method.invoke(null, javaRDD.rdd()), ClassTag$.MODULE$.apply(StructuredRecord.class));
    }

    private String generateSourceClass() {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter((Writer) stringWriter, false);
        Throwable th = null;
        try {
            try {
                printWriter.println("package co.cask.hydrator.plugin.spark.dynamic.generated");
                printWriter.println("import co.cask.cdap.api.data.format._");
                printWriter.println("import co.cask.cdap.api.data.schema._");
                printWriter.println("import co.cask.cdap.etl.api.batch._");
                printWriter.println("import org.apache.spark._");
                printWriter.println("import org.apache.spark.api.java._");
                printWriter.println("import org.apache.spark.rdd._");
                printWriter.println("import org.apache.spark.SparkContext._");
                printWriter.println("object UserSparkCompute {");
                printWriter.println(this.config.getScalaCode());
                printWriter.println("}");
                if (printWriter != null) {
                    if (0 != 0) {
                        try {
                            printWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        printWriter.close();
                    }
                }
                return stringWriter.toString();
            } finally {
            }
        } catch (Throwable th3) {
            if (printWriter != null) {
                if (th != null) {
                    try {
                        printWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    printWriter.close();
                }
            }
            throw th3;
        }
    }

    private void validateRDDType(Type type, String str) {
        if (!(type instanceof ParameterizedType)) {
            throw new IllegalArgumentException(str);
        }
        if (!RDD.class.equals(((ParameterizedType) type).getRawType())) {
            throw new IllegalArgumentException(str);
        }
        Type[] actualTypeArguments = ((ParameterizedType) type).getActualTypeArguments();
        if (actualTypeArguments.length < 1 || !actualTypeArguments[0].equals(StructuredRecord.class)) {
            throw new IllegalArgumentException(str);
        }
    }
}
