package co.cask.cdap.etl.spark.streaming;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.streaming.Windower;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.common.TrackedTransform;
import co.cask.cdap.etl.spark.SparkCollection;
import co.cask.cdap.etl.spark.SparkPairCollection;
import co.cask.cdap.etl.spark.batch.BasicSparkExecutionPluginContext;
import co.cask.cdap.etl.spark.batch.SparkBatchSinkContext;
import co.cask.cdap.etl.spark.batch.SparkBatchSinkFactory;
import co.cask.cdap.etl.spark.function.CountingFunction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/cdap-etl-batch-3.5.4.jar:lib/hydrator-spark-core-3.5.4.jar:co/cask/cdap/etl/spark/streaming/DStreamCollection.class
 */
/* loaded from: input_file:lib/hydrator-spark-core-3.5.4.jar:co/cask/cdap/etl/spark/streaming/DStreamCollection.class */
public class DStreamCollection<T> implements SparkCollection<T> {
    private static final Logger LOG = LoggerFactory.getLogger(DStreamCollection.class);
    private final JavaSparkExecutionContext sec;
    private final JavaSparkContext sparkContext;
    private final JavaDStream<T> stream;

    public DStreamCollection(JavaSparkExecutionContext javaSparkExecutionContext, JavaSparkContext javaSparkContext, JavaDStream<T> javaDStream) {
        this.sec = javaSparkExecutionContext;
        this.sparkContext = javaSparkContext;
        this.stream = javaDStream;
    }

    @Override // co.cask.cdap.etl.spark.SparkCollection
    public JavaDStream<T> getUnderlying() {
        return this.stream;
    }

    @Override // co.cask.cdap.etl.spark.SparkCollection
    public SparkCollection<T> cache() {
        return (SparkCollection<T>) wrap(this.stream.cache());
    }

    @Override // co.cask.cdap.etl.spark.SparkCollection
    public SparkCollection<T> union(SparkCollection<T> sparkCollection) {
        return (SparkCollection<T>) wrap(this.stream.union((JavaDStream) sparkCollection.getUnderlying()));
    }

    @Override // co.cask.cdap.etl.spark.SparkCollection
    public <U> SparkCollection<U> flatMap(FlatMapFunction<T, U> flatMapFunction) {
        return wrap(this.stream.flatMap(flatMapFunction));
    }

    @Override // co.cask.cdap.etl.spark.SparkCollection
    public <K, V> SparkPairCollection<K, V> flatMapToPair(PairFlatMapFunction<T, K, V> pairFlatMapFunction) {
        return new PairDStreamCollection(this.sec, this.sparkContext, this.stream.flatMapToPair(pairFlatMapFunction));
    }

    @Override // co.cask.cdap.etl.spark.SparkCollection
    public <U> SparkCollection<U> compute(final String str, final SparkCompute<T, U> sparkCompute) throws Exception {
        this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.1
            public void run(DatasetContext datasetContext) throws Exception {
                sparkCompute.initialize(new BasicSparkExecutionPluginContext(DStreamCollection.this.sec, DStreamCollection.this.sparkContext, datasetContext, str));
            }
        });
        return wrap(this.stream.transform(new Function2<JavaRDD<T>, Time, JavaRDD<U>>() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.2
            public JavaRDD<U> call(JavaRDD<T> javaRDD, Time time) throws Exception {
                return sparkCompute.transform(new SparkStreamingExecutionContext(DStreamCollection.this.sec, DStreamCollection.this.sparkContext, str, time.milliseconds()), javaRDD.map(new CountingFunction(str, DStreamCollection.this.sec.getMetrics(), TrackedTransform.RECORDS_IN))).map(new CountingFunction(str, DStreamCollection.this.sec.getMetrics(), TrackedTransform.RECORDS_OUT));
            }
        }));
    }

    @Override // co.cask.cdap.etl.spark.SparkCollection
    public void store(final String str, final PairFlatMapFunction<T, Object, Object> pairFlatMapFunction) {
        this.stream.foreachRDD(new Function2<JavaRDD<T>, Time, Void>() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.3
            public Void call(JavaRDD<T> javaRDD, Time time) throws Exception {
                final long milliseconds = time.milliseconds();
                DefaultMacroEvaluator defaultMacroEvaluator = new DefaultMacroEvaluator(DStreamCollection.this.sec.getWorkflowToken(), DStreamCollection.this.sec.getRuntimeArguments(), milliseconds, DStreamCollection.this.sec.getSecureStore(), DStreamCollection.this.sec.getNamespace());
                final SparkBatchSinkFactory sparkBatchSinkFactory = new SparkBatchSinkFactory();
                final BatchSink batchSink = (BatchSink) DStreamCollection.this.sec.getPluginContext().newPluginInstance(str, defaultMacroEvaluator);
                boolean z = false;
                boolean z2 = false;
                try {
                    try {
                        DStreamCollection.this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.3.1
                            public void run(DatasetContext datasetContext) throws Exception {
                                batchSink.prepareRun(new SparkBatchSinkContext(sparkBatchSinkFactory, DStreamCollection.this.sec, datasetContext, str, milliseconds));
                            }
                        });
                        z = true;
                        sparkBatchSinkFactory.writeFromRDD(javaRDD.map(new CountingFunction(str, DStreamCollection.this.sec.getMetrics(), TrackedTransform.RECORDS_IN)).flatMapToPair(pairFlatMapFunction), DStreamCollection.this.sec, str, Object.class, Object.class);
                        z2 = true;
                        DStreamCollection.this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.3.2
                            public void run(DatasetContext datasetContext) throws Exception {
                                batchSink.onRunFinish(true, new SparkBatchSinkContext(sparkBatchSinkFactory, DStreamCollection.this.sec, datasetContext, str, milliseconds));
                            }
                        });
                        if (1 == 0 || 1 != 0) {
                            return null;
                        }
                        DStreamCollection.this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.3.3
                            public void run(DatasetContext datasetContext) throws Exception {
                                batchSink.onRunFinish(false, new SparkBatchSinkContext(sparkBatchSinkFactory, DStreamCollection.this.sec, datasetContext, str, milliseconds));
                            }
                        });
                        return null;
                    } catch (Exception e) {
                        DStreamCollection.LOG.error("Error writing to sink {} for the batch for time {}.", str, Long.valueOf(milliseconds), e);
                        if (!z || z2) {
                            return null;
                        }
                        DStreamCollection.this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.3.3
                            public void run(DatasetContext datasetContext) throws Exception {
                                batchSink.onRunFinish(false, new SparkBatchSinkContext(sparkBatchSinkFactory, DStreamCollection.this.sec, datasetContext, str, milliseconds));
                            }
                        });
                        return null;
                    }
                } catch (Throwable th) {
                    if (z && !z2) {
                        DStreamCollection.this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.3.3
                            public void run(DatasetContext datasetContext) throws Exception {
                                batchSink.onRunFinish(false, new SparkBatchSinkContext(sparkBatchSinkFactory, DStreamCollection.this.sec, datasetContext, str, milliseconds));
                            }
                        });
                    }
                    throw th;
                }
            }
        });
    }

    @Override // co.cask.cdap.etl.spark.SparkCollection
    public void store(String str, SparkSink<T> sparkSink) throws Exception {
        throw new UnsupportedOperationException("Spark sink not supported in Spark Streaming.");
    }

    @Override // co.cask.cdap.etl.spark.SparkCollection
    public SparkCollection<T> window(final String str, Windower windower) {
        return (SparkCollection<T>) wrap(this.stream.transform(new Function<JavaRDD<T>, JavaRDD<T>>() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.5
            public JavaRDD<T> call(JavaRDD<T> javaRDD) throws Exception {
                return javaRDD.map(new CountingFunction(str, DStreamCollection.this.sec.getMetrics(), TrackedTransform.RECORDS_IN));
            }
        }).window(Durations.seconds(windower.getWidth()), Durations.seconds(windower.getSlideInterval())).transform(new Function<JavaRDD<T>, JavaRDD<T>>() { // from class: co.cask.cdap.etl.spark.streaming.DStreamCollection.4
            public JavaRDD<T> call(JavaRDD<T> javaRDD) throws Exception {
                return javaRDD.map(new CountingFunction(str, DStreamCollection.this.sec.getMetrics(), TrackedTransform.RECORDS_OUT));
            }
        }));
    }

    private <U> SparkCollection<U> wrap(JavaDStream<U> javaDStream) {
        return new DStreamCollection(this.sec, this.sparkContext, javaDStream);
    }
}
