package co.cask.cdap.data2.metadata.lineage;

import co.cask.cdap.api.dataset.DatasetDefinition;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DatasetManagementException;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.proto.Id;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;

/* loaded from: input_file:co/cask/cdap/data2/metadata/lineage/LineageStore.class */
public class LineageStore {
    private static final Id.DatasetInstance LINEAGE_DATASET_ID = Id.DatasetInstance.from(Id.Namespace.SYSTEM, "lineage");
    private final TransactionExecutorFactory executorFactory;
    private final DatasetFramework datasetFramework;
    private final Id.DatasetInstance lineageDatasetId;

    @Inject
    public LineageStore(TransactionExecutorFactory transactionExecutorFactory, @Named("basicDatasetFramework") DatasetFramework datasetFramework) {
        this(transactionExecutorFactory, datasetFramework, LINEAGE_DATASET_ID);
    }

    @VisibleForTesting
    public LineageStore(TransactionExecutorFactory transactionExecutorFactory, DatasetFramework datasetFramework, Id.DatasetInstance datasetInstance) {
        this.executorFactory = transactionExecutorFactory;
        this.datasetFramework = datasetFramework;
        this.lineageDatasetId = datasetInstance;
    }

    public void addAccess(Id.Run run, Id.DatasetInstance datasetInstance, AccessType accessType, long j) {
        addAccess(run, datasetInstance, accessType, j, (Id.NamespacedId) null);
    }

    public void addAccess(final Id.Run run, final Id.DatasetInstance datasetInstance, final AccessType accessType, final long j, @Nullable final Id.NamespacedId namespacedId) {
        execute(new TransactionExecutor.Procedure<LineageDataset>() { // from class: co.cask.cdap.data2.metadata.lineage.LineageStore.1
            public void apply(LineageDataset lineageDataset) throws Exception {
                lineageDataset.addAccess(run, datasetInstance, accessType, j, namespacedId);
            }
        });
    }

    public void addAccess(Id.Run run, Id.Stream stream, AccessType accessType, long j) {
        addAccess(run, stream, accessType, j, (Id.NamespacedId) null);
    }

    public void addAccess(final Id.Run run, final Id.Stream stream, final AccessType accessType, final long j, @Nullable final Id.NamespacedId namespacedId) {
        execute(new TransactionExecutor.Procedure<LineageDataset>() { // from class: co.cask.cdap.data2.metadata.lineage.LineageStore.2
            public void apply(LineageDataset lineageDataset) throws Exception {
                lineageDataset.addAccess(run, stream, accessType, j, namespacedId);
            }
        });
    }

    public Set<Id.NamespacedId> getEntitiesForRun(final Id.Run run) {
        return (Set) execute(new TransactionExecutor.Function<LineageDataset, Set<Id.NamespacedId>>() { // from class: co.cask.cdap.data2.metadata.lineage.LineageStore.3
            public Set<Id.NamespacedId> apply(LineageDataset lineageDataset) throws Exception {
                return lineageDataset.getEntitiesForRun(run);
            }
        });
    }

    public Set<Relation> getRelations(final Id.DatasetInstance datasetInstance, final long j, final long j2, final Predicate<Relation> predicate) {
        return (Set) execute(new TransactionExecutor.Function<LineageDataset, Set<Relation>>() { // from class: co.cask.cdap.data2.metadata.lineage.LineageStore.4
            public Set<Relation> apply(LineageDataset lineageDataset) throws Exception {
                return lineageDataset.getRelations(datasetInstance, j, j2, predicate);
            }
        });
    }

    public Set<Relation> getRelations(final Id.Stream stream, final long j, final long j2, final Predicate<Relation> predicate) {
        return (Set) execute(new TransactionExecutor.Function<LineageDataset, Set<Relation>>() { // from class: co.cask.cdap.data2.metadata.lineage.LineageStore.5
            public Set<Relation> apply(LineageDataset lineageDataset) throws Exception {
                return lineageDataset.getRelations(stream, j, j2, predicate);
            }
        });
    }

    public Set<Relation> getRelations(final Id.Program program, final long j, final long j2, final Predicate<Relation> predicate) {
        return (Set) execute(new TransactionExecutor.Function<LineageDataset, Set<Relation>>() { // from class: co.cask.cdap.data2.metadata.lineage.LineageStore.6
            public Set<Relation> apply(LineageDataset lineageDataset) throws Exception {
                return lineageDataset.getRelations(program, j, j2, predicate);
            }
        });
    }

    @VisibleForTesting
    public List<Long> getAccessTimesForRun(final Id.Run run) {
        return (List) execute(new TransactionExecutor.Function<LineageDataset, List<Long>>() { // from class: co.cask.cdap.data2.metadata.lineage.LineageStore.7
            public List<Long> apply(LineageDataset lineageDataset) throws Exception {
                return lineageDataset.getAccessTimesForRun(run);
            }
        });
    }

    private <T> T execute(TransactionExecutor.Function<LineageDataset, T> function) {
        LineageDataset newLineageDataset = newLineageDataset();
        return (T) Transactions.createTransactionExecutor(this.executorFactory, (TransactionAware) newLineageDataset).executeUnchecked(function, newLineageDataset);
    }

    private void execute(TransactionExecutor.Procedure<LineageDataset> procedure) {
        LineageDataset newLineageDataset = newLineageDataset();
        Transactions.createTransactionExecutor(this.executorFactory, (TransactionAware) newLineageDataset).executeUnchecked(procedure, newLineageDataset);
    }

    private LineageDataset newLineageDataset() {
        try {
            return DatasetsUtil.getOrCreateDataset(this.datasetFramework, this.lineageDatasetId, LineageDataset.class.getName(), DatasetProperties.EMPTY, DatasetDefinition.NO_ARGUMENTS, null);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    public static void setupDatasets(DatasetFramework datasetFramework) throws IOException, DatasetManagementException {
        datasetFramework.addInstance(LineageDataset.class.getName(), LINEAGE_DATASET_ID, DatasetProperties.EMPTY);
    }
}
