package co.cask.cdap.internal.app.runtime.schedule.store;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.schedule.SchedulableProgramType;
import co.cask.cdap.data2.dataset2.DatasetManagementException;
import co.cask.cdap.internal.app.runtime.schedule.AbstractSchedulerService;
import co.cask.cdap.internal.app.runtime.schedule.StreamSizeScheduleState;
import co.cask.cdap.internal.schedule.StreamSizeSchedule;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import co.cask.tephra.TransactionFailureException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Persists stream-size schedules and their run state in the schedule store meta table.
 *
 * <p>Each schedule is stored in one row whose key is {@code "streamSizeSchedule:<scheduleId>"},
 * where the schedule id is produced by {@link AbstractSchedulerService#scheduleIdFor}. The row
 * holds the JSON-serialized schedule plus the base/last-run counters and the active flag in
 * separate columns. Every read and write runs inside a transaction created from the injected
 * {@link TransactionExecutorFactory}.
 *
 * <p>{@link #initialize()} must be called before any other method, because it is the only place
 * where the backing table is obtained.
 */
@Singleton
public class DatasetBasedStreamSizeScheduleStore {
    private static final Logger LOG = LoggerFactory.getLogger(DatasetBasedStreamSizeScheduleStore.class);
    private static final Gson GSON = new Gson();

    /** Prefix shared by every row key written by this store; also used as the scan prefix. */
    private static final String KEY_PREFIX = "streamSizeSchedule";

    /** Number of ':'-separated parts a row key must have for {@link #list()} to decode it. */
    private static final int ROW_KEY_PARTS = 6;

    private static final byte[] SCHEDULE_COL = Bytes.toBytes("schedule");
    private static final byte[] BASE_SIZE_COL = Bytes.toBytes("baseSize");
    private static final byte[] BASE_TS_COL = Bytes.toBytes("baseTs");
    private static final byte[] LAST_RUN_SIZE_COL = Bytes.toBytes("lastRunSize");
    private static final byte[] LAST_RUN_TS_COL = Bytes.toBytes("lastRunTs");
    private static final byte[] ACTIVE_COL = Bytes.toBytes("active");

    private final TransactionExecutorFactory factory;
    private final ScheduleStoreTableUtil tableUtil;
    private Table table;

    /**
     * Extra work that {@link #updateLastRun} executes inside the same transaction as the table
     * update, immediately before the row is written.
     */
    public interface TransactionMethod {
        void execute() throws Exception;
    }

    @Inject
    public DatasetBasedStreamSizeScheduleStore(TransactionExecutorFactory transactionExecutorFactory, ScheduleStoreTableUtil scheduleStoreTableUtil) {
        this.tableUtil = scheduleStoreTableUtil;
        this.factory = transactionExecutorFactory;
    }

    /**
     * Obtains the meta table from the {@link ScheduleStoreTableUtil}.
     *
     * @throws NullPointerException if the table util returns {@code null}
     */
    public void initialize() throws IOException, DatasetManagementException {
        this.table = this.tableUtil.getMetaTable();
        Preconditions.checkNotNull(this.table, "Could not get dataset client for data set: %s", ScheduleStoreTableUtil.SCHEDULE_STORE_DATASET_NAME);
    }

    /**
     * Writes a complete schedule row: the JSON-serialized schedule and all state columns.
     *
     * @param program program the schedule belongs to
     * @param schedulableProgramType schedulable type of the program
     * @param streamSizeSchedule schedule to store; its name is used to build the row key
     * @param j value stored in the {@code baseSize} column
     * @param j2 value stored in the {@code baseTs} column
     * @param j3 value stored in the {@code lastRunSize} column
     * @param j4 value stored in the {@code lastRunTs} column
     * @param z value stored in the {@code active} column
     */
    public void persist(Id.Program program, SchedulableProgramType schedulableProgramType, StreamSizeSchedule streamSizeSchedule, long j, long j2, long j3, long j4, boolean z) throws TransactionFailureException, InterruptedException {
        byte[][] columns = {SCHEDULE_COL, BASE_SIZE_COL, BASE_TS_COL, LAST_RUN_SIZE_COL, LAST_RUN_TS_COL, ACTIVE_COL};
        byte[][] values = {
            Bytes.toBytes(GSON.toJson(streamSizeSchedule)),
            Bytes.toBytes(j),
            Bytes.toBytes(j2),
            Bytes.toBytes(j3),
            Bytes.toBytes(j4),
            Bytes.toBytes(z)
        };
        updateTable(program, schedulableProgramType, streamSizeSchedule.getName(), columns, values, null);
    }

    /**
     * Sets the {@code active} column of the schedule named {@code str} to {@code false}.
     */
    public void suspend(Id.Program program, SchedulableProgramType schedulableProgramType, String str) throws TransactionFailureException, InterruptedException {
        updateTable(program, schedulableProgramType, str, new byte[][]{ACTIVE_COL}, new byte[][]{Bytes.toBytes(false)}, null);
    }

    /**
     * Sets the {@code active} column of the schedule named {@code str} to {@code true}.
     */
    public void resume(Id.Program program, SchedulableProgramType schedulableProgramType, String str) throws TransactionFailureException, InterruptedException {
        updateTable(program, schedulableProgramType, str, new byte[][]{ACTIVE_COL}, new byte[][]{Bytes.toBytes(true)}, null);
    }

    /**
     * Overwrites the base-run columns of the schedule named {@code str}.
     *
     * @param j value stored in the {@code baseSize} column
     * @param j2 value stored in the {@code baseTs} column
     */
    public void updateBaseRun(Id.Program program, SchedulableProgramType schedulableProgramType, String str, long j, long j2) throws TransactionFailureException, InterruptedException {
        updateTable(program, schedulableProgramType, str, new byte[][]{BASE_SIZE_COL, BASE_TS_COL}, new byte[][]{Bytes.toBytes(j), Bytes.toBytes(j2)}, null);
    }

    /**
     * Overwrites the last-run columns of the schedule named {@code str}.
     *
     * @param j value stored in the {@code lastRunSize} column
     * @param j2 value stored in the {@code lastRunTs} column
     * @param transactionMethod optional callback executed in the same transaction, before the
     *     columns are written; may be {@code null}
     */
    public void updateLastRun(Id.Program program, SchedulableProgramType schedulableProgramType, String str, long j, long j2, TransactionMethod transactionMethod) throws TransactionFailureException, InterruptedException {
        updateTable(program, schedulableProgramType, str, new byte[][]{LAST_RUN_SIZE_COL, LAST_RUN_TS_COL}, new byte[][]{Bytes.toBytes(j), Bytes.toBytes(j2)}, transactionMethod);
    }

    /**
     * Replaces the JSON-serialized schedule stored under the schedule named {@code str}.
     */
    public void updateSchedule(Id.Program program, SchedulableProgramType schedulableProgramType, String str, StreamSizeSchedule streamSizeSchedule) throws TransactionFailureException, InterruptedException {
        updateTable(program, schedulableProgramType, str, new byte[][]{SCHEDULE_COL}, new byte[][]{Bytes.toBytes(GSON.toJson(streamSizeSchedule))}, null);
    }

    /**
     * Deletes the row of the schedule named {@code str} in a transaction.
     */
    public void delete(final Id.Program program, final SchedulableProgramType schedulableProgramType, final String str) throws InterruptedException, TransactionFailureException {
        inTransaction(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                table.delete(rowKey(program, schedulableProgramType, str));
            }
        });
    }

    /**
     * Scans all rows whose key starts with the store prefix and decodes them.
     *
     * <p>Rows that are missing any of the expected columns, or whose key does not split into
     * exactly {@value #ROW_KEY_PARTS} ':'-separated parts, are skipped.
     *
     * @return the decoded schedule states, in scan order
     */
    public List<StreamSizeScheduleState> list() throws InterruptedException, TransactionFailureException {
        final List<StreamSizeScheduleState> states = Lists.newArrayList();
        inTransaction(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                byte[] startKey = Bytes.toBytes(KEY_PREFIX);
                Scanner scan = table.scan(startKey, Bytes.stopKeyForPrefix(startKey));
                try {
                    Row row;
                    while ((row = scan.next()) != null) {
                        StreamSizeScheduleState state = decodeState(row);
                        if (state != null) {
                            states.add(state);
                            LOG.debug("StreamSizeSchedule found in store: {}", state);
                        }
                    }
                } finally {
                    // Release the scanner even when decoding a row throws.
                    scan.close();
                }
            }
        });
        return states;
    }

    /** Decodes one stored row, or returns {@code null} when the row is incomplete or its key is malformed. */
    @Nullable
    private static StreamSizeScheduleState decodeState(Row row) {
        byte[] scheduleBytes = row.get(SCHEDULE_COL);
        byte[] baseSizeBytes = row.get(BASE_SIZE_COL);
        byte[] baseTsBytes = row.get(BASE_TS_COL);
        byte[] lastRunSizeBytes = row.get(LAST_RUN_SIZE_COL);
        byte[] lastRunTsBytes = row.get(LAST_RUN_TS_COL);
        byte[] activeBytes = row.get(ACTIVE_COL);
        if (scheduleBytes == null || baseSizeBytes == null || baseTsBytes == null
                || lastRunSizeBytes == null || lastRunTsBytes == null || activeBytes == null) {
            return null;
        }

        String[] keyParts = Bytes.toString(row.getRow()).split(":");
        if (keyParts.length != ROW_KEY_PARTS) {
            return null;
        }

        // keyParts[0] is the store prefix; keyParts[3] is parsed both as ProgramType and as
        // SchedulableProgramType, so it must be a valid constant name of both enums.
        Id.Program program = Id.Program.from(keyParts[1], keyParts[2], ProgramType.valueOf(keyParts[3]), keyParts[4]);
        SchedulableProgramType programType = SchedulableProgramType.valueOf(keyParts[3]);
        StreamSizeSchedule schedule = GSON.fromJson(Bytes.toString(scheduleBytes), StreamSizeSchedule.class);
        return new StreamSizeScheduleState(
            program,
            programType,
            schedule,
            Bytes.toLong(baseSizeBytes),
            Bytes.toLong(baseTsBytes),
            Bytes.toLong(lastRunSizeBytes),
            Bytes.toLong(lastRunTsBytes),
            Bytes.toBoolean(activeBytes));
    }

    /** Builds the row key {@code "streamSizeSchedule:<scheduleId>"} for the given schedule. */
    private static byte[] rowKey(Id.Program program, SchedulableProgramType schedulableProgramType, String scheduleName) {
        return Bytes.toBytes(String.format("%s:%s", KEY_PREFIX, AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, scheduleName)));
    }

    /** Runs the subroutine in a transaction that includes the backing table. */
    private void inTransaction(TransactionExecutor.Subroutine subroutine) throws TransactionFailureException, InterruptedException {
        this.factory.createExecutor(ImmutableList.of(this.table)).execute(subroutine);
    }

    /**
     * Writes the given columns to the schedule's row in a transaction, first running the optional
     * callback inside that same transaction.
     */
    private void updateTable(final Id.Program program, final SchedulableProgramType schedulableProgramType, final String scheduleName, final byte[][] columns, final byte[][] values, @Nullable final TransactionMethod transactionMethod) throws InterruptedException, TransactionFailureException {
        inTransaction(new TransactionExecutor.Subroutine() {
            @Override
            public void apply() throws Exception {
                if (transactionMethod != null) {
                    transactionMethod.execute();
                }
                table.put(rowKey(program, schedulableProgramType, scheduleName), columns, values);
                LOG.debug("Updated schedule {} with columns {}, values {}", new Object[]{scheduleName, columns, values});
            }
        });
    }
}
