package co.cask.cdap.internal.app.runtime.schedule;

import co.cask.cdap.AllProgramsApp;
import co.cask.cdap.AppWithStreamSizeSchedule;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.schedule.SchedulableProgramType;
import co.cask.cdap.api.schedule.Schedule;
import co.cask.cdap.api.schedule.Schedules;
import co.cask.cdap.app.runtime.ProgramRuntimeService;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.namespace.NamespaceAdmin;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.app.runtime.schedule.Scheduler;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.inject.Injector;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/SchedulerTestBase.class */
public abstract class SchedulerTestBase {
    private static StreamSizeScheduler streamSizeScheduler;
    private static Store store;
    private static NamespaceAdmin namespaceAdmin;
    private static ProgramRuntimeService runtimeService;
    protected static MetricStore metricStore;
    protected static Injector injector;
    private static final String SCHEDULE_NAME_1 = "SampleSchedule1";
    protected static final CConfiguration CCONF = CConfiguration.create();
    private static final Id.Application APP_ID = new Id.Application(Id.Namespace.DEFAULT, "AppWithStreamSizeSchedule");
    private static final Id.Program PROGRAM_ID = new Id.Program(APP_ID, ProgramType.WORKFLOW, "SampleWorkflow");
    private static final SchedulableProgramType PROGRAM_TYPE = SchedulableProgramType.WORKFLOW;
    private static final Id.Stream STREAM_ID = Id.Stream.from(Id.Namespace.DEFAULT, AllProgramsApp.STREAM_NAME);
    private static final String SCHEDULE_NAME_2 = "SampleSchedule2";
    private static final Schedule UPDATE_SCHEDULE_2 = Schedules.builder(SCHEDULE_NAME_2).setDescription("Every 1M").createDataSchedule(Schedules.Source.STREAM, STREAM_ID.getId(), 1);

    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();
    private static final Supplier<File> TEMP_FOLDER_SUPPLIER = new Supplier<File>() { // from class: co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase.1
        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public File m64get() {
            try {
                return SchedulerTestBase.tmpFolder.newFolder();
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    };

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/SchedulerTestBase$StreamMetricsPublisher.class */
    protected interface StreamMetricsPublisher {
        void increment(long j) throws Exception;
    }

    protected abstract StreamMetricsPublisher createMetricsPublisher(Id.Stream stream);

    @BeforeClass
    public static void init() throws Exception {
        injector = AppFabricTestHelper.getInjector(CCONF);
        streamSizeScheduler = (StreamSizeScheduler) injector.getInstance(StreamSizeScheduler.class);
        store = (Store) injector.getInstance(Store.class);
        metricStore = (MetricStore) injector.getInstance(MetricStore.class);
        namespaceAdmin = (NamespaceAdmin) injector.getInstance(NamespaceAdmin.class);
        namespaceAdmin.create(NamespaceMeta.DEFAULT);
        runtimeService = (ProgramRuntimeService) injector.getInstance(ProgramRuntimeService.class);
    }

    @Test
    public void testStreamSizeSchedule() throws Exception {
        AppFabricTestHelper.deployApplicationWithManager(AppWithStreamSizeSchedule.class, TEMP_FOLDER_SUPPLIER);
        Assert.assertEquals(Scheduler.ScheduleState.SUSPENDED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1));
        Assert.assertEquals(Scheduler.ScheduleState.SUSPENDED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2));
        streamSizeScheduler.resumeSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1);
        streamSizeScheduler.resumeSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        Assert.assertEquals(Scheduler.ScheduleState.SCHEDULED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1));
        Assert.assertEquals(Scheduler.ScheduleState.SCHEDULED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2));
        Assert.assertEquals(0L, store.getRuns(PROGRAM_ID, ProgramRunStatus.ALL, 0L, Long.MAX_VALUE, 100).size());
        StreamMetricsPublisher createMetricsPublisher = createMetricsPublisher(STREAM_ID);
        createMetricsPublisher.increment(1048576L);
        waitForRuns(store, PROGRAM_ID, 1, 15L);
        waitUntilFinished(runtimeService, PROGRAM_ID, 15L);
        createMetricsPublisher.increment(1048576L);
        waitForRuns(store, PROGRAM_ID, 3, 15L);
        streamSizeScheduler.suspendSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        streamSizeScheduler.suspendSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        Assert.assertEquals(Scheduler.ScheduleState.SUSPENDED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2));
        createMetricsPublisher.increment(1048576L);
        waitForRuns(store, PROGRAM_ID, 4, 15L);
        streamSizeScheduler.resumeSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        streamSizeScheduler.resumeSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        Assert.assertEquals(Scheduler.ScheduleState.SCHEDULED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2));
        createMetricsPublisher.increment(1048576L);
        waitForRuns(store, PROGRAM_ID, 6, 15L);
        streamSizeScheduler.updateSchedule(PROGRAM_ID, PROGRAM_TYPE, UPDATE_SCHEDULE_2);
        createMetricsPublisher.increment(1048576L);
        waitForRuns(store, PROGRAM_ID, 8, 15L);
        streamSizeScheduler.suspendSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1);
        streamSizeScheduler.suspendSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        streamSizeScheduler.deleteSchedules(PROGRAM_ID, PROGRAM_TYPE);
        waitUntilFinished(runtimeService, PROGRAM_ID, 10L);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        namespaceAdmin.delete(Id.Namespace.DEFAULT);
    }

    private void waitUntilFinished(final ProgramRuntimeService programRuntimeService, final Id.Program program, long j) throws Exception {
        Tasks.waitFor(false, new Callable<Boolean>() { // from class: co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(SchedulerTestBase.this.isProgramRunning(programRuntimeService, program));
            }
        }, j, TimeUnit.SECONDS, 50L, TimeUnit.MILLISECONDS);
    }

    private void waitForRuns(final Store store2, final Id.Program program, int i, long j) throws Exception {
        Tasks.waitFor(Integer.valueOf(i), new Callable<Integer>() { // from class: co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(store2.getRuns(program, ProgramRunStatus.COMPLETED, 0L, Long.MAX_VALUE, 100).size());
            }
        }, j, TimeUnit.SECONDS, 50L, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isProgramRunning(ProgramRuntimeService programRuntimeService, final Id.Program program) {
        return programRuntimeService.checkAnyRunning(new Predicate<Id.Program>() { // from class: co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase.4
            public boolean apply(Id.Program program2) {
                return program2.equals(program);
            }
        }, ProgramType.values());
    }
}
