package co.cask.cdap.etl.batch.spark;

import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.SparkClientContext;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchAggregatorContext;
import co.cask.cdap.etl.api.batch.BatchConfigurable;
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.batch.CompositeFinisher;
import co.cask.cdap.etl.batch.Finisher;
import co.cask.cdap.etl.batch.PipelinePluginInstantiator;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DatasetContextLookupProvider;
import co.cask.cdap.etl.common.SetMultimapCodec;
import co.cask.cdap.etl.planner.StageInfo;
import com.google.common.base.Joiner;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark program that executes one batch phase of an ETL pipeline.
 *
 * <p>{@link #configure()} validates the phase (exactly one source, at least one sink) and stores
 * the phase spec as a JSON program property. {@link #beforeSubmit(SparkClientContext)} reads that
 * spec back, prepares the source, sinks and optional aggregator, and serializes the source factory,
 * sink factory and aggregator partition count into a temporary file. That file is localized to the
 * Spark job as {@code ETLSpark.config}. {@link #onFinish(boolean, SparkClientContext)} runs the
 * collected finishers and deletes the temporary files.
 */
public class ETLSpark extends AbstractSpark {
    private static final Logger LOG = LoggerFactory.getLogger(ETLSpark.class);
    private static final Gson GSON = new GsonBuilder()
        .registerTypeAdapter(SetMultimap.class, new SetMultimapCodec())
        .create();

    /** Name under which the serialized source/sink/partition config is localized to the job. */
    private static final String CONFIG_FILE_NAME = "ETLSpark.config";

    private final BatchPhaseSpec phaseSpec;
    // Built at the end of beforeSubmit; remains null if beforeSubmit did not complete.
    private Finisher finisher;
    // Temporary files created in beforeSubmit that onFinish deletes.
    private List<File> cleanupFiles;

    /**
     * @param batchPhaseSpec specification of the pipeline phase this program executes
     */
    public ETLSpark(BatchPhaseSpec batchPhaseSpec) {
        this.phaseSpec = batchPhaseSpec;
    }

    /**
     * Configures the Spark program from the phase spec and records the spec as a JSON property.
     *
     * @throws IllegalArgumentException if the phase does not have exactly one source, or has no sink
     */
    protected void configure() {
        setName(phaseSpec.getPhaseName());
        setDescription("Spark phase executor. " + phaseSpec.getDescription());
        setMainClass(ETLSparkProgram.class);
        setExecutorResources(phaseSpec.getResources());
        setDriverResources(phaseSpec.getResources());

        if (phaseSpec.getPhase().getSources().size() != 1) {
            throw new IllegalArgumentException("Pipeline must contain exactly one source.");
        }
        if (phaseSpec.getPhase().getSinks().isEmpty()) {
            throw new IllegalArgumentException("Pipeline must contain at least one sink.");
        }

        Map<String, String> properties = new HashMap<>();
        properties.put(Constants.PIPELINEID, GSON.toJson(phaseSpec));
        setProperties(properties);
    }

    /**
     * Prepares all plugins of the phase and ships their serialized configuration to the Spark job.
     *
     * @param sparkClientContext client context of the Spark program about to be submitted
     * @throws IllegalArgumentException if the source did not set an input, or if the phase contains
     *     more than one aggregator
     * @throws Exception propagated from plugin instantiation, plugin {@code prepareRun}, or file I/O
     */
    public void beforeSubmit(SparkClientContext sparkClientContext) throws Exception {
        cleanupFiles = new ArrayList<>();
        CompositeFinisher.Builder finishers = CompositeFinisher.builder();

        // Pass -XX:MaxPermSize=256m to the Spark driver JVM.
        sparkClientContext.setSparkConf(
            new SparkConf().set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=256m"));

        // Read the spec back from the program properties written in configure(); presumably the
        // instance running here is not guaranteed to carry the constructor-supplied spec.
        BatchPhaseSpec spec = GSON.fromJson(
            sparkClientContext.getSpecification().getProperties().get(Constants.PIPELINEID),
            BatchPhaseSpec.class);
        PipelinePluginInstantiator instantiator = new PipelinePluginInstantiator(sparkClientContext, spec);
        DatasetContextLookupProvider lookupProvider = new DatasetContextLookupProvider(sparkClientContext);

        SparkBatchSourceFactory sourceFactory =
            prepareSource(sparkClientContext, spec, instantiator, lookupProvider, finishers);
        SparkBatchSinkFactory sinkFactory =
            prepareSinks(sparkClientContext, spec, instantiator, lookupProvider, finishers);
        Integer numPartitions =
            prepareAggregator(sparkClientContext, spec, instantiator, lookupProvider, finishers);

        File configFile = File.createTempFile("ETLSpark", ".config");
        cleanupFiles.add(configFile);
        // try-with-resources guarantees the stream is closed, and that a close() failure is
        // attached as suppressed rather than masking a serialization failure.
        try (FileOutputStream out = new FileOutputStream(configFile)) {
            sourceFactory.serialize(out);
            sinkFactory.serialize(out);
            // -1 is written when no partition count is available (e.g. the phase has no aggregator).
            new DataOutputStream(out).writeInt(numPartitions == null ? -1 : numPartitions);
        }

        finisher = finishers.build();
        sparkClientContext.localize(CONFIG_FILE_NAME, configFile.toURI());
    }

    /**
     * Runs the finishers collected in beforeSubmit, then deletes the temporary files.
     *
     * <p>Files are deleted even if a finisher throws. If beforeSubmit did not complete, there is
     * no finisher to run and only the cleanup is performed.
     *
     * @param succeeded whether the Spark run succeeded; forwarded to the finishers
     * @param sparkClientContext client context of the finished Spark program (unused here)
     */
    public void onFinish(boolean succeeded, SparkClientContext sparkClientContext) throws Exception {
        try {
            if (finisher != null) {
                finisher.onFinish(succeeded);
            }
        } finally {
            deleteCleanupFiles();
        }
    }

    // Instantiates and prepares the single source, registers its finisher, and returns its factory.
    private static SparkBatchSourceFactory prepareSource(SparkClientContext context,
                                                         BatchPhaseSpec spec,
                                                         PipelinePluginInstantiator instantiator,
                                                         DatasetContextLookupProvider lookupProvider,
                                                         CompositeFinisher.Builder finishers)
        throws Exception {
        String sourceName = spec.getPhase().getSources().iterator().next();
        BatchConfigurable source = (BatchConfigurable) instantiator.newPluginInstance(sourceName);
        SparkBatchSourceContext sourceContext =
            new SparkBatchSourceContext(context, lookupProvider, sourceName);
        source.prepareRun(sourceContext);

        SparkBatchSourceFactory sourceFactory = sourceContext.getSourceFactory();
        if (sourceFactory == null) {
            throw new IllegalArgumentException(
                "No input was set. Please make sure the source plugin calls setInput when preparing the run.");
        }
        finishers.add(source, sourceContext);
        return sourceFactory;
    }

    // Instantiates and prepares every sink, registers their finishers, and returns the sink factory.
    private static SparkBatchSinkFactory prepareSinks(SparkClientContext context,
                                                      BatchPhaseSpec spec,
                                                      PipelinePluginInstantiator instantiator,
                                                      DatasetContextLookupProvider lookupProvider,
                                                      CompositeFinisher.Builder finishers)
        throws Exception {
        SparkBatchSinkFactory sinkFactory = new SparkBatchSinkFactory();
        for (String sinkName : spec.getPhase().getSinks()) {
            BatchConfigurable sink = (BatchConfigurable) instantiator.newPluginInstance(sinkName);
            if (sink instanceof SparkSink) {
                SparkSink sparkSink = (SparkSink) sink;
                BasicSparkPluginContext pluginContext =
                    new BasicSparkPluginContext(context, lookupProvider, sinkName);
                sparkSink.prepareRun(pluginContext);
                finishers.add(sparkSink, pluginContext);
            } else {
                // NOTE(review): batch sink contexts receive a null lookup provider here, unlike the
                // other contexts -- preserved as-is; confirm this is intentional.
                SparkBatchSinkContext sinkContext =
                    new SparkBatchSinkContext(sinkFactory, context, null, sinkName);
                sink.prepareRun(sinkContext);
                finishers.add(sink, sinkContext);
            }
        }
        return sinkFactory;
    }

    // Prepares the phase's aggregator, if any; returns its partition count, or null without one.
    private static Integer prepareAggregator(SparkClientContext context,
                                             BatchPhaseSpec spec,
                                             PipelinePluginInstantiator instantiator,
                                             DatasetContextLookupProvider lookupProvider,
                                             CompositeFinisher.Builder finishers)
        throws Exception {
        Set<StageInfo> aggregators = spec.getPhase().getStagesOfType(BatchAggregator.PLUGIN_TYPE);
        if (aggregators.isEmpty()) {
            return null;
        }
        if (aggregators.size() > 1) {
            throw new IllegalArgumentException(String.format(
                "There was an error during planning. Phase %s has multiple aggregators %s.",
                spec.getPhaseName(), Joiner.on(',').join(aggregators)));
        }

        String aggregatorName = aggregators.iterator().next().getName();
        BatchAggregator aggregator = (BatchAggregator) instantiator.newPluginInstance(aggregatorName);
        SparkAggregatorContext aggregatorContext =
            new SparkAggregatorContext(context, lookupProvider, aggregatorName);
        aggregator.prepareRun((BatchAggregatorContext) aggregatorContext);
        finishers.add(aggregator, aggregatorContext);
        return aggregatorContext.getNumPartitions();
    }

    // Best-effort deletion of temporary files; failures are logged, not thrown.
    private void deleteCleanupFiles() {
        if (cleanupFiles == null) {
            return;
        }
        for (File file : cleanupFiles) {
            if (!file.delete()) {
                LOG.warn("Failed to clean up resource {} ", file);
            }
        }
    }
}
