package co.cask.cdap.metadata;

import co.cask.cdap.AllProgramsApp;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.common.utils.TimeMathParser;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.lineage.Lineage;
import co.cask.cdap.data2.metadata.lineage.LineageSerializer;
import co.cask.cdap.data2.metadata.lineage.Relation;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.RunRecord;
import co.cask.cdap.proto.metadata.MetadataRecord;
import co.cask.cdap.proto.metadata.MetadataScope;
import co.cask.cdap.proto.metadata.lineage.LineageRecord;
import co.cask.cdap.test.SlowTests;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.http.HttpResponse;
import org.apache.twill.api.RunId;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({SlowTests.class})
/* loaded from: input_file:co/cask/cdap/metadata/LineageTest.class */
public class LineageTest extends MetadataTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(LineageTest.class);
    private static final String STOPPED = "STOPPED";

    @Test
    public void testFlowLineage() throws Exception {
        Id.Application from = Id.Application.from("testFlowLineage", AllProgramsApp.NAME);
        Id.Flow from2 = Id.Flow.from(from, AllProgramsApp.NoOpFlow.NAME);
        Id.DatasetInstance from3 = Id.DatasetInstance.from("testFlowLineage", AllProgramsApp.DATASET_NAME);
        Id.Stream from4 = Id.Stream.from("testFlowLineage", AllProgramsApp.STREAM_NAME);
        Assert.assertEquals(200L, status(createNamespace("testFlowLineage")));
        try {
            Assert.assertEquals(200L, status(deploy(AllProgramsApp.class, "v3", "testFlowLineage")));
            ImmutableMap of = ImmutableMap.of("app-key1", "app-value1");
            addProperties(from, (Map<String, String>) of);
            Assert.assertEquals(of, getProperties(from, MetadataScope.USER));
            ImmutableSet of2 = ImmutableSet.of("app-tag1");
            addTags(from, (Set<String>) of2);
            Assert.assertEquals(of2, getTags(from, MetadataScope.USER));
            ImmutableMap of3 = ImmutableMap.of("flow-key1", "flow-value1");
            addProperties((Id.Program) from2, (Map<String, String>) of3);
            Assert.assertEquals(of3, getProperties((Id.Program) from2, MetadataScope.USER));
            ImmutableSet of4 = ImmutableSet.of("flow-tag1", "flow-tag2");
            addTags((Id.Program) from2, (Set<String>) of4);
            Assert.assertEquals(of4, getTags((Id.Program) from2, MetadataScope.USER));
            ImmutableMap of5 = ImmutableMap.of("data-key1", "data-value1");
            addProperties(from3, (Map<String, String>) of5);
            Assert.assertEquals(of5, getProperties(from3, MetadataScope.USER));
            ImmutableSet of6 = ImmutableSet.of("data-tag1", "data-tag2");
            addTags(from3, (Set<String>) of6);
            Assert.assertEquals(of6, getTags(from3, MetadataScope.USER));
            ImmutableMap of7 = ImmutableMap.of("stream-key1", "stream-value1");
            addProperties(from4, (Map<String, String>) of7);
            Assert.assertEquals(of7, getProperties(from4, MetadataScope.USER));
            ImmutableSet of8 = ImmutableSet.of("stream-tag1", "stream-tag2");
            addTags(from4, (Set<String>) of8);
            Assert.assertEquals(of8, getTags(from4, MetadataScope.USER));
            long nowInSeconds = TimeMathParser.nowInSeconds();
            RunId runAndWait = runAndWait(from2);
            TimeUnit.SECONDS.sleep(2L);
            waitForStop(from2, true);
            long nowInSeconds2 = TimeMathParser.nowInSeconds();
            LineageRecord fetchLineage = fetchLineage(from3, nowInSeconds, nowInSeconds2, 10);
            LineageRecord lineageRecord = LineageSerializer.toLineageRecord(nowInSeconds, nowInSeconds2, new Lineage(ImmutableSet.of(new Relation(from3, from2, AccessType.UNKNOWN, runAndWait, ImmutableSet.of(Id.Flow.Flowlet.from(from2, "A"))), new Relation(from4, from2, AccessType.READ, runAndWait, ImmutableSet.of(Id.Flow.Flowlet.from(from2, "A"))))));
            Assert.assertEquals(lineageRecord, fetchLineage);
            Assert.assertEquals(lineageRecord.getRelations(), fetchLineage(from3, "now-1h", "now+1h", 10).getRelations());
            Assert.assertEquals(lineageRecord, fetchLineage(from4, nowInSeconds, nowInSeconds2, 10));
            Assert.assertEquals(lineageRecord.getRelations(), fetchLineage(from4, "now-1h", "now+1h", 10).getRelations());
            Assert.assertEquals(toSet(new MetadataRecord(from, of, of2), new MetadataRecord(Id.Program.from(from2.getApplication(), from2.getType(), from2.getId()), of3, of4), new MetadataRecord(from3, of5, of6), new MetadataRecord(from4, of7, of8)), fetchRunMetadata(new Id.Run(from2, runAndWait.getId())));
            long j = nowInSeconds2 + 1000;
            long j2 = nowInSeconds2 + 5000;
            Assert.assertEquals(LineageSerializer.toLineageRecord(j, j2, new Lineage(ImmutableSet.of())), fetchLineage(from4, j, j2, 10));
            long j3 = nowInSeconds - 5000;
            long j4 = nowInSeconds - 1000;
            Assert.assertEquals(LineageSerializer.toLineageRecord(j3, j4, new Lineage(ImmutableSet.of())), fetchLineage(from4, j3, j4, 10));
            fetchLineage(from3, "sometime", "sometime", 10, BadRequestException.class);
            fetchLineage(from3, "now+1h", "now-1h", 10, BadRequestException.class);
            assertRunMetadataNotFound(new Id.Run(from2, RunIds.generate(1000L).getId()));
        } finally {
            try {
                deleteNamespace("testFlowLineage");
            } catch (Throwable th) {
                LOG.error("Got exception while deleting namespace {}", "testFlowLineage", th);
            }
        }
    }

    @Test
    public void testAllProgramsLineage() throws Exception {
        Id.Application from = Id.Application.from("testAllProgramsLineage", AllProgramsApp.NAME);
        Id.Flow from2 = Id.Flow.from(from, AllProgramsApp.NoOpFlow.NAME);
        Id.Program from3 = Id.Program.from(from, ProgramType.MAPREDUCE, AllProgramsApp.NoOpMR.NAME);
        Id.Program from4 = Id.Program.from(from, ProgramType.SPARK, AllProgramsApp.NoOpSpark.NAME);
        Id.Program from5 = Id.Program.from(from, ProgramType.SERVICE, AllProgramsApp.NoOpService.NAME);
        Id.Program from6 = Id.Program.from(from, ProgramType.WORKER, AllProgramsApp.NoOpWorker.NAME);
        Id.Program from7 = Id.Program.from(from, ProgramType.WORKFLOW, AllProgramsApp.NoOpWorkflow.NAME);
        Id.DatasetInstance from8 = Id.DatasetInstance.from("testAllProgramsLineage", AllProgramsApp.DATASET_NAME);
        Id.Stream from9 = Id.Stream.from("testAllProgramsLineage", AllProgramsApp.STREAM_NAME);
        Assert.assertEquals(200L, status(createNamespace("testAllProgramsLineage")));
        try {
            Assert.assertEquals(200L, status(deploy(AllProgramsApp.class, "v3", "testAllProgramsLineage")));
            ImmutableSet of = ImmutableSet.of("spark-tag1", "spark-tag2");
            addTags(from4, (Set<String>) of);
            Assert.assertEquals(of, getTags(from4, MetadataScope.USER));
            ImmutableSet of2 = ImmutableSet.of("worker-tag1");
            addTags(from6, (Set<String>) of2);
            Assert.assertEquals(of2, getTags(from6, MetadataScope.USER));
            ImmutableMap of3 = ImmutableMap.of("data-key1", "data-value1");
            addProperties(from8, (Map<String, String>) of3);
            Assert.assertEquals(of3, getProperties(from8, MetadataScope.USER));
            RunId runAndWait = runAndWait(from2);
            RunId runAndWait2 = runAndWait(from3);
            RunId runAndWait3 = runAndWait(from4);
            runAndWait(from7);
            RunId runId = getRunId(from3, runAndWait2);
            RunId runAndWait4 = runAndWait(from5);
            RunId runAndWait5 = runAndWait(from6);
            waitForStop(from2, true);
            waitForStop(from3, false);
            waitForStop(from4, false);
            waitForStop(from7, false);
            waitForStop(from6, false);
            waitForStop(from5, true);
            long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
            long seconds2 = TimeUnit.HOURS.toSeconds(1L);
            LineageRecord fetchLineage = fetchLineage(from8, seconds - seconds2, seconds + seconds2, 10);
            LineageRecord lineageRecord = LineageSerializer.toLineageRecord(seconds - seconds2, seconds + seconds2, new Lineage(ImmutableSet.of(new Relation(from8, from2, AccessType.UNKNOWN, runAndWait, ImmutableSet.of(Id.Flow.Flowlet.from(from2, "A"))), new Relation(from8, from3, AccessType.UNKNOWN, runAndWait2), new Relation(from8, from4, AccessType.UNKNOWN, runAndWait3), new Relation(from8, from3, AccessType.UNKNOWN, runId), new Relation(from8, from5, AccessType.UNKNOWN, runAndWait4), new Relation(from8, from6, AccessType.UNKNOWN, runAndWait5), new Relation[]{new Relation(from9, from2, AccessType.READ, runAndWait, ImmutableSet.of(Id.Flow.Flowlet.from(from2, "A"))), new Relation(from9, from3, AccessType.READ, runAndWait2), new Relation(from9, from4, AccessType.READ, runAndWait3), new Relation(from9, from3, AccessType.READ, runId), new Relation(from9, from6, AccessType.WRITE, runAndWait5)})));
            Assert.assertEquals(lineageRecord, fetchLineage);
            Assert.assertEquals(lineageRecord, fetchLineage(from9, seconds - seconds2, seconds + seconds2, 10));
            Assert.assertEquals(toSet(new MetadataRecord(from, emptyMap(), emptySet()), new MetadataRecord(Id.Program.from(from2.getApplication(), from2.getType(), from2.getId()), emptyMap(), emptySet()), new MetadataRecord(from8, of3, emptySet()), new MetadataRecord(from9, emptyMap(), emptySet())), fetchRunMetadata(new Id.Run(from2, runAndWait.getId())));
            Assert.assertEquals(toSet(new MetadataRecord(from, emptyMap(), emptySet()), new MetadataRecord(Id.Program.from(from6.getApplication(), from6.getType(), from6.getId()), emptyMap(), of2), new MetadataRecord(from8, of3, emptySet()), new MetadataRecord(from9, emptyMap(), emptySet())), fetchRunMetadata(new Id.Run(from6, runAndWait5.getId())));
            Assert.assertEquals(toSet(new MetadataRecord(from, emptyMap(), emptySet()), new MetadataRecord(Id.Program.from(from4.getApplication(), from4.getType(), from4.getId()), emptyMap(), of), new MetadataRecord(from8, of3, emptySet()), new MetadataRecord(from9, emptyMap(), emptySet())), fetchRunMetadata(new Id.Run(from4, runAndWait3.getId())));
        } finally {
            try {
                deleteNamespace("testAllProgramsLineage");
            } catch (Throwable th) {
                LOG.error("Got exception while deleting namespace {}", "testAllProgramsLineage", th);
            }
        }
    }

    @Test
    public void testLineageInNonExistingNamespace() throws Exception {
        Id.Flow from = Id.Flow.from(Id.Application.from("nonExistent", AllProgramsApp.NAME), AllProgramsApp.NoOpFlow.NAME);
        Id.DatasetInstance from2 = Id.DatasetInstance.from("nonExistent", AllProgramsApp.DATASET_NAME);
        Id.Stream from3 = Id.Stream.from("nonExistent", AllProgramsApp.STREAM_NAME);
        fetchLineage(from2, 0L, 10000L, 10, NotFoundException.class);
        try {
            fetchLineage(from3, 0L, 10000L, 10);
            Assert.fail("Expected not to be able to fetch lineage for nonexistent stream: " + from3);
        } catch (NotFoundException e) {
        }
        assertRunMetadataNotFound(new Id.Run(from, RunIds.generate(1000L).getId()));
    }

    @Test
    public void testLineageForNonExistingEntity() throws Exception {
        Id.DatasetInstance from = Id.DatasetInstance.from("default", "dummy");
        fetchLineage(from, 100L, 200L, 10, NotFoundException.class);
        fetchLineage(from, -100L, 200L, 10, BadRequestException.class);
        fetchLineage(from, 100L, -200L, 10, BadRequestException.class);
        fetchLineage(from, 200L, 100L, 10, BadRequestException.class);
        fetchLineage(from, 100L, 200L, -10, BadRequestException.class);
    }

    private RunId runAndWait(Id.Program program) throws Exception {
        LOG.info("Starting program {}", program);
        startProgram(program);
        waitState(program, ProgramRunStatus.RUNNING.toString());
        return getRunId(program);
    }

    private void waitForStop(Id.Program program, boolean z) throws Exception {
        if (z && getProgramStatus(program).equals(ProgramRunStatus.RUNNING.toString())) {
            LOG.info("Stopping program {}", program);
            stopProgram(program);
        }
        waitState(program, STOPPED);
        LOG.info("Program {} has stopped", program);
    }

    private RunId getRunId(Id.Program program) throws Exception {
        return getRunId(program, null);
    }

    private RunId getRunId(final Id.Program program, @Nullable final RunId runId) throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        Tasks.waitFor(1, new Callable<Integer>() { // from class: co.cask.cdap.metadata.LineageTest.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                atomicReference.set(Iterables.filter(LineageTest.this.getProgramRuns(program, "RUNNING"), new Predicate<RunRecord>() { // from class: co.cask.cdap.metadata.LineageTest.1.1
                    public boolean apply(RunRecord runRecord) {
                        return runId == null || !runRecord.getPid().equals(runId.getId());
                    }
                }));
                return Integer.valueOf(Iterables.size((Iterable) atomicReference.get()));
            }
        }, 60L, TimeUnit.SECONDS, 10L, TimeUnit.MILLISECONDS);
        Assert.assertEquals(1L, Iterables.size((Iterable) atomicReference.get()));
        return RunIds.fromString(((RunRecord) Iterables.getFirst((Iterable) atomicReference.get(), (Object) null)).getPid());
    }

    private int status(HttpResponse httpResponse) {
        return httpResponse.getStatusLine().getStatusCode();
    }

    @SafeVarargs
    private static <T> Set<T> toSet(T... tArr) {
        return ImmutableSet.copyOf(tArr);
    }

    private Set<String> emptySet() {
        return Collections.emptySet();
    }

    private Map<String, String> emptyMap() {
        return Collections.emptyMap();
    }
}
