package co.cask.cdap.etl.spark.streaming.function;

import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.spark.function.CountingFunction;
import co.cask.cdap.etl.spark.streaming.SparkStreamingExecutionContext;
import co.cask.cdap.etl.spec.StageSpec;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;

/* loaded from: input_file:lib/hydrator-spark-core2_2.11-5.0.0.jar:co/cask/cdap/etl/spark/streaming/function/ComputeTransformFunction.class */
public class ComputeTransformFunction<T, U> implements Function2<JavaRDD<T>, Time, JavaRDD<U>> {
    private final JavaSparkExecutionContext sec;
    private final StageSpec stageSpec;
    private final SparkCompute<T, U> compute;

    public ComputeTransformFunction(JavaSparkExecutionContext javaSparkExecutionContext, StageSpec stageSpec, SparkCompute<T, U> sparkCompute) {
        this.sec = javaSparkExecutionContext;
        this.stageSpec = stageSpec;
        this.compute = sparkCompute;
    }

    public JavaRDD<U> call(JavaRDD<T> javaRDD, Time time) throws Exception {
        SparkStreamingExecutionContext sparkStreamingExecutionContext = new SparkStreamingExecutionContext(this.sec, JavaSparkContext.fromSparkContext(javaRDD.context()), time.milliseconds(), this.stageSpec);
        String name = this.stageSpec.getName();
        return this.compute.transform(sparkStreamingExecutionContext, javaRDD.map(new CountingFunction(name, this.sec.getMetrics(), Constants.Metrics.RECORDS_IN, null))).map(new CountingFunction(name, this.sec.getMetrics(), Constants.Metrics.RECORDS_OUT, this.sec.getDataTracer(name)));
    }
}
