package co.cask.cdap.etl.batch.mapreduce;

import co.cask.cdap.api.data.batch.InputContext;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.mapreduce.MapReduceTaskContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.etl.api.InvalidEntry;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchJoiner;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.batch.PipelinePluginInstantiator;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.Destroyables;
import co.cask.cdap.etl.common.PipelinePhase;
import co.cask.cdap.etl.common.SetMultimapCodec;
import co.cask.cdap.etl.common.TransformExecutor;
import co.cask.cdap.etl.common.TransformResponse;
import co.cask.cdap.etl.planner.StageInfo;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/cdap-etl-batch-3.5.4.jar:co/cask/cdap/etl/batch/mapreduce/TransformRunner.class */
/**
 * Pushes the records of one MapReduce task through the ETL stages of a pipeline phase.
 *
 * <p>Each {@link #transform(Object, Object)} call runs a single key/value pair through the
 * {@link TransformExecutor}, hands every sink result to the configured {@link OutputWriter}, and
 * routes invalid entries emitted by a transform to that transform's error dataset when one is
 * configured. Transforms that emit errors without an error dataset are warned about once.
 *
 * @param <KEY> type of the input key passed to {@link #transform(Object, Object)}
 * @param <VALUE> type of the input value passed to {@link #transform(Object, Object)}
 */
public class TransformRunner<KEY, VALUE> {
    private static final Logger LOG = LoggerFactory.getLogger(TransformRunner.class);
    private static final Gson GSON = new GsonBuilder()
        .registerTypeAdapter(Schema.class, new SchemaTypeAdapter())
        .registerTypeAdapter(SetMultimap.class, new SetMultimapCodec())
        .create();

    // Transforms already reported as emitting errors with no error dataset; lets the warning
    // below be logged once per transform instead of once per record.
    private final Set<String> transformsWithoutErrorDataset;
    // Transform stage name -> writer for that stage's configured error dataset.
    private final Map<String, ErrorOutputWriter<Object, Object>> transformErrorSinkMap;
    private final TransformExecutor<KeyValue<KEY, VALUE>> transformExecutor;
    private final OutputWriter<Object, Object> outputWriter;

    /**
     * Builds the executor, the sink writer and the per-transform error writers for this task.
     *
     * <p>The phase spec is read from the program specification property
     * {@code Constants.PIPELINEID}; runtime arguments and input aliases come from the Hadoop
     * configuration. If the phase contains a {@link BatchAggregator} or {@link BatchJoiner}
     * stage, only the first such stage is considered: a {@link Mapper.Context} task runs the part
     * of the phase up to it ({@code subsetTo}), any other task runs the part from it
     * ({@code subsetFrom}).
     *
     * @param mapReduceTaskContext context of the running map or reduce task
     * @param metrics metrics handed to the transform executor factory
     * @throws Exception propagated from plugin instantiation or executor creation
     */
    public TransformRunner(MapReduceTaskContext<Object, Object> mapReduceTaskContext, Metrics metrics) throws Exception {
        JobContext hadoopContext = (JobContext) mapReduceTaskContext.getHadoopContext();
        Configuration hConf = hadoopContext.getConfiguration();

        String phaseSpecJson = (String) mapReduceTaskContext.getSpecification().getProperties().get(Constants.PIPELINEID);
        BatchPhaseSpec phaseSpec = GSON.fromJson(phaseSpecJson, BatchPhaseSpec.class);
        this.outputWriter = getSinkWriter(mapReduceTaskContext, phaseSpec.getPhase(), hConf);

        PipelinePluginInstantiator pluginInstantiator = new PipelinePluginInstantiator(mapReduceTaskContext, phaseSpec);
        // Kept raw: its exact generic shape is whatever the executor factory expects.
        Map runtimeArgs = GSON.fromJson(hConf.get("cdap.etl.runtime.args"), ETLMapReduce.RUNTIME_ARGS_TYPE);
        Map<String, String> inputAliasToStage = GSON.fromJson(hConf.get("cdap.etl.source.alias.key"), ETLMapReduce.INPUT_ALIAS_TYPE);

        // The input context may be absent (presumably on the reduce side); then no source stage is resolved.
        InputContext inputContext = mapReduceTaskContext.getInputContext();
        String sourceStage = null;
        if (inputContext != null) {
            sourceStage = inputAliasToStage.get(inputContext.getInputName());
        }

        PipelinePhase phase = phaseSpec.getPhase();
        Set<StageInfo> aggregatorOrJoinerStages = phase.getStagesOfType(BatchAggregator.PLUGIN_TYPE, BatchJoiner.PLUGIN_TYPE);
        if (!aggregatorOrJoinerStages.isEmpty()) {
            String boundaryStage = aggregatorOrJoinerStages.iterator().next().getName();
            ImmutableSet<String> boundary = ImmutableSet.of(boundaryStage);
            if (hadoopContext instanceof Mapper.Context) {
                phase = phase.subsetTo(boundary);
            } else {
                phase = phase.subsetFrom(boundary);
            }
        }

        @SuppressWarnings("unchecked")
        TransformExecutor<KeyValue<KEY, VALUE>> executor = (TransformExecutor<KeyValue<KEY, VALUE>>)
            new MapReduceTransformExecutorFactory(mapReduceTaskContext, pluginInstantiator, metrics, runtimeArgs, sourceStage)
                .create(phase);
        this.transformExecutor = executor;

        this.transformsWithoutErrorDataset = new HashSet<>();
        this.transformErrorSinkMap = new HashMap<>();
        // Error writers are created for every transform of the full (unsubsetted) phase.
        for (StageInfo transformStage : phaseSpec.getPhase().getStagesOfType(Transform.PLUGIN_TYPE)) {
            String errorDataset = transformStage.getErrorDatasetName();
            if (errorDataset == null) {
                continue;
            }
            String stageName = transformStage.getName();
            ErrorOutputWriter<Object, Object> errorWriter = new ErrorOutputWriter<>(mapReduceTaskContext, errorDataset);
            this.transformErrorSinkMap.put(stageName, errorWriter);
        }
    }

    /**
     * Chooses how sink records are written for this task.
     *
     * <p>A {@link Mapper.Context} task in a phase containing an aggregator or joiner always gets a
     * {@code SingleOutputWriter}. Otherwise the sink outputs are read from the
     * {@code cdap.etl.sink.outputs} configuration entry, and a {@code SingleOutputWriter} is used
     * only when {@link #hasSingleOutput} holds; a {@code MultiOutputWriter} is used in every other case.
     *
     * @throws NullPointerException if {@code cdap.etl.sink.outputs} is not set
     */
    private OutputWriter<Object, Object> getSinkWriter(MapReduceTaskContext<Object, Object> mapReduceTaskContext, PipelinePhase pipelinePhase, Configuration configuration) {
        Set<StageInfo> aggregatorOrJoinerStages = pipelinePhase.getStagesOfType(BatchAggregator.PLUGIN_TYPE, BatchJoiner.PLUGIN_TYPE);
        JobContext hadoopContext = (JobContext) mapReduceTaskContext.getHadoopContext();
        boolean mapperBeforeAggregation = !aggregatorOrJoinerStages.isEmpty() && hadoopContext instanceof Mapper.Context;
        if (mapperBeforeAggregation) {
            return new SingleOutputWriter(mapReduceTaskContext);
        }

        String sinkOutputsJson = configuration.get("cdap.etl.sink.outputs");
        Preconditions.checkNotNull(sinkOutputsJson, "Sink outputs not found in Hadoop conf.");
        Map<String, SinkOutput> sinkOutputs = GSON.fromJson(sinkOutputsJson, ETLMapReduce.SINK_OUTPUTS_TYPE);

        if (hasSingleOutput(pipelinePhase.getStagesOfType(Transform.PLUGIN_TYPE), sinkOutputs)) {
            return new SingleOutputWriter(mapReduceTaskContext);
        }
        return new MultiOutputWriter(mapReduceTaskContext, sinkOutputs);
    }

    /**
     * Reports whether all records go to exactly one output.
     *
     * @param transformStages transform stages of the phase
     * @param sinkOutputs sink outputs read from the Hadoop configuration
     * @return {@code false} if any transform or sink output has an error dataset; otherwise
     *     whether the sink outputs name exactly one distinct sink
     */
    private boolean hasSingleOutput(Set<StageInfo> transformStages, Map<String, SinkOutput> sinkOutputs) {
        // Any configured error dataset rules out the single-output writer.
        for (StageInfo transformStage : transformStages) {
            if (transformStage.getErrorDatasetName() != null) {
                return false;
            }
        }

        Set<String> distinctSinks = new HashSet<>();
        for (SinkOutput sinkOutput : sinkOutputs.values()) {
            if (sinkOutput.getErrorDatasetName() != null) {
                return false;
            }
            distinctSinks.addAll(sinkOutput.getSinkOutputs());
        }
        return distinctSinks.size() == 1;
    }

    /**
     * Runs one input record through the executor and writes everything it produced.
     *
     * <p>Sink results go to the output writer keyed by sink name. Non-empty error collections go
     * to the emitting transform's error writer. If that transform has no error dataset, a warning
     * is logged and that transform's errors are ignored from then on. The executor's emitter is
     * reset afterwards.
     *
     * @param key input key
     * @param value input value
     * @throws Exception propagated from the executor or the writers
     */
    public void transform(KEY key, VALUE value) throws Exception {
        TransformResponse response = this.transformExecutor.runOneIteration(new KeyValue<>(key, value));

        Map<String, Collection<Object>> sinkResults = response.getSinksResults();
        for (Map.Entry<String, Collection<Object>> sinkEntry : sinkResults.entrySet()) {
            String sinkName = sinkEntry.getKey();
            for (Object record : sinkEntry.getValue()) {
                this.outputWriter.write(sinkName, (KeyValue) record);
            }
        }

        for (Map.Entry<String, Collection<InvalidEntry<Object>>> errorEntry : response.getMapTransformIdToErrorEmitter().entrySet()) {
            String transformId = errorEntry.getKey();
            Collection<InvalidEntry<Object>> invalidEntries = errorEntry.getValue();
            if (this.transformsWithoutErrorDataset.contains(transformId) || invalidEntries.isEmpty()) {
                continue;
            }
            ErrorOutputWriter<Object, Object> errorWriter = this.transformErrorSinkMap.get(transformId);
            if (errorWriter != null) {
                errorWriter.write(invalidEntries);
            } else {
                LOG.warn("Transform : {} has error records, but does not have a error dataset configured.", transformId);
                this.transformsWithoutErrorDataset.add(transformId);
            }
        }

        this.transformExecutor.resetEmitter();
    }

    /**
     * Releases the transform executor via {@code Destroyables.destroyQuietly}.
     * The output writer and error writers are not touched here.
     */
    public void destroy() {
        Destroyables.destroyQuietly(this.transformExecutor);
    }
}
