package co.cask.cdap.etl.batch.spark;

import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.plugin.PluginContext;
import co.cask.cdap.api.spark.JavaSparkProgram;
import co.cask.cdap.api.spark.SparkContext;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.TransformContext;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultStageMetrics;
import co.cask.cdap.etl.common.Pipeline;
import co.cask.cdap.etl.common.SinkInfo;
import co.cask.cdap.etl.common.TransformDetail;
import co.cask.cdap.etl.common.TransformExecutor;
import co.cask.cdap.etl.common.TransformInfo;
import co.cask.cdap.etl.common.TransformResponse;
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:co/cask/cdap/etl/batch/spark/ETLSparkProgram.class */
public class ETLSparkProgram implements JavaSparkProgram {
    private static final Logger LOG = LoggerFactory.getLogger(ETLSparkProgram.class);
    private static final Gson GSON = new Gson();

    /* loaded from: input_file:co/cask/cdap/etl/batch/spark/ETLSparkProgram$MapFunction.class */
    public static final class MapFunction implements PairFlatMapFunction<Tuple2<Object, Object>, String, Object> {
        private final PluginContext pluginContext;
        private final Metrics metrics;
        private final long logicalStartTime;
        private final String pipelineStr;
        private final Map<String, String> runtimeArgs;
        private transient TransformExecutor<KeyValue<Object, Object>> transformExecutor;

        public MapFunction(SparkContext sparkContext) {
            this.pluginContext = sparkContext.getPluginContext();
            this.metrics = sparkContext.getMetrics();
            this.logicalStartTime = sparkContext.getLogicalStartTime();
            this.pipelineStr = sparkContext.getSpecification().getProperty(Constants.PIPELINEID);
            this.runtimeArgs = sparkContext.getRuntimeArguments();
        }

        public Iterable<Tuple2<String, Object>> call(Tuple2<Object, Object> tuple2) throws Exception {
            if (this.transformExecutor == null) {
                this.transformExecutor = initialize();
            }
            TransformResponse runOneIteration = this.transformExecutor.runOneIteration(new KeyValue<>(tuple2._1(), tuple2._2()));
            ArrayList arrayList = new ArrayList();
            for (Map.Entry<String, Collection<Object>> entry : runOneIteration.getSinksResults().entrySet()) {
                String key = entry.getKey();
                Iterator<Object> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    arrayList.add(new Tuple2(key, it.next()));
                }
            }
            return arrayList;
        }

        private TransformExecutor<KeyValue<Object, Object>> initialize() throws Exception {
            Pipeline pipeline = (Pipeline) ETLSparkProgram.GSON.fromJson(this.pipelineStr, Pipeline.class);
            Map<String, List<String>> connections = pipeline.getConnections();
            String source = pipeline.getSource();
            BatchSource batchSource = (BatchSource) this.pluginContext.newPluginInstance(source);
            batchSource.initialize((BatchRuntimeContext) new SparkBatchRuntimeContext(this.pluginContext, this.metrics, this.logicalStartTime, this.runtimeArgs, source));
            HashMap hashMap = new HashMap();
            hashMap.put(source, new TransformDetail(batchSource, new DefaultStageMetrics(this.metrics, source), connections.get(source)));
            addTransforms(hashMap, pipeline.getTransforms(), connections);
            for (SinkInfo sinkInfo : pipeline.getSinks()) {
                String sinkId = sinkInfo.getSinkId();
                BatchSink batchSink = (BatchSink) this.pluginContext.newPluginInstance(sinkId);
                batchSink.initialize((BatchRuntimeContext) new SparkBatchRuntimeContext(this.pluginContext, this.metrics, this.logicalStartTime, this.runtimeArgs, sinkId));
                hashMap.put(sinkInfo.getSinkId(), new TransformDetail(batchSink, new DefaultStageMetrics(this.metrics, sinkInfo.getSinkId()), new ArrayList()));
            }
            return new TransformExecutor<>(hashMap, ImmutableList.of(source));
        }

        private void addTransforms(Map<String, TransformDetail> map, List<TransformInfo> list, Map<String, List<String>> map2) throws Exception {
            Iterator<TransformInfo> it = list.iterator();
            while (it.hasNext()) {
                String transformId = it.next().getTransformId();
                Transform transform = (Transform) this.pluginContext.newPluginInstance(transformId);
                SparkBatchRuntimeContext sparkBatchRuntimeContext = new SparkBatchRuntimeContext(this.pluginContext, this.metrics, this.logicalStartTime, this.runtimeArgs, transformId);
                ETLSparkProgram.LOG.debug("Transform Class : {}", transform.getClass().getName());
                transform.initialize((TransformContext) sparkBatchRuntimeContext);
                map.put(transformId, new TransformDetail(transform, new DefaultStageMetrics(this.metrics, transformId), map2.get(transformId)));
            }
        }
    }

    public void run(SparkContext sparkContext) throws Exception {
        FileInputStream fileInputStream = new FileInputStream(sparkContext.getTaskLocalizationContext().getLocalFile("ETLSpark.config"));
        Throwable th = null;
        try {
            try {
                SparkBatchSourceFactory deserialize = SparkBatchSourceFactory.deserialize(fileInputStream);
                SparkBatchSinkFactory deserialize2 = SparkBatchSinkFactory.deserialize(fileInputStream);
                if (fileInputStream != null) {
                    if (0 != 0) {
                        try {
                            fileInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fileInputStream.close();
                    }
                }
                JavaPairRDD cache = deserialize.createRDD(sparkContext, Object.class, Object.class).flatMapToPair(new MapFunction(sparkContext)).cache();
                Iterator<SinkInfo> it = ((Pipeline) GSON.fromJson(sparkContext.getSpecification().getProperty(Constants.PIPELINEID), Pipeline.class)).getSinks().iterator();
                while (it.hasNext()) {
                    final String sinkId = it.next().getSinkId();
                    deserialize2.writeFromRDD(cache.filter(new Function<Tuple2<String, Object>, Boolean>() { // from class: co.cask.cdap.etl.batch.spark.ETLSparkProgram.2
                        public Boolean call(Tuple2<String, Object> tuple2) throws Exception {
                            return Boolean.valueOf(((String) tuple2._1()).equals(sinkId));
                        }
                    }).flatMapToPair(new PairFlatMapFunction<Tuple2<String, Object>, Object, Object>() { // from class: co.cask.cdap.etl.batch.spark.ETLSparkProgram.1
                        public Iterable<Tuple2<Object, Object>> call(Tuple2<String, Object> tuple2) throws Exception {
                            ArrayList arrayList = new ArrayList();
                            KeyValue keyValue = (KeyValue) tuple2._2();
                            arrayList.add(new Tuple2(keyValue.getKey(), keyValue.getValue()));
                            return arrayList;
                        }
                    }), sparkContext, sinkId, Object.class, Object.class);
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (fileInputStream != null) {
                if (th != null) {
                    try {
                        fileInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileInputStream.close();
                }
            }
            throw th3;
        }
    }
}
