package co.cask.cdap.logging.meta;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.TxCallable;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.tephra.RetryStrategies;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/meta/DefaultCheckpointManager.class */
public final class DefaultCheckpointManager implements CheckpointManager {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultCheckpointManager.class);
    private static final byte[] OFFSET_COL_NAME = Bytes.toBytes("nextOffset");
    private static final byte[] NEXT_TIME_COL_NAME = Bytes.toBytes("nextEventTime");
    private static final byte[] MAX_TIME_COL_NAME = Bytes.toBytes("maxEventTime");
    private final byte[] rowKeyPrefix;
    private final DatasetFramework datasetFramework;
    private final Transactional transactional;
    private Map<Integer, Checkpoint> lastCheckpoint = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    @Inject
    public DefaultCheckpointManager(DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, String str, byte[] bArr) {
        this.rowKeyPrefix = Bytes.add(bArr, Bytes.toBytes(str));
        this.datasetFramework = datasetFramework;
        this.transactional = Transactions.createTransactionalWithRetry(Transactions.createTransactional(new MultiThreadDatasetCache(new SystemDatasetInstantiator(datasetFramework), transactionSystemClient, NamespaceId.SYSTEM, ImmutableMap.of(), (MetricsContext) null, (Map) null, new MultiThreadTransactionAware[0])), RetryStrategies.retryOnConflict(20, 100L));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Table getCheckpointTable(DatasetContext datasetContext) throws IOException, DatasetManagementException {
        return LoggingStoreTableUtil.getMetadataTable(this.datasetFramework, datasetContext);
    }

    @Override // co.cask.cdap.logging.meta.CheckpointManager
    public void saveCheckpoints(final Map<Integer, ? extends Checkpoint> map) throws Exception {
        if (this.lastCheckpoint.equals(map)) {
            return;
        }
        try {
            this.lastCheckpoint = (Map) Transactions.execute(this.transactional, new TxCallable<Map<Integer, Checkpoint>>() { // from class: co.cask.cdap.logging.meta.DefaultCheckpointManager.1
                /* renamed from: call, reason: merged with bridge method [inline-methods] */
                public Map<Integer, Checkpoint> m35call(DatasetContext datasetContext) throws Exception {
                    HashMap hashMap = new HashMap();
                    Table checkpointTable = DefaultCheckpointManager.this.getCheckpointTable(datasetContext);
                    for (Map.Entry entry : map.entrySet()) {
                        byte[] add = Bytes.add(DefaultCheckpointManager.this.rowKeyPrefix, Bytes.toBytes(((Integer) entry.getKey()).intValue()));
                        Checkpoint checkpoint = (Checkpoint) entry.getValue();
                        checkpointTable.put(add, DefaultCheckpointManager.OFFSET_COL_NAME, Bytes.toBytes(checkpoint.getNextOffset()));
                        checkpointTable.put(add, DefaultCheckpointManager.NEXT_TIME_COL_NAME, Bytes.toBytes(checkpoint.getNextEventTime()));
                        checkpointTable.put(add, DefaultCheckpointManager.MAX_TIME_COL_NAME, Bytes.toBytes(checkpoint.getMaxEventTime()));
                        hashMap.put(entry.getKey(), new Checkpoint(checkpoint.getNextOffset(), checkpoint.getNextEventTime(), checkpoint.getMaxEventTime()));
                    }
                    return hashMap;
                }
            });
            LOG.trace("Saved checkpoints for partitions {}", map);
        } catch (TransactionFailureException e) {
            throw Transactions.propagate(e, ServiceUnavailableException.class);
        }
    }

    @Override // co.cask.cdap.logging.meta.CheckpointManager
    public Map<Integer, Checkpoint> getCheckpoint(final Set<Integer> set) throws Exception {
        try {
            return (Map) Transactions.execute(this.transactional, new TxCallable<Map<Integer, Checkpoint>>() { // from class: co.cask.cdap.logging.meta.DefaultCheckpointManager.2
                /* renamed from: call, reason: merged with bridge method [inline-methods] */
                public Map<Integer, Checkpoint> m36call(DatasetContext datasetContext) throws Exception {
                    Table checkpointTable = DefaultCheckpointManager.this.getCheckpointTable(datasetContext);
                    HashMap hashMap = new HashMap();
                    Iterator it = set.iterator();
                    while (it.hasNext()) {
                        int intValue = ((Integer) it.next()).intValue();
                        hashMap.put(Integer.valueOf(intValue), DefaultCheckpointManager.this.createFromRow(checkpointTable.get(Bytes.add(DefaultCheckpointManager.this.rowKeyPrefix, Bytes.toBytes(intValue)))));
                    }
                    return hashMap;
                }
            });
        } catch (TransactionFailureException e) {
            throw Transactions.propagate(e, ServiceUnavailableException.class);
        }
    }

    @Override // co.cask.cdap.logging.meta.CheckpointManager
    public Checkpoint getCheckpoint(final int i) throws Exception {
        try {
            Checkpoint checkpoint = (Checkpoint) Transactions.execute(this.transactional, new TxCallable<Checkpoint>() { // from class: co.cask.cdap.logging.meta.DefaultCheckpointManager.3
                /* renamed from: call, reason: merged with bridge method [inline-methods] */
                public Checkpoint m37call(DatasetContext datasetContext) throws Exception {
                    return DefaultCheckpointManager.this.createFromRow(DefaultCheckpointManager.this.getCheckpointTable(datasetContext).get(Bytes.add(DefaultCheckpointManager.this.rowKeyPrefix, Bytes.toBytes(i))));
                }
            });
            LOG.trace("Read checkpoint {} for partition {}", checkpoint, Integer.valueOf(i));
            return checkpoint;
        } catch (TransactionFailureException e) {
            throw Transactions.propagate(e, ServiceUnavailableException.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Checkpoint createFromRow(Row row) {
        long j = row.getLong(MAX_TIME_COL_NAME, -1L);
        return new Checkpoint(row.getLong(OFFSET_COL_NAME, -1L), row.getLong(NEXT_TIME_COL_NAME, j), j);
    }
}
