package co.cask.cdap.internal.app.runtime.workflow;

import co.cask.cdap.api.common.RuntimeArguments;
import co.cask.cdap.api.common.Scope;
import co.cask.cdap.api.workflow.NodeStatus;
import co.cask.cdap.api.workflow.WorkflowNodeState;
import co.cask.cdap.api.workflow.WorkflowSpecification;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.program.Programs;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramRunner;
import co.cask.cdap.app.runtime.ProgramRunnerFactory;
import co.cask.cdap.app.runtime.WorkflowTokenProvider;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.lang.ProgramClassLoader;
import co.cask.cdap.internal.app.program.ForwardingProgram;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.proto.ProgramType;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.twill.common.Threads;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/workflow/AbstractProgramWorkflowRunner.class */
/**
 * Base {@link ProgramWorkflowRunner} that starts a program on behalf of a workflow node and blocks the
 * calling thread until the program's {@link ProgramController} reports a terminal state.
 *
 * <p>Subclasses supply the {@link ProgramType} to run and may rewrite the {@link Program} before it is
 * started. When the program finishes, the state of the node is recorded in the shared node-state map and
 * the program's {@link WorkflowToken} is merged into the workflow's token.
 */
public abstract class AbstractProgramWorkflowRunner implements ProgramWorkflowRunner {
    private static final Gson GSON = new Gson();
    private final CConfiguration cConf;
    private final Arguments userArguments;
    private final Arguments systemArguments;
    private final Program workflowProgram;
    private final String nodeId;
    private final Map<String, WorkflowNodeState> nodeStates;
    protected final WorkflowSpecification workflowSpec;
    protected final ProgramRunnerFactory programRunnerFactory;
    protected final WorkflowToken token;

    /**
     * Creates a runner bound to a single workflow node.
     *
     * @param cConf configuration handed to {@link Programs#create} when the program is re-created
     * @param workflowProgram the program of the enclosing workflow
     * @param workflowProgramOptions options of the workflow run; both its system and user arguments are kept
     * @param programRunnerFactory factory used to obtain a {@link ProgramRunner} for {@link #getProgramType()}
     * @param workflowSpec specification of the enclosing workflow
     * @param token workflow token passed to the program and updated after it completes
     * @param nodeId id of the workflow node this runner executes
     * @param nodeStates map that receives the {@link WorkflowNodeState} of the node when the program ends
     */
    public AbstractProgramWorkflowRunner(CConfiguration cConf, Program workflowProgram,
                                         ProgramOptions workflowProgramOptions,
                                         ProgramRunnerFactory programRunnerFactory,
                                         WorkflowSpecification workflowSpec, WorkflowToken token,
                                         String nodeId, Map<String, WorkflowNodeState> nodeStates) {
        this.cConf = cConf;
        this.userArguments = workflowProgramOptions.getUserArguments();
        this.workflowProgram = workflowProgram;
        this.programRunnerFactory = programRunnerFactory;
        this.workflowSpec = workflowSpec;
        this.systemArguments = workflowProgramOptions.getArguments();
        this.token = token;
        this.nodeId = nodeId;
        this.nodeStates = nodeStates;
    }

    /**
     * Returns the type of program this runner starts; used to pick the {@link ProgramRunner}.
     */
    protected abstract ProgramType getProgramType();

    /**
     * Gives the subclass a chance to replace the program that is about to be run.
     *
     * @param name the name that was passed to {@link #create(String)}
     * @param program the program created for this run
     * @return the program that will actually be started
     */
    protected abstract Program rewriteProgram(String name, Program program);

    /**
     * Builds a {@link Runnable} that, when run, starts the named program and blocks until it finishes.
     *
     * <p>If preparing the runnable fails, the freshly created {@link ProgramRunner} is closed (when it is
     * {@link Closeable}) and the failure is propagated.
     *
     * @param name name of the program inside the workflow
     * @return a runnable executing the program synchronously
     */
    @Override
    public final Runnable create(String name) {
        ProgramRunner programRunner = this.programRunnerFactory.create(getProgramType());
        try {
            Program program = rewriteProgram(name, createProgram(programRunner, this.workflowProgram));
            return getProgramRunnable(name, programRunner, program);
        } catch (Exception e) {
            closeProgramRunner(programRunner);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Assembles the program options for the run and wraps the blocking execution in a {@link Runnable}.
     *
     * <p>System arguments start as a copy of the workflow's system arguments, then receive a new run id and
     * the workflow name, workflow run id, node id, program name and JSON-serialized workflow token. User
     * arguments are the workflow's user arguments with the scope for this program extracted.
     */
    private Runnable getProgramRunnable(String name, final ProgramRunner programRunner, final Program program) {
        Map<String, String> systemArgs = Maps.newHashMap(this.systemArguments.asMap());
        // The workflow's own run id must be read from the untouched workflow arguments, since RUN_ID is
        // overwritten in the copy with a freshly generated id for the program.
        String workflowRunId = this.systemArguments.getOption(ProgramOptionConstants.RUN_ID);
        systemArgs.put(ProgramOptionConstants.RUN_ID, RunIds.generate().getId());
        systemArgs.put(ProgramOptionConstants.WORKFLOW_NAME, this.workflowSpec.getName());
        systemArgs.put(ProgramOptionConstants.WORKFLOW_RUN_ID, workflowRunId);
        systemArgs.put(ProgramOptionConstants.WORKFLOW_NODE_ID, this.nodeId);
        systemArgs.put(ProgramOptionConstants.PROGRAM_NAME_IN_WORKFLOW, name);
        systemArgs.put(ProgramOptionConstants.WORKFLOW_TOKEN, GSON.toJson(this.token));

        Scope programScope = Scope.scopeFor(program.getType().getCategoryName());
        Map<String, String> userArgs = RuntimeArguments.extractScope(programScope, name, this.userArguments.asMap());

        final ProgramOptions options = new SimpleProgramOptions(
            program.getName(),
            new BasicArguments(ImmutableMap.copyOf(systemArgs)),
            new BasicArguments(userArgs));

        return new Runnable() {
            @Override
            public void run() {
                try {
                    runAndWait(programRunner, program, options);
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        };
    }

    /**
     * Creates the {@link Program} instance to run.
     *
     * <p>When the original program's class loader is a {@link ProgramClassLoader}, a new program is created
     * from the same jar location and the class loader's directory. Otherwise the original program is wrapped
     * in a {@link ForwardingProgram} whose {@code close()} does nothing.
     *
     * @throws IOException if {@link Programs#create} fails
     */
    private Program createProgram(ProgramRunner programRunner, Program originalProgram) throws IOException {
        // Declared as plain ClassLoader so that the instanceof test below is meaningful and the
        // ProgramClassLoader-specific call is only reached after the check.
        ClassLoader classLoader = originalProgram.getClassLoader();
        if (classLoader instanceof ProgramClassLoader) {
            return Programs.create(this.cConf, programRunner, originalProgram.getJarLocation(),
                                   ((ProgramClassLoader) classLoader).getDir());
        }
        return new ForwardingProgram(originalProgram) {
            @Override
            public void close() throws IOException {
                // Intentionally a no-op: closing the wrapper must not forward to the wrapped program
                // (presumably because the workflow still owns it - confirm with ForwardingProgram).
            }
        };
    }

    /**
     * Starts the program, waits for it to finish and merges its workflow token into this runner's token.
     *
     * <p>If anything in that sequence throws, the program and the program runner are closed quietly and
     * the throwable is rethrown. On the normal path the same resources are closed by the completion
     * listener registered in {@link #blockForCompletion}.
     *
     * @throws IllegalStateException if the controller does not implement {@link WorkflowTokenProvider}
     * @throws Exception any failure reported by the program
     */
    public void runAndWait(ProgramRunner programRunner, Program program, ProgramOptions programOptions)
        throws Exception {
        Closeable resources = createCloseable(programRunner, program);
        try {
            ProgramController controller = programRunner.run(program, programOptions);
            blockForCompletion(resources, controller);
            if (!(controller instanceof WorkflowTokenProvider)) {
                throw new IllegalStateException("No WorkflowToken available after program completed: " + program.getId());
            }
            updateWorkflowToken(((WorkflowTokenProvider) controller).getWorkflowToken());
        } catch (Throwable t) {
            Closeables.closeQuietly(resources);
            throw t;
        }
    }

    /**
     * Blocks the calling thread until the controller reports COMPLETED, KILLED or ERROR.
     *
     * <p>On each of those states the given closeable is closed and the node state is recorded. An ERROR
     * makes this method throw the reported cause. If the waiting thread is interrupted, the program is asked
     * to stop, the interrupt flag is restored and the method returns normally.
     *
     * @throws Exception the failure cause reported by the controller
     */
    private void blockForCompletion(final Closeable closeable, final ProgramController controller) throws Exception {
        final SettableFuture<Void> completion = SettableFuture.create();
        controller.addListener(new AbstractListener() {
            @Override
            public void init(ProgramController.State state, @Nullable Throwable cause) {
                // The program may already be in a terminal state when the listener is attached.
                switch (state) {
                    case COMPLETED:
                        completed();
                        break;
                    case KILLED:
                        killed();
                        break;
                    case ERROR:
                        error(cause);
                        break;
                    default:
                        break;
                }
            }

            @Override
            public void completed() {
                finishNode(closeable, controller, NodeStatus.COMPLETED, null);
                completion.set(null);
            }

            @Override
            public void killed() {
                finishNode(closeable, controller, NodeStatus.KILLED, null);
                completion.set(null);
            }

            @Override
            public void error(Throwable cause) {
                finishNode(closeable, controller, NodeStatus.FAILED, cause);
                // SettableFuture.setException rejects null; without a substitute the future would never
                // complete and the waiting thread would block forever.
                Throwable failure = cause;
                if (failure == null) {
                    failure = new IllegalStateException(
                        "Program run " + controller.getRunId().getId() + " failed without a reported cause");
                }
                completion.setException(failure);
            }
        }, Threads.SAME_THREAD_EXECUTOR);

        try {
            completion.get();
        } catch (InterruptedException e) {
            try {
                Futures.getUnchecked(controller.stop());
            } catch (Throwable ignored) {
                // Best effort: a failure to stop the program must not mask the interruption.
            }
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw Throwables.propagate(cause);
        }
    }

    /**
     * Closes the run's resources and records the terminal state of this runner's node.
     */
    private void finishNode(Closeable closeable, ProgramController controller, NodeStatus status,
                            @Nullable Throwable failureCause) {
        Closeables.closeQuietly(closeable);
        this.nodeStates.put(this.nodeId,
                            new WorkflowNodeState(this.nodeId, status, controller.getRunId().getId(), failureCause));
    }

    /**
     * Merges the token produced by the program into this runner's workflow token.
     * The runner's token is cast to {@code BasicWorkflowToken}; any other implementation fails with a
     * {@link ClassCastException}.
     */
    private void updateWorkflowToken(WorkflowToken workflowToken) throws Exception {
        ((BasicWorkflowToken) this.token).mergeToken(workflowToken);
    }

    /**
     * Returns a {@link Closeable} that quietly closes the program and then the program runner.
     */
    private Closeable createCloseable(final ProgramRunner programRunner, final Program program) {
        return new Closeable() {
            @Override
            public void close() throws IOException {
                Closeables.closeQuietly(program);
                closeProgramRunner(programRunner);
            }
        };
    }

    /**
     * Quietly closes the given program runner if it implements {@link Closeable}; otherwise does nothing.
     */
    public void closeProgramRunner(ProgramRunner programRunner) {
        if (programRunner instanceof Closeable) {
            Closeables.closeQuietly((Closeable) programRunner);
        }
    }
}
