package co.cask.cdap.metadata;

import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.entity.EntityExistenceVerifier;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.lineage.Lineage;
import co.cask.cdap.data2.metadata.lineage.LineageStore;
import co.cask.cdap.data2.metadata.lineage.Relation;
import co.cask.cdap.data2.metadata.store.MetadataStore;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.app.services.http.AppFabricTestBase;
import co.cask.cdap.metadata.LineageAdmin;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.proto.id.FlowletId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.NamespacedEntityId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.metadata.MetadataRecord;
import co.cask.cdap.proto.metadata.MetadataScope;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.tephra.TransactionExecutorFactory;
import org.apache.twill.api.RunId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/metadata/LineageAdminTest.class */
public class LineageAdminTest extends AppFabricTestBase {
    private final StreamId stream1 = new StreamId("default", "stream1");
    private final DatasetId dataset1 = new DatasetId("default", "dataset1");
    private final DatasetId dataset2 = new DatasetId("default", "dataset2");
    private final DatasetId dataset3 = new DatasetId("default", "dataset3");
    private final DatasetId dataset4 = new DatasetId("default", "dataset4");
    private final DatasetId dataset5 = new DatasetId("default", "dataset5");
    private final DatasetId dataset6 = new DatasetId("default", "dataset6");
    private final DatasetId dataset7 = new DatasetId("default", "dataset7");
    private final ProgramId program1 = new ProgramId("default", "app1", ProgramType.FLOW, "flow1");
    private final FlowletId flowlet1 = this.program1.flowlet("flowlet1");
    private final ProgramRunId run1 = this.program1.run(RunIds.generate(10000).getId());
    private final ProgramId program2 = new ProgramId("default", "app2", ProgramType.FLOW, "flow2");
    private final FlowletId flowlet2 = this.program2.flowlet("flowlet2");
    private final ProgramRunId run2 = this.program2.run(RunIds.generate(900).getId());
    private final ProgramId program3 = new ProgramId("default", "app3", ProgramType.WORKER, "worker3");
    private final ProgramRunId run3 = this.program3.run(RunIds.generate(800).getId());
    private final ProgramId program4 = new ProgramId("default", "app4", ProgramType.SERVICE, "service4");
    private final ProgramRunId run4 = this.program4.run(RunIds.generate(800).getId());
    private final ProgramId program5 = new ProgramId("default", "app5", ProgramType.SERVICE, "service5");
    private final ProgramRunId run5 = this.program5.run(RunIds.generate(700).getId());
    private final ProgramId program6 = new ProgramId("default", "app6", ProgramType.WORKFLOW, "workflow6");
    private final ProgramRunId run6 = this.program6.run(RunIds.generate(700).getId());
    private int sourceId;

    /* loaded from: input_file:co/cask/cdap/metadata/LineageAdminTest$NoOpEntityExistenceVerifier.class */
    private static final class NoOpEntityExistenceVerifier implements EntityExistenceVerifier {
        private NoOpEntityExistenceVerifier() {
        }

        public void ensureExists(EntityId entityId) throws NotFoundException {
        }
    }

    @After
    public void cleanup() throws Exception {
        deleteNamespace(NamespaceId.DEFAULT.getNamespace());
    }

    @Test
    public void testSimpleLineage() throws Exception {
        LineageStore lineageStore = new LineageStore(getTxExecFactory(), getDatasetFramework(), NamespaceId.DEFAULT.dataset("testSimpleLineage"));
        Store store = (Store) getInjector().getInstance(Store.class);
        MetadataStore metadataStore = (MetadataStore) getInjector().getInstance(MetadataStore.class);
        LineageAdmin lineageAdmin = new LineageAdmin(lineageStore, store, metadataStore, new NoOpEntityExistenceVerifier());
        MetadataRecord metadataRecord = new MetadataRecord(this.program1.getParent(), MetadataScope.USER, toMap("pk1", "pk1"), toSet("pt1"));
        MetadataRecord metadataRecord2 = new MetadataRecord(this.program1, MetadataScope.USER, toMap("pk1", "pk1"), toSet("pt1"));
        MetadataRecord metadataRecord3 = new MetadataRecord(this.dataset1, MetadataScope.USER, toMap("dk1", "dk1"), toSet("dt1"));
        MetadataRecord metadataRecord4 = new MetadataRecord(this.dataset2, MetadataScope.USER, toMap("dk2", "dk2"), toSet("dt2"));
        metadataStore.setProperties(MetadataScope.USER, this.program1.getParent(), metadataRecord.getProperties());
        metadataStore.addTags(MetadataScope.USER, this.program1.getParent(), (String[]) metadataRecord.getTags().toArray(new String[0]));
        metadataStore.setProperties(MetadataScope.USER, this.program1, metadataRecord2.getProperties());
        metadataStore.addTags(MetadataScope.USER, this.program1, (String[]) metadataRecord2.getTags().toArray(new String[0]));
        metadataStore.setProperties(MetadataScope.USER, this.dataset1, metadataRecord3.getProperties());
        metadataStore.addTags(MetadataScope.USER, this.dataset1, (String[]) metadataRecord3.getTags().toArray(new String[0]));
        metadataStore.setProperties(MetadataScope.USER, this.dataset2, metadataRecord4.getProperties());
        metadataStore.addTags(MetadataScope.USER, this.dataset2, (String[]) metadataRecord4.getTags().toArray(new String[0]));
        ProgramRunId run = this.program1.run(RunIds.generate(System.currentTimeMillis()).getId());
        ProgramRunId run2 = this.program2.run(RunIds.generate(System.currentTimeMillis()).getId());
        ProgramRunId run3 = this.program3.run(RunIds.generate(System.currentTimeMillis()).getId());
        addRuns(store, run, run2, run3);
        lineageStore.addAccess(run, this.dataset1, AccessType.UNKNOWN, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(run, this.dataset1, AccessType.WRITE, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(run, this.dataset2, AccessType.READ, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(run2, this.dataset2, AccessType.WRITE, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(run2, this.dataset3, AccessType.READ, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(run3, this.dataset1, AccessType.UNKNOWN, System.currentTimeMillis());
        Lineage lineage = new Lineage(ImmutableSet.of(new Relation(this.dataset1, this.program1, AccessType.WRITE, twillRunId(run), toSet(this.flowlet1)), new Relation(this.dataset2, this.program1, AccessType.READ, twillRunId(run), toSet(this.flowlet1)), new Relation(this.dataset2, this.program2, AccessType.WRITE, twillRunId(run2), toSet(this.flowlet2)), new Relation(this.dataset3, this.program2, AccessType.READ, twillRunId(run2), toSet(this.flowlet2)), new Relation(this.dataset1, this.program3, AccessType.UNKNOWN, twillRunId(run3))));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset1, 500L, System.currentTimeMillis() + 10000, 100));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset2, 500L, System.currentTimeMillis() + 10000, 100));
        Assert.assertEquals(ImmutableSet.of(new Relation(this.dataset1, this.program1, AccessType.WRITE, twillRunId(run), toSet(this.flowlet1)), new Relation(this.dataset2, this.program1, AccessType.READ, twillRunId(run), toSet(this.flowlet1)), new Relation(this.dataset1, this.program3, AccessType.UNKNOWN, twillRunId(run3))), lineageAdmin.computeLineage(this.dataset1, 500L, System.currentTimeMillis() + 10000, 1).getRelations());
        Assert.assertEquals(toSet(metadataRecord, metadataRecord2, metadataRecord3, metadataRecord4), lineageAdmin.getMetadataForRun(run));
        NamespaceId namespaceId = new NamespaceId("custom_namespace");
        DatasetId dataset = namespaceId.dataset(this.dataset1.getEntityName());
        ProgramRunId run4 = namespaceId.app(this.program1.getApplication()).program(this.program1.getType(), this.program1.getEntityName()).run(run.getEntityName());
        Assert.assertEquals(new Lineage(ImmutableSet.of()), lineageAdmin.computeLineage(dataset, 500L, System.currentTimeMillis() + 10000, 100));
        Assert.assertEquals(ImmutableSet.of(), lineageAdmin.getMetadataForRun(run4));
    }

    @Test
    public void testSimpleLoopLineage() throws Exception {
        LineageStore lineageStore = new LineageStore(getTxExecFactory(), getDatasetFramework(), NamespaceId.DEFAULT.dataset("testSimpleLoopLineage"));
        Store store = (Store) getInjector().getInstance(Store.class);
        LineageAdmin lineageAdmin = new LineageAdmin(lineageStore, store, (MetadataStore) getInjector().getInstance(MetadataStore.class), new NoOpEntityExistenceVerifier());
        addRuns(store, this.run1, this.run2, this.run3, this.run4, this.run5);
        lineageStore.addAccess(this.run1, this.dataset1, AccessType.READ, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run1, this.dataset2, AccessType.WRITE, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run2, this.dataset2, AccessType.READ, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(this.run2, this.dataset1, AccessType.WRITE, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(this.run2, this.dataset3, AccessType.WRITE, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(this.run3, this.dataset3, AccessType.READ, System.currentTimeMillis());
        lineageStore.addAccess(this.run3, this.dataset4, AccessType.WRITE, System.currentTimeMillis());
        Lineage lineage = new Lineage(ImmutableSet.of(new Relation(this.dataset2, this.program1, AccessType.WRITE, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset1, this.program1, AccessType.READ, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset1, this.program2, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset2, this.program2, AccessType.READ, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset3, this.program2, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset4, this.program3, AccessType.WRITE, twillRunId(this.run3), emptySet()), new Relation[]{new Relation(this.dataset3, this.program3, AccessType.READ, twillRunId(this.run3), emptySet())}));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset1, 500L, 20000L, 100));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset2, 500L, 20000L, 100));
        Assert.assertEquals(ImmutableSet.of(new Relation(this.dataset2, this.program1, AccessType.WRITE, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset1, this.program1, AccessType.READ, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset1, this.program2, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset2, this.program2, AccessType.READ, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset3, this.program2, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet2))), lineageAdmin.computeLineage(this.dataset1, 500L, 20000L, 1).getRelations());
    }

    @Test
    public void testDirectCycle() throws Exception {
        LineageStore lineageStore = new LineageStore(getTxExecFactory(), getDatasetFramework(), NamespaceId.DEFAULT.dataset("testDirectCycle"));
        Store store = (Store) getInjector().getInstance(Store.class);
        LineageAdmin lineageAdmin = new LineageAdmin(lineageStore, store, (MetadataStore) getInjector().getInstance(MetadataStore.class), new NoOpEntityExistenceVerifier());
        addRuns(store, this.run1, this.run2, this.run3, this.run4, this.run5);
        lineageStore.addAccess(this.run1, this.dataset1, AccessType.READ, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run1, this.dataset1, AccessType.WRITE, System.currentTimeMillis(), this.flowlet1);
        Assert.assertEquals(new Lineage(ImmutableSet.of(new Relation(this.dataset1, this.program1, AccessType.WRITE, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset1, this.program1, AccessType.READ, twillRunId(this.run1), toSet(this.flowlet1)))), lineageAdmin.computeLineage(this.dataset1, 500L, 20000L, 100));
    }

    @Test
    public void testDirectCycleTwoRuns() throws Exception {
        LineageStore lineageStore = new LineageStore(getTxExecFactory(), getDatasetFramework(), NamespaceId.DEFAULT.dataset("testDirectCycleTwoRuns"));
        Store store = (Store) getInjector().getInstance(Store.class);
        LineageAdmin lineageAdmin = new LineageAdmin(lineageStore, store, (MetadataStore) getInjector().getInstance(MetadataStore.class), new NoOpEntityExistenceVerifier());
        addRuns(store, this.run1, this.run2, this.run3, this.run4, this.run5);
        lineageStore.addAccess(this.run1, this.dataset1, AccessType.READ, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(new ProgramRunId(this.run1.getNamespace(), this.run1.getApplication(), this.run1.getParent().getType(), this.run1.getProgram(), this.run2.getEntityName()), this.dataset1, AccessType.WRITE, System.currentTimeMillis(), this.flowlet1);
        Assert.assertEquals(new Lineage(ImmutableSet.of(new Relation(this.dataset1, this.program1, AccessType.READ, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset1, this.program1, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet1)))), lineageAdmin.computeLineage(this.dataset1, 500L, 20000L, 100));
    }

    @Test
    public void testBranchLineage() throws Exception {
        LineageStore lineageStore = new LineageStore(getTxExecFactory(), getDatasetFramework(), NamespaceId.DEFAULT.dataset("testBranchLineage"));
        Store store = (Store) getInjector().getInstance(Store.class);
        LineageAdmin lineageAdmin = new LineageAdmin(lineageStore, store, (MetadataStore) getInjector().getInstance(MetadataStore.class), new NoOpEntityExistenceVerifier());
        addRuns(store, this.run1, this.run2, this.run3, this.run4, this.run5);
        lineageStore.addAccess(this.run1, this.stream1, AccessType.READ, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run1, this.dataset1, AccessType.READ, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run1, this.dataset2, AccessType.WRITE, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run1, this.dataset4, AccessType.WRITE, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run2, this.dataset2, AccessType.READ, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(this.run2, this.dataset3, AccessType.WRITE, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(this.run2, this.dataset5, AccessType.WRITE, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(this.run3, this.dataset5, AccessType.READ, System.currentTimeMillis());
        lineageStore.addAccess(this.run3, this.dataset6, AccessType.WRITE, System.currentTimeMillis());
        lineageStore.addAccess(this.run4, this.dataset2, AccessType.READ, System.currentTimeMillis());
        lineageStore.addAccess(this.run4, this.dataset3, AccessType.READ, System.currentTimeMillis());
        lineageStore.addAccess(this.run4, this.dataset7, AccessType.WRITE, System.currentTimeMillis());
        Lineage lineage = new Lineage(ImmutableSet.of(new Relation(this.stream1, this.program1, AccessType.READ, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset1, this.program1, AccessType.READ, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset2, this.program1, AccessType.WRITE, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset4, this.program1, AccessType.WRITE, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset2, this.program2, AccessType.READ, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset3, this.program2, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet2)), new Relation[]{new Relation(this.dataset5, this.program2, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset5, this.program3, AccessType.READ, twillRunId(this.run3), emptySet()), new Relation(this.dataset6, this.program3, AccessType.WRITE, twillRunId(this.run3), emptySet()), new Relation(this.dataset2, this.program4, AccessType.READ, twillRunId(this.run4), emptySet()), new Relation(this.dataset3, this.program4, AccessType.READ, twillRunId(this.run4), emptySet()), new Relation(this.dataset7, this.program4, AccessType.WRITE, twillRunId(this.run4), emptySet())}));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset7, 500L, 20000L, 100));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset6, 500L, 20000L, 100));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset3, 500L, 20000L, 100));
    }

    @Test
    public void testBranchLoopLineage() throws Exception {
        LineageStore lineageStore = new LineageStore(getTxExecFactory(), getDatasetFramework(), NamespaceId.DEFAULT.dataset("testBranchLoopLineage"));
        Store store = (Store) getInjector().getInstance(Store.class);
        LineageAdmin lineageAdmin = new LineageAdmin(lineageStore, store, (MetadataStore) getInjector().getInstance(MetadataStore.class), new NoOpEntityExistenceVerifier());
        addRuns(store, this.run1, this.run2, this.run3, this.run4, this.run5);
        lineageStore.addAccess(this.run1, this.stream1, AccessType.READ, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run1, this.dataset1, AccessType.READ, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run1, this.dataset2, AccessType.WRITE, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run1, this.dataset4, AccessType.WRITE, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(this.run2, this.dataset2, AccessType.READ, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(this.run2, this.dataset3, AccessType.WRITE, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(this.run2, this.dataset5, AccessType.WRITE, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(this.run3, this.dataset5, AccessType.READ, System.currentTimeMillis());
        lineageStore.addAccess(this.run3, this.dataset6, AccessType.WRITE, System.currentTimeMillis());
        lineageStore.addAccess(this.run4, this.dataset2, AccessType.READ, System.currentTimeMillis());
        lineageStore.addAccess(this.run4, this.dataset3, AccessType.READ, System.currentTimeMillis());
        lineageStore.addAccess(this.run4, this.dataset7, AccessType.WRITE, System.currentTimeMillis());
        lineageStore.addAccess(this.run5, this.dataset3, AccessType.READ, System.currentTimeMillis());
        lineageStore.addAccess(this.run5, this.dataset6, AccessType.READ, System.currentTimeMillis());
        lineageStore.addAccess(this.run5, this.dataset1, AccessType.WRITE, System.currentTimeMillis());
        Lineage lineage = new Lineage(ImmutableSet.of(new Relation(this.stream1, this.program1, AccessType.READ, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset1, this.program1, AccessType.READ, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset2, this.program1, AccessType.WRITE, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset4, this.program1, AccessType.WRITE, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset2, this.program2, AccessType.READ, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset3, this.program2, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet2)), new Relation[]{new Relation(this.dataset5, this.program2, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset5, this.program3, AccessType.READ, twillRunId(this.run3), emptySet()), new Relation(this.dataset6, this.program3, AccessType.WRITE, twillRunId(this.run3), emptySet()), new Relation(this.dataset2, this.program4, AccessType.READ, twillRunId(this.run4), emptySet()), new Relation(this.dataset3, this.program4, AccessType.READ, twillRunId(this.run4), emptySet()), new Relation(this.dataset7, this.program4, AccessType.WRITE, twillRunId(this.run4), emptySet()), new Relation(this.dataset3, this.program5, AccessType.READ, twillRunId(this.run5), emptySet()), new Relation(this.dataset6, this.program5, AccessType.READ, twillRunId(this.run5), emptySet()), new Relation(this.dataset1, this.program5, AccessType.WRITE, twillRunId(this.run5), emptySet())}));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset1, 500L, 20000L, 100));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset5, 500L, 20000L, 100));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset7, 500L, 20000L, 100));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.stream1, 500L, 20000L, 100));
        Assert.assertEquals(ImmutableSet.of(new Relation(this.dataset2, this.program2, AccessType.READ, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset3, this.program2, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset5, this.program2, AccessType.WRITE, twillRunId(this.run2), toSet(this.flowlet2)), new Relation(this.dataset5, this.program3, AccessType.READ, twillRunId(this.run3), emptySet()), new Relation(this.dataset6, this.program3, AccessType.WRITE, twillRunId(this.run3), emptySet())), lineageAdmin.computeLineage(this.dataset5, 500L, 20000L, 1).getRelations());
        Assert.assertEquals(ImmutableSet.of(new Relation(this.stream1, this.program1, AccessType.READ, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset1, this.program1, AccessType.READ, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset2, this.program1, AccessType.WRITE, twillRunId(this.run1), toSet(this.flowlet1)), new Relation(this.dataset4, this.program1, AccessType.WRITE, twillRunId(this.run1), toSet(this.flowlet1))), lineageAdmin.computeLineage(this.stream1, 500L, 20000L, 1).getRelations());
    }

    @Test
    public void testWorkflowLineage() throws Exception {
        LineageStore lineageStore = new LineageStore(getTxExecFactory(), getDatasetFramework(), NamespaceId.DEFAULT.dataset("testWorkflowLineage"));
        Store store = (Store) getInjector().getInstance(Store.class);
        MetadataStore metadataStore = (MetadataStore) getInjector().getInstance(MetadataStore.class);
        LineageAdmin lineageAdmin = new LineageAdmin(lineageStore, store, metadataStore, new NoOpEntityExistenceVerifier());
        MetadataRecord metadataRecord = new MetadataRecord(this.program1.getParent(), MetadataScope.USER, toMap("pk1", "pk1"), toSet("pt1"));
        MetadataRecord metadataRecord2 = new MetadataRecord(this.program1, MetadataScope.USER, toMap("pk1", "pk1"), toSet("pt1"));
        MetadataRecord metadataRecord3 = new MetadataRecord(this.dataset1, MetadataScope.USER, toMap("dk1", "dk1"), toSet("dt1"));
        MetadataRecord metadataRecord4 = new MetadataRecord(this.dataset2, MetadataScope.USER, toMap("dk2", "dk2"), toSet("dt2"));
        metadataStore.setProperties(MetadataScope.USER, this.program1.getParent(), metadataRecord.getProperties());
        metadataStore.addTags(MetadataScope.USER, this.program1.getParent(), (String[]) metadataRecord.getTags().toArray(new String[0]));
        metadataStore.setProperties(MetadataScope.USER, this.program1, metadataRecord2.getProperties());
        metadataStore.addTags(MetadataScope.USER, this.program1, (String[]) metadataRecord2.getTags().toArray(new String[0]));
        metadataStore.setProperties(MetadataScope.USER, this.dataset1, metadataRecord3.getProperties());
        metadataStore.addTags(MetadataScope.USER, this.dataset1, (String[]) metadataRecord3.getTags().toArray(new String[0]));
        metadataStore.setProperties(MetadataScope.USER, this.dataset2, metadataRecord4.getProperties());
        metadataStore.addTags(MetadataScope.USER, this.dataset2, (String[]) metadataRecord4.getTags().toArray(new String[0]));
        ProgramRunId run = this.program1.run(RunIds.generate(System.currentTimeMillis()).getId());
        ProgramRunId run2 = this.program2.run(RunIds.generate(System.currentTimeMillis()).getId());
        ProgramRunId run3 = this.program3.run(RunIds.generate(System.currentTimeMillis()).getId());
        ProgramRunId run4 = this.program6.run(RunIds.generate(System.currentTimeMillis()).getId());
        ProgramRunId run5 = this.program5.run(RunIds.generate(System.currentTimeMillis()).getId());
        addWorkflowRuns(store, run4.getProgram(), run4.getRun(), run, run2, run3);
        addRuns(store, run4);
        addRuns(store, run5);
        lineageStore.addAccess(run, this.dataset1, AccessType.WRITE, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(run, this.dataset1, AccessType.WRITE, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(run, this.dataset2, AccessType.READ, System.currentTimeMillis(), this.flowlet1);
        lineageStore.addAccess(run2, this.dataset2, AccessType.WRITE, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(run2, this.dataset3, AccessType.READ, System.currentTimeMillis(), this.flowlet2);
        lineageStore.addAccess(run3, this.dataset1, AccessType.UNKNOWN, System.currentTimeMillis());
        lineageStore.addAccess(run5, this.dataset1, AccessType.READ, System.currentTimeMillis());
        Lineage lineage = new Lineage(ImmutableSet.of(new Relation(this.dataset1, this.program6, AccessType.WRITE, twillRunId(run4)), new Relation(this.dataset2, this.program6, AccessType.READ, twillRunId(run4)), new Relation(this.dataset2, this.program6, AccessType.WRITE, twillRunId(run4)), new Relation(this.dataset3, this.program6, AccessType.READ, twillRunId(run4)), new Relation(this.dataset1, this.program6, AccessType.UNKNOWN, twillRunId(run4)), new Relation(this.dataset1, this.program5, AccessType.READ, twillRunId(run5)), new Relation[0]));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset1, 500L, System.currentTimeMillis() + 10000, 100, "workflow"));
        Assert.assertEquals(lineage, lineageAdmin.computeLineage(this.dataset2, 500L, System.currentTimeMillis() + 10000, 100, "workflow"));
        Assert.assertEquals(ImmutableSet.of(new Relation(this.dataset1, this.program6, AccessType.WRITE, twillRunId(run4)), new Relation(this.dataset2, this.program6, AccessType.READ, twillRunId(run4)), new Relation(this.dataset1, this.program5, AccessType.READ, twillRunId(run5)), new Relation(this.dataset1, this.program6, AccessType.UNKNOWN, twillRunId(run4))), lineageAdmin.computeLineage(this.dataset1, 500L, System.currentTimeMillis() + 10000, 1, "workflow").getRelations());
        Lineage lineage2 = new Lineage(ImmutableSet.of(new Relation(this.dataset1, this.program1, AccessType.WRITE, twillRunId(run), toSet(this.flowlet1)), new Relation(this.dataset2, this.program1, AccessType.READ, twillRunId(run), toSet(this.flowlet1)), new Relation(this.dataset2, this.program2, AccessType.WRITE, twillRunId(run2), toSet(this.flowlet2)), new Relation(this.dataset3, this.program2, AccessType.READ, twillRunId(run2), toSet(this.flowlet2)), new Relation(this.dataset1, this.program3, AccessType.UNKNOWN, twillRunId(run3)), new Relation(this.dataset1, this.program5, AccessType.READ, twillRunId(run5)), new Relation[0]));
        Assert.assertEquals(lineage2, lineageAdmin.computeLineage(this.dataset1, 500L, System.currentTimeMillis() + 10000, 100, (String) null));
        Assert.assertEquals(lineage2, lineageAdmin.computeLineage(this.dataset2, 500L, System.currentTimeMillis() + 10000, 100, (String) null));
        Assert.assertEquals(ImmutableSet.of(new Relation(this.dataset1, this.program1, AccessType.WRITE, twillRunId(run), toSet(this.flowlet1)), new Relation(this.dataset2, this.program1, AccessType.READ, twillRunId(run), toSet(this.flowlet1)), new Relation(this.dataset1, this.program5, AccessType.READ, twillRunId(run5)), new Relation(this.dataset1, this.program3, AccessType.UNKNOWN, twillRunId(run3))), lineageAdmin.computeLineage(this.dataset1, 500L, System.currentTimeMillis() + 10000, 1, (String) null).getRelations());
        Assert.assertEquals(toSet(metadataRecord, metadataRecord2, metadataRecord3, metadataRecord4), lineageAdmin.getMetadataForRun(run));
        NamespaceId namespaceId = new NamespaceId("custom_namespace");
        DatasetId dataset = namespaceId.dataset(this.dataset1.getEntityName());
        ProgramRunId run6 = namespaceId.app(this.program1.getApplication()).program(this.program1.getType(), this.program1.getEntityName()).run(run.getEntityName());
        Assert.assertEquals(new Lineage(ImmutableSet.of()), lineageAdmin.computeLineage(dataset, 500L, System.currentTimeMillis() + 10000, 100));
        Assert.assertEquals(ImmutableSet.of(), lineageAdmin.getMetadataForRun(run6));
    }

    @Test
    public void testScanRange() {
        LineageAdmin.ScanRangeWithFilter scanRange = LineageAdmin.getScanRange(ImmutableSet.of(RunIds.generate(500L), RunIds.generate(400L), RunIds.generate(600L), RunIds.generate(200L), RunIds.generate(700L), RunIds.generate(100L), new RunId[0]));
        Assert.assertEquals(100L, scanRange.getStart());
        Assert.assertEquals(701L, scanRange.getEnd());
        LineageAdmin.ScanRangeWithFilter scanRange2 = LineageAdmin.getScanRange(ImmutableSet.of());
        Assert.assertEquals(0L, scanRange2.getStart());
        Assert.assertEquals(0L, scanRange2.getEnd());
        LineageAdmin.ScanRangeWithFilter scanRange3 = LineageAdmin.getScanRange(ImmutableSet.of(RunIds.generate(100L)));
        Assert.assertEquals(100L, scanRange3.getStart());
        Assert.assertEquals(101L, scanRange3.getEnd());
    }

    private void setStartAndRunning(Store store, ProgramId programId, String str, long j) {
        setStartAndRunning(store, programId, str, j, ImmutableMap.of(), ImmutableMap.of());
    }

    private void setStartAndRunning(Store store, ProgramId programId, String str, long j, Map<String, String> map, Map<String, String> map2) {
        int i = this.sourceId + 1;
        this.sourceId = i;
        store.setStart(programId, str, j, (String) null, map, map2, AppFabricTestHelper.createSourceId(i));
        int i2 = this.sourceId + 1;
        this.sourceId = i2;
        store.setRunning(programId, str, j + 1, (String) null, AppFabricTestHelper.createSourceId(i2));
    }

    private void addRuns(Store store, ProgramRunId... programRunIdArr) {
        for (ProgramRunId programRunId : programRunIdArr) {
            setStartAndRunning(store, programRunId.getParent(), programRunId.getEntityName(), RunIds.getTime(RunIds.fromString(programRunId.getEntityName()), TimeUnit.SECONDS));
        }
    }

    private void addWorkflowRuns(Store store, String str, String str2, ProgramRunId... programRunIdArr) {
        HashMap hashMap = new HashMap();
        ImmutableMap of = ImmutableMap.of();
        hashMap.put("workflowName", str);
        hashMap.put("workflowNodeId", "workflowNodeId");
        hashMap.put("workflowRunId", str2);
        for (ProgramRunId programRunId : programRunIdArr) {
            ProgramId parent = programRunId.getParent();
            String entityName = programRunId.getEntityName();
            long time = RunIds.getTime(RunIds.fromString(programRunId.getEntityName()), TimeUnit.SECONDS);
            int i = this.sourceId + 1;
            this.sourceId = i;
            store.setStart(parent, entityName, time, (String) null, of, hashMap, AppFabricTestHelper.createSourceId(i));
            ProgramId parent2 = programRunId.getParent();
            String entityName2 = programRunId.getEntityName();
            long time2 = RunIds.getTime(RunIds.fromString(programRunId.getEntityName()), TimeUnit.SECONDS) + 1;
            int i2 = this.sourceId + 1;
            this.sourceId = i2;
            store.setRunning(parent2, entityName2, time2, (String) null, AppFabricTestHelper.createSourceId(i2));
        }
    }

    @SafeVarargs
    private static <T> Set<T> toSet(T... tArr) {
        return ImmutableSet.copyOf(tArr);
    }

    private Map<String, String> toMap(String str, String str2) {
        return ImmutableMap.of(str, str2);
    }

    private static Set<NamespacedEntityId> emptySet() {
        return Collections.emptySet();
    }

    private RunId twillRunId(ProgramRunId programRunId) {
        return RunIds.fromString(programRunId.getEntityName());
    }

    private TransactionExecutorFactory getTxExecFactory() {
        return (TransactionExecutorFactory) getInjector().getInstance(TransactionExecutorFactory.class);
    }

    private DatasetFramework getDatasetFramework() {
        return (DatasetFramework) getInjector().getInstance(DatasetFramework.class);
    }
}
