package co.cask.cdap.internal.app.store;

import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.TransactionExecutorFactory;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.proto.BasicThrowable;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionFailureException;
import org.apache.twill.api.RunId;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/internal/app/store/AppMetadataStoreTest.class */
public class AppMetadataStoreTest {
    // Shared fixtures; assigned once in beforeClass() from the AppFabricTestHelper injector.
    private static DatasetFramework datasetFramework;
    private static CConfiguration cConf;
    private static TransactionExecutorFactory txExecutorFactory;
    // Statuses that testScanRunningInRangeWithBatch cycles through when recording program stops.
    private static final List<ProgramRunStatus> STOP_STATUSES = ImmutableList.of(ProgramRunStatus.COMPLETED, ProgramRunStatus.FAILED, ProgramRunStatus.KILLED);
    // Per-instance counter; each incrementAndGet() result is handed to AppFabricTestHelper.createSourceId.
    private final AtomicInteger sourceId = new AtomicInteger();

    /* loaded from: input_file:co/cask/cdap/internal/app/store/AppMetadataStoreTest$CountingTicker.class */
    /**
     * {@link Ticker} whose reported time is driven by how many times it has been read instead of by a clock.
     * Every {@link #read()} bumps a counter, and the reported time is that counter divided by
     * {@code elementsPerMillis}, converted from milliseconds to nanoseconds.
     */
    private static class CountingTicker extends Ticker {
        private final long elementsPerMillis;
        private int numProcessed;

        /**
         * @param j number of {@link #read()} calls that correspond to one reported millisecond
         */
        CountingTicker(long j) {
            elementsPerMillis = j;
        }

        /** Returns how many times {@link #read()} has been called so far. */
        int getNumProcessed() {
            return numProcessed;
        }

        /**
         * Counts this call and returns the simulated time in nanoseconds.
         * The division is integer division, so the time only advances once per
         * {@code elementsPerMillis} reads.
         */
        public long read() {
            numProcessed += 1;
            long simulatedMillis = numProcessed / elementsPerMillis;
            return TimeUnit.MILLISECONDS.toNanos(simulatedMillis);
        }
    }

    /**
     * One-time setup: obtains the test injector, makes sure the default namespace exists, and caches
     * the dataset framework, transaction executor factory and configuration used by the tests.
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        Injector guice = AppFabricTestHelper.getInjector();
        AppFabricTestHelper.ensureNamespaceExists(NamespaceId.DEFAULT);
        // Injector.getInstance(Class<T>) is generic and already returns T, so no casts are required.
        datasetFramework = guice.getInstance(DatasetFramework.class);
        txExecutorFactory = guice.getInstance(TransactionExecutorFactory.class);
        cConf = guice.getInstance(CConfiguration.class);
    }

    /**
     * Exercises run records written through the store's "old format" writers.
     *
     * <p>Starts a run, marks it running with {@code recordProgramRunningOldFormat}, checks that it is the
     * only entry returned by {@code getRunningInRange} and that {@code getRun} finds it, then stops it with
     * {@code recordProgramStopOldFormat} and checks that it is the only COMPLETED run of the program.
     * The test is currently disabled through {@link Ignore}.
     */
    @Test
    @Ignore
    public void testOldRunRecordFormat() throws Exception {
        DatasetId storeTableId = NamespaceId.DEFAULT.dataset("testOldRunRecordFormat");
        datasetFramework.addInstance(Table.class.getName(), storeTableId, DatasetProperties.EMPTY);
        Table storeTable = datasetFramework.getDataset(storeTableId, ImmutableMap.of(), (ClassLoader) null);
        Assert.assertNotNull(storeTable);
        final AppMetadataStore store = new AppMetadataStore(storeTable, cConf, new AtomicBoolean(false));
        TransactionExecutor txExecutor = txExecutorFactory.createExecutor(Collections.singleton(store));

        // The program type is simply the last constant declared in ProgramType.
        ProgramType[] programTypes = ProgramType.values();
        final ProgramId programId = NamespaceId.DEFAULT.app("app").program(programTypes[programTypes.length - 1], "program");
        final RunId runId = RunIds.generate();

        // Transaction 1: regular start record followed by an old-format "running" record.
        // The casts on the null arguments are kept exactly as in the original call sites.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                long startSecs = RunIds.getTime(runId, TimeUnit.SECONDS);
                store.recordProgramStart(programId, runId.getId(), startSecs, (String) null, (Map) null, (Map) null, AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()));
                store.recordProgramRunningOldFormat(programId, runId.getId(), startSecs, (String) null, AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()));
            }
        });

        // Transaction 2: the run must be the single running entry and must be retrievable by id.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                Set<RunId> running = store.getRunningInRange(0L, Long.MAX_VALUE);
                Assert.assertEquals(1L, running.size());
                RunId onlyRunning = running.iterator().next();
                RunRecordMeta record = store.getRun(programId, onlyRunning.getId());
                Assert.assertNotNull(record);
                Assert.assertEquals(runId.getId(), record.getPid());
            }
        });

        // Transaction 3: old-format stop, after which exactly one COMPLETED run must be listed.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                long stopSecs = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
                store.recordProgramStopOldFormat(programId, runId.getId(), stopSecs, ProgramRunStatus.COMPLETED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()));
                Map<ProgramRunId, ?> completed = store.getRuns(programId, ProgramRunStatus.COMPLETED, 0L, Long.MAX_VALUE, Integer.MAX_VALUE, (Predicate) null);
                Assert.assertEquals(1L, completed.size());
                ProgramRunId onlyCompleted = completed.keySet().iterator().next();
                Assert.assertEquals(programId, onlyCompleted.getParent());
                Assert.assertEquals(runId.getId(), onlyCompleted.getRun());
            }
        });
    }

    /**
     * Records a program start with source id 10000 and then a KILLED stop that carries the smaller
     * source id 10, reads the run back, and finally runs a full-range scan that is expected to return
     * no runs at all (the expected set passed to {@code runScan} is empty).
     *
     * <p>NOTE(review): presumably the scan is empty because the stop record with the smaller source id is
     * ignored by the store; confirm against AppMetadataStore. The fetched run record is collected but
     * never asserted on.
     */
    @Test
    public void testSmallerSourceIdRecords() throws Exception {
        DatasetId storeTableId = NamespaceId.DEFAULT.dataset("testRandomSourceIdRecords");
        datasetFramework.addInstance(Table.class.getName(), storeTableId, DatasetProperties.EMPTY);
        Table storeTable = datasetFramework.getDataset(storeTableId, ImmutableMap.of(), (ClassLoader) null);
        Assert.assertNotNull(storeTable);
        final AppMetadataStore store = new AppMetadataStore(storeTable, cConf, new AtomicBoolean(false));
        TransactionExecutor txExecutor = txExecutorFactory.createExecutor(Collections.singleton(store));

        final ProgramId programId = NamespaceId.DEFAULT.app("app").program(ProgramType.WORKFLOW, "program");
        final RunId runId = RunIds.generate(10000L);

        // Start is written with source id 10000, the stop with the smaller source id 10.
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                long startSecs = RunIds.getTime(runId, TimeUnit.SECONDS);
                store.recordProgramStart(programId, runId.getId(), startSecs, (String) null, ImmutableMap.of(), ImmutableMap.of(), AppFabricTestHelper.createSourceId(10000L));
                store.recordProgramStop(programId, runId.getId(), startSecs, ProgramRunStatus.KILLED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(10L));
            }
        });

        // Read the run back inside its own transaction; the result is only collected.
        final List<RunRecordMeta> fetched = new ArrayList<RunRecordMeta>();
        txExecutor.execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                fetched.add(store.getRun(programId, runId.getId()));
            }
        });

        // Full-range scan with an empty expectation.
        Set<Long> noStartTimes = new TreeSet<Long>();
        runScan(txExecutor, store, noStartTimes, 0L, Long.MAX_VALUE);
    }

    /**
     * Writes 100 runs (start, running, stop) whose run ids are generated from 10000, 20000, ... 1000000,
     * each for a different app/program, and then checks batched scans over the full range and over
     * several sub-ranges via {@code runScan}.
     *
     * <p>Expected values come from {@link TreeSet#subSet(Object, Object)}, which includes the lower bound
     * and excludes the upper bound; the scans are therefore expected to follow the same convention.
     */
    @Test
    public void testScanRunningInRangeWithBatch() throws Exception {
        DatasetId storeTableId = NamespaceId.DEFAULT.dataset("testScanRunningInRange");
        datasetFramework.addInstance(Table.class.getName(), storeTableId, DatasetProperties.EMPTY);
        Table storeTable = datasetFramework.getDataset(storeTableId, ImmutableMap.of(), (ClassLoader) null);
        Assert.assertNotNull(storeTable);
        final AppMetadataStore store = new AppMetadataStore(storeTable, cConf, new AtomicBoolean(false));
        TransactionExecutor txExecutor = txExecutorFactory.createExecutor(Collections.singleton(store));

        TreeSet<Long> expectedStartMillis = new TreeSet<Long>();
        ProgramType[] programTypes = ProgramType.values();
        for (int i = 0; i < 100; i++) {
            // Program type and stop status both rotate with the loop index.
            final ProgramId programId = NamespaceId.DEFAULT.app("app" + i).program(programTypes[i % programTypes.length], "program" + i);
            final RunId runId = RunIds.generate((i + 1) * 10000);
            final ProgramRunStatus stopStatus = STOP_STATUSES.get(i % STOP_STATUSES.size());
            expectedStartMillis.add(RunIds.getTime(runId, TimeUnit.MILLISECONDS));
            txExecutor.execute(new TransactionExecutor.Subroutine() {
                public void apply() throws Exception {
                    String pid = runId.getId();
                    long startSecs = RunIds.getTime(runId, TimeUnit.SECONDS);
                    store.recordProgramStart(programId, pid, startSecs, (String) null, ImmutableMap.of(), ImmutableMap.of(), AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()));
                    store.recordProgramRunning(programId, pid, startSecs + 1, (String) null, AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()));
                    store.recordProgramStop(programId, pid, startSecs, stopStatus, (BasicThrowable) null, AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()));
                }
            });
        }

        // Full scan first.
        runScan(txExecutor, store, expectedStartMillis, 0L, Long.MAX_VALUE);

        // Then each {from, to} window, given in milliseconds; the scan itself is asked in seconds.
        long[][] windowsMillis = {
            {300000L, 900000L},
            {900000L, 1010000L},
            {1010000L, 2000000L},
            {310000L, 310000L},
            {300000L, 310000L},
            {1000L, 10000L},
        };
        for (long[] window : windowsMillis) {
            long fromMillis = window[0];
            long toMillis = window[1];
            runScan(txExecutor, store, expectedStartMillis.subSet(fromMillis, toMillis), TimeUnit.MILLISECONDS.toSeconds(fromMillis), TimeUnit.MILLISECONDS.toSeconds(toMillis));
        }
    }

    /**
     * Runs a batched scan over the {@code "runRecordCompleted"} records inside a transaction and checks
     * both the start times that come back and the number of batches that were produced.
     *
     * @param transactionExecutor executor used to run the scan in a transaction
     * @param appMetadataStore store under test
     * @param set expected run start times, in milliseconds
     * @param j lower bound of the scanned range (the callers in this class pass seconds)
     * @param j2 upper bound of the scanned range (the callers in this class pass seconds)
     * @throws InterruptedException propagated from {@code TransactionExecutor.execute}
     * @throws TransactionFailureException propagated from {@code TransactionExecutor.execute}
     */
    private void runScan(TransactionExecutor transactionExecutor, final AppMetadataStore appMetadataStore, final Set<Long> set, final long j, final long j2) throws InterruptedException, TransactionFailureException {
        transactionExecutor.execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                // Value passed as the fourth scan argument and used as the divisor in the batch-count check.
                // Presumably the per-batch scan time budget in milliseconds; confirm against
                // AppMetadataStore.getRunningInRangeForStatus.
                final int maxScanTimeMillis = 25;
                // Keep a reference to the ticker so its read count can be inspected after the scan.
                // With one element per millisecond, every read() advances the simulated time by 1 ms.
                CountingTicker ticker = new CountingTicker(1L);
                List<Iterable<RunId>> batches = appMetadataStore.getRunningInRangeForStatus("runRecordCompleted", j, j2, maxScanTimeMillis, ticker);

                // Flatten all batches into the sorted set of run start times.
                Set<Long> actual = new TreeSet<Long>();
                for (Iterable<RunId> batch : batches) {
                    for (RunId runId : batch) {
                        actual.add(RunIds.getTime(runId, TimeUnit.MILLISECONDS));
                    }
                }
                Assert.assertEquals(set, actual);

                // Reconstructed from decompiler output that referenced an undeclared "r0" for both operands.
                // Presumably each batch costs two extra Ticker.read() calls on top of one per element;
                // TODO confirm against the store implementation.
                int numBatches = batches.size();
                Assert.assertEquals((ticker.getNumProcessed() - (2 * numBatches)) / maxScanTimeMillis, numBatches);
            }
        });
    }

    /**
     * Records 100 runs of a single FLOW program (start, running, stop) and verifies that
     * {@code getRuns(Set)} returns exactly the requested runs, first for all 100 run ids and then for
     * the even-indexed half.
     *
     * <p>The status passed to {@code recordProgramStop} cycles through every {@link ProgramRunStatus}
     * constant, not only the entries of {@code STOP_STATUSES}.
     */
    @Test
    public void testgetRuns() throws Exception {
        DatasetId storeTableId = NamespaceId.DEFAULT.dataset("testgetRuns");
        datasetFramework.addInstance(Table.class.getName(), storeTableId, DatasetProperties.EMPTY);
        Table storeTable = datasetFramework.getDataset(storeTableId, ImmutableMap.of(), (ClassLoader) null);
        Assert.assertNotNull(storeTable);
        final AppMetadataStore store = new AppMetadataStore(storeTable, cConf, new AtomicBoolean(false));
        TransactionExecutor txExecutor = txExecutorFactory.createExecutor(Collections.singleton(store));

        final Set<String> allPids = new TreeSet<String>();
        final Set<String> evenPids = new TreeSet<String>();
        final Set<ProgramRunId> allRunIds = new HashSet<ProgramRunId>();
        final Set<ProgramRunId> evenRunIds = new HashSet<ProgramRunId>();

        // The program id and the status table are the same for every run, so build them once.
        final ProgramId programId = NamespaceId.DEFAULT.app("app").program(ProgramType.FLOW, "program");
        final ProgramRunStatus[] statuses = ProgramRunStatus.values();

        for (int i = 0; i < 100; i++) {
            final RunId runId = RunIds.generate((i + 1) * 10000);
            final ProgramRunStatus stopStatus = statuses[i % statuses.length];
            String pid = runId.toString();
            ProgramRunId programRunId = new ProgramRunId(programId.getNamespace(), programId.getApplication(), programId.getType(), programId.getProgram(), pid);

            allPids.add(pid);
            allRunIds.add(programRunId);
            // Even-indexed runs additionally form the subset queried in the second check.
            if (i % 2 == 0) {
                evenPids.add(pid);
                evenRunIds.add(programRunId);
            }

            txExecutor.execute(new TransactionExecutor.Subroutine() {
                public void apply() throws Exception {
                    long startSecs = RunIds.getTime(runId, TimeUnit.SECONDS);
                    store.recordProgramStart(programId, runId.getId(), startSecs, (String) null, (Map) null, (Map) null, AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()));
                    store.recordProgramRunning(programId, runId.getId(), startSecs, (String) null, AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()));
                    store.recordProgramStop(programId, runId.getId(), startSecs, stopStatus, (BasicThrowable) null, AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()));
                }
            });
        }

        txExecutor.execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                Assert.assertEquals(allPids, collectPids(store.getRuns(allRunIds)));
                Assert.assertEquals(evenPids, collectPids(store.getRuns(evenRunIds)));
            }
        });
    }

    /** Collects the pid of every run record in the given map into a sorted set. */
    private static Set<String> collectPids(Map<ProgramRunId, RunRecordMeta> runs) {
        Set<String> pids = new TreeSet<String>();
        for (RunRecordMeta record : runs.values()) {
            pids.add(record.getPid());
        }
        return pids;
    }
}
