package co.cask.cdap.internal.app.runtime.workflow;

import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.mapreduce.MapReduceSpecification;
import co.cask.cdap.api.workflow.WorkflowSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramRunner;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.runtime.batch.MapReduceProgramController;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Threads;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/workflow/WorkflowMapReduceRunnerFactory.class */
public final class WorkflowMapReduceRunnerFactory implements MapReduceRunnerFactory {
    private final WorkflowSpecification workflowSpec;
    private final ProgramRunner programRunner;
    private final Program workflowProgram;
    private final RunId runId;
    private final Arguments userArguments;
    private final long logicalStartTime;

    /* JADX INFO: Access modifiers changed from: package-private */
    public WorkflowMapReduceRunnerFactory(WorkflowSpecification workflowSpecification, ProgramRunner programRunner, Program program, RunId runId, Arguments arguments, long j) {
        this.workflowSpec = workflowSpecification;
        this.programRunner = programRunner;
        this.workflowProgram = program;
        this.runId = runId;
        this.logicalStartTime = j;
        this.userArguments = arguments;
    }

    @Override // co.cask.cdap.internal.app.runtime.workflow.MapReduceRunnerFactory
    public Callable<MapReduceContext> create(String str) {
        MapReduceSpecification mapReduceSpecification = (MapReduceSpecification) this.workflowSpec.getMapReduce().get(str);
        Preconditions.checkArgument(mapReduceSpecification != null, "No MapReduce with name %s found in Workflow %s", new Object[]{str, this.workflowSpec.getName()});
        final WorkflowMapReduceProgram workflowMapReduceProgram = new WorkflowMapReduceProgram(this.workflowProgram, mapReduceSpecification);
        final SimpleProgramOptions simpleProgramOptions = new SimpleProgramOptions(workflowMapReduceProgram.getName(), new BasicArguments(ImmutableMap.of(ProgramOptionConstants.RUN_ID, this.runId.getId(), ProgramOptionConstants.LOGICAL_START_TIME, Long.toString(this.logicalStartTime), ProgramOptionConstants.WORKFLOW_BATCH, str)), this.userArguments);
        return new Callable<MapReduceContext>() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowMapReduceRunnerFactory.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public MapReduceContext call() throws Exception {
                return WorkflowMapReduceRunnerFactory.this.runAndWait(workflowMapReduceProgram, simpleProgramOptions);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MapReduceContext runAndWait(Program program, ProgramOptions programOptions) throws Exception {
        ProgramController run = this.programRunner.run(program, programOptions);
        final MapReduceContext context = run instanceof MapReduceProgramController ? ((MapReduceProgramController) run).getContext() : null;
        final SettableFuture create = SettableFuture.create();
        run.addListener(new AbstractListener() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowMapReduceRunnerFactory.2
            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void stopped() {
                create.set(context);
            }

            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void error(Throwable th) {
                create.setException(th);
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        try {
            return (MapReduceContext) create.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw ((Exception) cause);
            }
            throw Throwables.propagate(cause);
        }
    }
}
