package co.cask.cdap.internal.app.services;

import co.cask.cdap.WorkflowAppWithLocalDataset;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.app.runtime.AbstractProgramRuntimeService;
import co.cask.cdap.app.runtime.NoOpProgramStateWriter;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramRuntimeService;
import co.cask.cdap.app.runtime.ProgramStateWriter;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.namespace.NamespaceAdmin;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.internal.app.runtime.service.SimpleRuntimeInfo;
import co.cask.cdap.internal.app.services.http.AppFabricTestBase;
import co.cask.cdap.internal.app.store.DefaultStore;
import co.cask.cdap.internal.app.store.RunRecordMeta;
import co.cask.cdap.proto.BasicThrowable;
import co.cask.cdap.proto.DatasetSpecificationSummary;
import co.cask.cdap.proto.NotRunningProgramLiveInfo;
import co.cask.cdap.proto.ProgramLiveInfo;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.RunRecord;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.WorkflowId;
import com.google.common.collect.ImmutableMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.twill.api.RunId;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/internal/app/services/RunRecordCorrectorServiceTest.class */
public class RunRecordCorrectorServiceTest extends AppFabricTestBase {
    private static Store store;
    private static CConfiguration cConf;
    private static ProgramStateWriter programStateWriter;
    private static ProgramRuntimeService runtimeService;
    private static NamespaceAdmin namespaceAdmin;
    private static DatasetFramework datasetFramework;

    @BeforeClass
    public static void setup() throws Exception {
        store = (Store) getInjector().getInstance(DefaultStore.class);
        cConf = (CConfiguration) getInjector().getInstance(CConfiguration.class);
        programStateWriter = (ProgramStateWriter) getInjector().getInstance(ProgramStateWriter.class);
        runtimeService = (ProgramRuntimeService) getInjector().getInstance(ProgramRuntimeService.class);
        namespaceAdmin = (NamespaceAdmin) getInjector().getInstance(NamespaceAdmin.class);
        datasetFramework = (DatasetFramework) getInjector().getInstance(DatasetFramework.class);
    }

    @Test
    public void testFixProgram() {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        HashMap hashMap = new HashMap();
        for (int i = 0; i < 10; i++) {
            ProgramRunId run = NamespaceId.DEFAULT.app("test").service("service" + i).run(RunIds.generate());
            store.setStart(run.getParent(), run.getRun(), RunIds.getTime(run.getRun(), TimeUnit.SECONDS), (String) null, Collections.emptyMap(), Collections.emptyMap(), Bytes.toBytes(atomicInteger.getAndIncrement()));
            hashMap.put(run, ProgramRunStatus.FAILED);
            ProgramRunId run2 = new NamespaceId("ns").app("test").service("worker" + i).run(RunIds.generate());
            store.setStart(run2.getParent(), run2.getRun(), RunIds.getTime(run2.getRun(), TimeUnit.SECONDS), (String) null, Collections.emptyMap(), Collections.emptyMap(), Bytes.toBytes(atomicInteger.getAndIncrement()));
            store.setRunning(run2.getParent(), run2.getRun(), System.currentTimeMillis(), (String) null, Bytes.toBytes(atomicInteger.getAndIncrement()));
            hashMap.put(run2, ProgramRunStatus.FAILED);
        }
        ProgramRunId run3 = new NamespaceId("ns").app("test").service("flow").run(RunIds.generate());
        store.setStart(run3.getParent(), run3.getRun(), RunIds.getTime(run3.getRun(), TimeUnit.SECONDS), (String) null, Collections.emptyMap(), Collections.emptyMap(), Bytes.toBytes(atomicInteger.getAndIncrement()));
        store.setRunning(run3.getParent(), run3.getRun(), System.currentTimeMillis(), (String) null, Bytes.toBytes(atomicInteger.getAndIncrement()));
        store.setSuspend(run3.getParent(), run3.getRun(), Bytes.toBytes(atomicInteger.getAndIncrement()));
        hashMap.put(run3, ProgramRunStatus.SUSPENDED);
        ProgramRunId run4 = NamespaceId.DEFAULT.app("app").mr("mr").run(RunIds.generate());
        store.setStart(run4.getParent(), run4.getRun(), RunIds.getTime(run4.getRun(), TimeUnit.SECONDS), (String) null, Collections.emptyMap(), Collections.emptyMap(), Bytes.toBytes(atomicInteger.getAndIncrement()));
        hashMap.put(run4, ProgramRunStatus.FAILED);
        ProgramRunId run5 = NamespaceId.DEFAULT.app("app").workflow("workflow").run(RunIds.generate());
        ProgramRunId run6 = run5.getParent().getParent().mr("mrInWorkflow").run(RunIds.generate());
        store.setStart(run6.getParent(), run6.getRun(), RunIds.getTime(run6.getRun(), TimeUnit.SECONDS), (String) null, Collections.emptyMap(), ImmutableMap.of("workflowName", run5.getProgram(), "workflowRunId", run5.getRun(), "workflowNodeId", "mr"), Bytes.toBytes(atomicInteger.getAndIncrement()));
        hashMap.put(run5, ProgramRunStatus.STARTING);
        store.setStart(run5.getParent(), run5.getRun(), RunIds.getTime(run5.getRun(), TimeUnit.SECONDS), (String) null, Collections.emptyMap(), Collections.emptyMap(), Bytes.toBytes(atomicInteger.getAndIncrement()));
        store.setRunning(run5.getParent(), run5.getRun(), System.currentTimeMillis(), (String) null, Bytes.toBytes(atomicInteger.getAndIncrement()));
        hashMap.put(run5, ProgramRunStatus.RUNNING);
        final HashMap hashMap2 = new HashMap();
        AbstractProgramRuntimeService abstractProgramRuntimeService = new AbstractProgramRuntimeService(cConf, null, null, null) { // from class: co.cask.cdap.internal.app.services.RunRecordCorrectorServiceTest.1
            public ProgramLiveInfo getLiveInfo(ProgramId programId) {
                return new NotRunningProgramLiveInfo(programId);
            }

            public Map<RunId, ProgramRuntimeService.RuntimeInfo> list(ProgramId programId) {
                RunId runId = (RunId) hashMap2.get(programId);
                return runId != null ? Collections.singletonMap(runId, new SimpleRuntimeInfo((ProgramController) null, programId)) : Collections.emptyMap();
            }
        };
        hashMap2.put(run3.getParent(), RunIds.fromString(run3.getRun()));
        hashMap2.put(run5.getParent(), RunIds.fromString(run5.getRun()));
        RunRecordCorrectorService runRecordCorrectorService = new RunRecordCorrectorService(cConf, store, new NoOpProgramStateWriter() { // from class: co.cask.cdap.internal.app.services.RunRecordCorrectorServiceTest.2
            public void error(ProgramRunId programRunId, Throwable th) {
                RunRecordCorrectorServiceTest.store.setStop(programRunId.getParent(), programRunId.getRun(), System.currentTimeMillis(), ProgramRunStatus.FAILED, new BasicThrowable(th), Bytes.toBytes(atomicInteger.getAndIncrement()));
            }
        }, abstractProgramRuntimeService, namespaceAdmin, datasetFramework, -1L, 5) { // from class: co.cask.cdap.internal.app.services.RunRecordCorrectorServiceTest.3
        };
        runRecordCorrectorService.fixRunRecords();
        for (Map.Entry entry : hashMap.entrySet()) {
            validateExpectedState((ProgramRunId) entry.getKey(), (ProgramRunStatus) entry.getValue());
        }
        hashMap2.remove(run5.getParent());
        store.setStop(run5.getParent(), run5.getRun(), System.currentTimeMillis(), ProgramRunStatus.COMPLETED, Bytes.toBytes(atomicInteger.getAndIncrement()));
        runRecordCorrectorService.fixRunRecords();
        hashMap.put(run5, ProgramRunStatus.COMPLETED);
        hashMap.put(run6, ProgramRunStatus.FAILED);
        for (Map.Entry entry2 : hashMap.entrySet()) {
            validateExpectedState((ProgramRunId) entry2.getKey(), (ProgramRunStatus) entry2.getValue());
        }
    }

    private void validateExpectedState(ProgramRunId programRunId, ProgramRunStatus programRunStatus) {
        RunRecordMeta run = store.getRun(programRunId.getParent(), programRunId.getRun());
        Assert.assertNotNull(run);
        Assert.assertEquals(programRunStatus, run.getStatus());
    }

    @Test
    public void testLocalDatasetDeleteion() throws Exception {
        Assert.assertEquals(200L, deploy(WorkflowAppWithLocalDataset.class, "v3", "testnamespace1").getStatusLine().getStatusCode());
        final WorkflowId workflow = new NamespaceId("testnamespace1").app(WorkflowAppWithLocalDataset.APP_NAME).workflow(WorkflowAppWithLocalDataset.WORKFLOW_NAME);
        startProgram(workflow.toId(), (Map<String, String>) ImmutableMap.of("dataset.*.keep.local", "true"));
        Tasks.waitFor(1, new Callable<Integer>() { // from class: co.cask.cdap.internal.app.services.RunRecordCorrectorServiceTest.4
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(RunRecordCorrectorServiceTest.this.getProgramRuns(workflow.toId(), ProgramRunStatus.COMPLETED).size());
            }
        }, 5L, TimeUnit.SECONDS);
        List<RunRecord> programRuns = getProgramRuns(workflow.toId(), ProgramRunStatus.COMPLETED);
        Assert.assertEquals(1L, programRuns.size());
        String pid = programRuns.get(0).getPid();
        final ImmutableMap of = ImmutableMap.of("workflow.local.dataset", "true");
        Collection instances = datasetFramework.getInstances(new NamespaceId("testnamespace1"), of);
        Assert.assertEquals(1L, instances.size());
        DatasetSpecificationSummary datasetSpecificationSummary = (DatasetSpecificationSummary) instances.iterator().next();
        Assert.assertTrue(datasetSpecificationSummary.getName().endsWith(pid));
        HashMap hashMap = new HashMap();
        hashMap.putAll(datasetSpecificationSummary.getProperties());
        hashMap.remove("workflow.keep.local");
        datasetFramework.updateInstance(new DatasetId("testnamespace1", datasetSpecificationSummary.getName()), DatasetProperties.of(hashMap));
        CConfiguration create = CConfiguration.create();
        create.set("app.program.local.dataset.deleter.interval", "1");
        create.set("app.program.local.dataset.deleter.initial.delay", "1");
        new LocalRunRecordCorrectorService(create, store, programStateWriter, runtimeService, namespaceAdmin, datasetFramework).startUp();
        Tasks.waitFor(0, new Callable<Integer>() { // from class: co.cask.cdap.internal.app.services.RunRecordCorrectorServiceTest.5
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(RunRecordCorrectorServiceTest.datasetFramework.getInstances(new NamespaceId("testnamespace1"), of).size());
            }
        }, 30L, TimeUnit.SECONDS, 1L, TimeUnit.SECONDS);
    }
}
