package co.cask.cdap.etl.batch.mapreduce;

import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.mapreduce.MapReduceTaskContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.etl.api.Aggregator;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.JoinElement;
import co.cask.cdap.etl.api.Joiner;
import co.cask.cdap.etl.api.Transformation;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchJoiner;
import co.cask.cdap.etl.api.batch.BatchJoinerRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.batch.PipelinePluginInstantiator;
import co.cask.cdap.etl.batch.TransformExecutorFactory;
import co.cask.cdap.etl.batch.conversion.WritableConversion;
import co.cask.cdap.etl.batch.conversion.WritableConversions;
import co.cask.cdap.etl.batch.join.Join;
import co.cask.cdap.etl.common.DatasetContextLookupProvider;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.common.DefaultStageMetrics;
import co.cask.cdap.etl.common.NoErrorEmitter;
import co.cask.cdap.etl.common.TrackedTransform;
import co.cask.cdap.etl.common.preview.LimitingTransform;
import co.cask.cdap.etl.planner.StageInfo;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;

/* loaded from: input_file:lib/cdap-etl-batch-4.1.1.jar:co/cask/cdap/etl/batch/mapreduce/MapReduceTransformExecutorFactory.class */
public class MapReduceTransformExecutorFactory<T> extends TransformExecutorFactory<T> {
    private final Map<String, Map<String, String>> pluginRuntimeArgs;
    private final MapReduceTaskContext taskContext;
    private final String mapOutputKeyClassName;
    private final String mapOutputValClassName;
    private final int numberOfRecordsPreview;

    /* loaded from: input_file:lib/cdap-etl-batch-4.1.1.jar:co/cask/cdap/etl/batch/mapreduce/MapReduceTransformExecutorFactory$CastConversion.class */
    private static class CastConversion<T, W extends Writable> extends WritableConversion<T, W> {
        private CastConversion() {
        }

        @Override // co.cask.cdap.etl.batch.conversion.WritableConversion
        public W toWritable(T t) {
            return (W) t;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // co.cask.cdap.etl.batch.conversion.WritableConversion
        public T fromWritable(W w) {
            return w;
        }
    }

    /* loaded from: input_file:lib/cdap-etl-batch-4.1.1.jar:co/cask/cdap/etl/batch/mapreduce/MapReduceTransformExecutorFactory$MapperAggregatorTransformation.class */
    private static class MapperAggregatorTransformation<GROUP_KEY, GROUP_VAL, OUT_KEY extends Writable, OUT_VAL extends Writable> implements Transformation<GROUP_VAL, KeyValue<OUT_KEY, OUT_VAL>> {
        private final Aggregator<GROUP_KEY, GROUP_VAL, ?> aggregator;
        private final NoErrorEmitter<GROUP_KEY> groupKeyEmitter = new NoErrorEmitter<>("Error records cannot be emitted from the groupBy method of an aggregator");
        private final WritableConversion<GROUP_KEY, OUT_KEY> keyConversion;
        private final WritableConversion<GROUP_VAL, OUT_VAL> valConversion;

        MapperAggregatorTransformation(Aggregator<GROUP_KEY, GROUP_VAL, ?> aggregator, String str, String str2) {
            this.aggregator = aggregator;
            WritableConversion<GROUP_KEY, OUT_KEY> conversion = WritableConversions.getConversion(str);
            WritableConversion<GROUP_VAL, OUT_VAL> conversion2 = WritableConversions.getConversion(str2);
            this.keyConversion = conversion == null ? new CastConversion<>() : conversion;
            this.valConversion = conversion2 == null ? new CastConversion<>() : conversion2;
        }

        @Override // co.cask.cdap.etl.api.Transformation
        public void transform(GROUP_VAL group_val, Emitter<KeyValue<OUT_KEY, OUT_VAL>> emitter) throws Exception {
            this.groupKeyEmitter.reset();
            this.aggregator.groupBy(group_val, this.groupKeyEmitter);
            Iterator<GROUP_KEY> it = this.groupKeyEmitter.getEntries().iterator();
            while (it.hasNext()) {
                emitter.emit(new KeyValue<>(this.keyConversion.toWritable(it.next()), this.valConversion.toWritable(group_val)));
            }
        }
    }

    /* loaded from: input_file:lib/cdap-etl-batch-4.1.1.jar:co/cask/cdap/etl/batch/mapreduce/MapReduceTransformExecutorFactory$MapperJoinerTransformation.class */
    private static class MapperJoinerTransformation<JOIN_KEY, INPUT_RECORD, OUT, OUT_KEY extends Writable, OUT_VALUE extends Writable> implements Transformation<KeyValue<String, INPUT_RECORD>, KeyValue<OUT_KEY, TaggedWritable<OUT_VALUE>>> {
        private final Joiner<JOIN_KEY, INPUT_RECORD, OUT> joiner;
        private final WritableConversion<JOIN_KEY, OUT_KEY> keyConversion;
        private final WritableConversion<INPUT_RECORD, OUT_VALUE> inputConversion;

        MapperJoinerTransformation(Joiner<JOIN_KEY, INPUT_RECORD, OUT> joiner, String str, String str2) {
            this.joiner = joiner;
            WritableConversion<JOIN_KEY, OUT_KEY> conversion = WritableConversions.getConversion(str);
            WritableConversion<INPUT_RECORD, OUT_VALUE> conversion2 = WritableConversions.getConversion(str2);
            this.keyConversion = conversion == null ? new CastConversion<>() : conversion;
            this.inputConversion = conversion2 == null ? new CastConversion<>() : conversion2;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // co.cask.cdap.etl.api.Transformation
        public void transform(KeyValue<String, INPUT_RECORD> keyValue, Emitter<KeyValue<OUT_KEY, TaggedWritable<OUT_VALUE>>> emitter) throws Exception {
            String str = (String) keyValue.getKey();
            Object joinOn = this.joiner.joinOn(str, keyValue.getValue());
            emitter.emit(new KeyValue<>(this.keyConversion.toWritable(joinOn), new TaggedWritable(str, this.inputConversion.toWritable(keyValue.getValue()))));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/cdap-etl-batch-4.1.1.jar:co/cask/cdap/etl/batch/mapreduce/MapReduceTransformExecutorFactory$ReducerAggregatorTransformation.class */
    public static class ReducerAggregatorTransformation<GROUP_KEY, GROUP_VAL, OUT, REDUCE_KEY extends WritableComparable, REDUCE_VAL extends Writable> implements Transformation<KeyValue<REDUCE_KEY, Iterator<REDUCE_VAL>>, OUT> {
        private final Aggregator<GROUP_KEY, GROUP_VAL, OUT> aggregator;
        private final WritableConversion<GROUP_KEY, REDUCE_KEY> keyConversion;
        private final WritableConversion<GROUP_VAL, REDUCE_VAL> valConversion;

        ReducerAggregatorTransformation(Aggregator<GROUP_KEY, GROUP_VAL, OUT> aggregator, String str, String str2) {
            this.aggregator = aggregator;
            WritableConversion<GROUP_KEY, REDUCE_KEY> conversion = WritableConversions.getConversion(str);
            WritableConversion<GROUP_VAL, REDUCE_VAL> conversion2 = WritableConversions.getConversion(str2);
            this.keyConversion = conversion == null ? new CastConversion<>() : conversion;
            this.valConversion = conversion2 == null ? new CastConversion<>() : conversion2;
        }

        @Override // co.cask.cdap.etl.api.Transformation
        public void transform(KeyValue<REDUCE_KEY, Iterator<REDUCE_VAL>> keyValue, Emitter<OUT> emitter) throws Exception {
            this.aggregator.aggregate(this.keyConversion.fromWritable((Writable) keyValue.getKey()), Iterators.transform((Iterator) keyValue.getValue(), new Function<REDUCE_VAL, GROUP_VAL>() { // from class: co.cask.cdap.etl.batch.mapreduce.MapReduceTransformExecutorFactory.ReducerAggregatorTransformation.1
                @Override // com.google.common.base.Function
                @Nullable
                public GROUP_VAL apply(@Nullable REDUCE_VAL reduce_val) {
                    return (GROUP_VAL) ReducerAggregatorTransformation.this.valConversion.fromWritable(reduce_val);
                }
            }), emitter);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/cdap-etl-batch-4.1.1.jar:co/cask/cdap/etl/batch/mapreduce/MapReduceTransformExecutorFactory$ReducerJoinerTransformation.class */
    public static class ReducerJoinerTransformation<JOIN_KEY, INPUT_RECORD, OUT, REDUCE_KEY extends WritableComparable, REDUCE_VALUE extends Writable> implements Transformation<KeyValue<REDUCE_KEY, Iterator<TaggedWritable<REDUCE_VALUE>>>, OUT> {
        private final Joiner<JOIN_KEY, INPUT_RECORD, OUT> joiner;
        private final WritableConversion<JOIN_KEY, REDUCE_KEY> keyConversion;
        private final WritableConversion<INPUT_RECORD, REDUCE_VALUE> inputConversion;
        private final int numOfInputs;

        ReducerJoinerTransformation(Joiner<JOIN_KEY, INPUT_RECORD, OUT> joiner, String str, String str2, int i) {
            this.joiner = joiner;
            WritableConversion<JOIN_KEY, REDUCE_KEY> conversion = WritableConversions.getConversion(str);
            WritableConversion<INPUT_RECORD, REDUCE_VALUE> conversion2 = WritableConversions.getConversion(str2);
            this.keyConversion = conversion == null ? new CastConversion<>() : conversion;
            this.inputConversion = conversion2 == null ? new CastConversion<>() : conversion2;
            this.numOfInputs = i;
        }

        @Override // co.cask.cdap.etl.api.Transformation
        public void transform(KeyValue<REDUCE_KEY, Iterator<TaggedWritable<REDUCE_VALUE>>> keyValue, Emitter<OUT> emitter) throws Exception {
            new Join(this.joiner, this.keyConversion.fromWritable((Writable) keyValue.getKey()), Iterators.transform((Iterator) keyValue.getValue(), new Function<TaggedWritable<REDUCE_VALUE>, JoinElement<INPUT_RECORD>>() { // from class: co.cask.cdap.etl.batch.mapreduce.MapReduceTransformExecutorFactory.ReducerJoinerTransformation.1
                @Override // com.google.common.base.Function
                @Nullable
                public JoinElement<INPUT_RECORD> apply(@Nullable TaggedWritable<REDUCE_VALUE> taggedWritable) {
                    return new JoinElement<>(taggedWritable.getStageName(), ReducerJoinerTransformation.this.inputConversion.fromWritable(taggedWritable.getRecord()));
                }
            }), this.numOfInputs, emitter).joinRecords();
        }
    }

    public MapReduceTransformExecutorFactory(MapReduceTaskContext mapReduceTaskContext, PipelinePluginInstantiator pipelinePluginInstantiator, Metrics metrics, Map<String, Map<String, String>> map, String str, int i) {
        super((JobContext) mapReduceTaskContext.getHadoopContext(), pipelinePluginInstantiator, metrics, str, new DefaultMacroEvaluator(mapReduceTaskContext.getWorkflowToken(), mapReduceTaskContext.getRuntimeArguments(), mapReduceTaskContext.getLogicalStartTime(), mapReduceTaskContext, mapReduceTaskContext.getNamespace()));
        this.taskContext = mapReduceTaskContext;
        this.pluginRuntimeArgs = map;
        Configuration configuration = ((JobContext) mapReduceTaskContext.getHadoopContext()).getConfiguration();
        this.mapOutputKeyClassName = configuration.get("cdap.etl.map.key.class");
        this.mapOutputValClassName = configuration.get("cdap.etl.map.val.class");
        this.numberOfRecordsPreview = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // co.cask.cdap.etl.batch.TransformExecutorFactory
    public MapReduceRuntimeContext createRuntimeContext(StageInfo stageInfo) {
        Map<String, String> map = this.pluginRuntimeArgs.get(stageInfo.getName());
        if (map == null) {
            map = new HashMap();
        }
        return new MapReduceRuntimeContext(this.taskContext, this.metrics, new DatasetContextLookupProvider(this.taskContext), map, stageInfo);
    }

    @Override // co.cask.cdap.etl.batch.TransformExecutorFactory
    protected TrackedTransform getTransformation(StageInfo stageInfo) throws Exception {
        DefaultMacroEvaluator defaultMacroEvaluator = new DefaultMacroEvaluator(this.taskContext.getWorkflowToken(), this.taskContext.getRuntimeArguments(), this.taskContext.getLogicalStartTime(), this.taskContext, this.taskContext.getNamespace());
        String name = stageInfo.getName();
        String pluginType = stageInfo.getPluginType();
        DefaultStageMetrics defaultStageMetrics = new DefaultStageMetrics(this.metrics, name);
        if (BatchAggregator.PLUGIN_TYPE.equals(pluginType)) {
            BatchAggregator batchAggregator = (BatchAggregator) this.pluginInstantiator.newPluginInstance(name, defaultMacroEvaluator);
            batchAggregator.initialize((BatchRuntimeContext) createRuntimeContext(stageInfo));
            return this.isMapPhase ? getTrackedEmitKeyStep(new MapperAggregatorTransformation(batchAggregator, this.mapOutputKeyClassName, this.mapOutputValClassName), defaultStageMetrics, this.taskContext.getDataTracer(name)) : getTrackedAggregateStep(new ReducerAggregatorTransformation(batchAggregator, this.mapOutputKeyClassName, this.mapOutputValClassName), defaultStageMetrics, this.taskContext.getDataTracer(name));
        }
        if (!BatchJoiner.PLUGIN_TYPE.equals(pluginType)) {
            Transformation initializedTransformation = getInitializedTransformation(stageInfo);
            return new TrackedTransform(this.taskContext.getDataTracer(name).isEnabled() && BatchSource.PLUGIN_TYPE.equals(pluginType) && this.isMapPhase ? new LimitingTransform(initializedTransformation, this.numberOfRecordsPreview) : initializedTransformation, defaultStageMetrics, this.taskContext.getDataTracer(name));
        }
        BatchJoiner batchJoiner = (BatchJoiner) this.pluginInstantiator.newPluginInstance(name, defaultMacroEvaluator);
        MapReduceRuntimeContext createRuntimeContext = createRuntimeContext(stageInfo);
        batchJoiner.initialize((BatchJoinerRuntimeContext) createRuntimeContext);
        return this.isMapPhase ? getTrackedEmitKeyStep(new MapperJoinerTransformation(batchJoiner, this.mapOutputKeyClassName, this.mapOutputValClassName), defaultStageMetrics, this.taskContext.getDataTracer(name)) : getTrackedMergeStep(new ReducerJoinerTransformation(batchJoiner, this.mapOutputKeyClassName, this.mapOutputValClassName, createRuntimeContext.getInputSchemas().size()), defaultStageMetrics, this.taskContext.getDataTracer(name));
    }
}
