package co.cask.cdap.internal.app.runtime.schedule;

import co.cask.cdap.AppWithStreamSizeSchedule;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.app.runtime.ProgramRuntimeService;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.ConflictException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.namespace.NamespaceAdmin;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.app.runtime.schedule.trigger.StreamSizeTrigger;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ScheduleId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.scheduler.Scheduler;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/SchedulerTestBase.class */
public abstract class SchedulerTestBase {
    private static Scheduler scheduler;
    private static Store store;
    private static NamespaceAdmin namespaceAdmin;
    private static ProgramRuntimeService runtimeService;
    protected static MetricStore metricStore;
    protected static Injector injector;
    private static final String SCHEDULE_NAME_2 = "SampleSchedule2";
    static final CConfiguration CCONF = CConfiguration.create();
    private static final ApplicationId APP_ID = NamespaceId.DEFAULT.app("AppWithStreamSizeSchedule");
    private static final ProgramId PROGRAM_ID = APP_ID.workflow("SampleWorkflow");
    private static final String SCHEDULE_NAME_1 = "SampleSchedule1";
    private static final ScheduleId SCHEDULE_ID_1 = APP_ID.schedule(SCHEDULE_NAME_1);
    private static final ScheduleId SCHEDULE_ID_2 = APP_ID.schedule("SampleSchedule2");
    private static final StreamId STREAM_ID = NamespaceId.DEFAULT.stream("stream");
    private static final ProgramSchedule UPDATE_SCHEDULE_2 = new ProgramSchedule("SampleSchedule2", "Every 1M", PROGRAM_ID, AppWithStreamSizeSchedule.SCHEDULE_PROPS, new StreamSizeTrigger(STREAM_ID, 1), Collections.emptyList());

    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();
    private static final Supplier<File> TEMP_FOLDER_SUPPLIER = new Supplier<File>() { // from class: co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase.1
        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public File m59get() {
            try {
                return SchedulerTestBase.tmpFolder.newFolder();
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    };

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/SchedulerTestBase$StreamMetricsPublisher.class */
    protected interface StreamMetricsPublisher {
        void increment(long j) throws Exception;
    }

    protected abstract StreamMetricsPublisher createMetricsPublisher(StreamId streamId);

    @BeforeClass
    public static void init() throws Exception {
        injector = AppFabricTestHelper.getInjector(CCONF);
        scheduler = (Scheduler) injector.getInstance(Scheduler.class);
        store = (Store) injector.getInstance(Store.class);
        metricStore = (MetricStore) injector.getInstance(MetricStore.class);
        namespaceAdmin = (NamespaceAdmin) injector.getInstance(NamespaceAdmin.class);
        namespaceAdmin.create(NamespaceMeta.DEFAULT);
        runtimeService = (ProgramRuntimeService) injector.getInstance(ProgramRuntimeService.class);
    }

    @Test
    public void testStreamSizeSchedule() throws Exception {
        AppFabricTestHelper.deployApplicationWithManager(AppWithStreamSizeSchedule.class, TEMP_FOLDER_SUPPLIER);
        Assert.assertEquals(ProgramScheduleStatus.SUSPENDED, scheduler.getScheduleStatus(SCHEDULE_ID_1));
        Assert.assertEquals(ProgramScheduleStatus.SUSPENDED, scheduler.getScheduleStatus(SCHEDULE_ID_2));
        scheduler.enableSchedule(SCHEDULE_ID_1);
        scheduler.enableSchedule(SCHEDULE_ID_2);
        Assert.assertEquals(ProgramScheduleStatus.SCHEDULED, scheduler.getScheduleStatus(SCHEDULE_ID_1));
        Assert.assertEquals(ProgramScheduleStatus.SCHEDULED, scheduler.getScheduleStatus(SCHEDULE_ID_2));
        Assert.assertEquals(0L, store.getRuns(PROGRAM_ID, ProgramRunStatus.ALL, 0L, Long.MAX_VALUE, 100).size());
        StreamMetricsPublisher createMetricsPublisher = createMetricsPublisher(STREAM_ID);
        createMetricsPublisher.increment(1048576L);
        waitForRuns(store, PROGRAM_ID, 1, 15L);
        waitUntilFinished(runtimeService, PROGRAM_ID, 15L);
        createMetricsPublisher.increment(1048576L);
        waitForRuns(store, PROGRAM_ID, 3, 15L);
        scheduler.disableSchedule(SCHEDULE_ID_2);
        try {
            scheduler.disableSchedule(SCHEDULE_ID_2);
            Assert.fail("disable() should have failed because schedule was already disabled");
        } catch (ConflictException e) {
        }
        Assert.assertEquals(ProgramScheduleStatus.SUSPENDED, scheduler.getScheduleStatus(SCHEDULE_ID_2));
        createMetricsPublisher.increment(1048576L);
        waitForRuns(store, PROGRAM_ID, 4, 15L);
        scheduler.enableSchedule(SCHEDULE_ID_2);
        try {
            scheduler.enableSchedule(SCHEDULE_ID_2);
            Assert.fail("enable() should have failed because schedule was already enabled");
        } catch (ConflictException e2) {
        }
        Assert.assertEquals(ProgramScheduleStatus.SCHEDULED, scheduler.getScheduleStatus(SCHEDULE_ID_2));
        createMetricsPublisher.increment(1048576L);
        waitForRuns(store, PROGRAM_ID, 6, 15L);
        scheduler.updateSchedule(UPDATE_SCHEDULE_2);
        createMetricsPublisher.increment(1048576L);
        waitForRuns(store, PROGRAM_ID, 8, 15L);
        scheduler.disableSchedule(SCHEDULE_ID_1);
        scheduler.disableSchedule(SCHEDULE_ID_2);
        waitUntilFinished(runtimeService, PROGRAM_ID, 10L);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        namespaceAdmin.delete(NamespaceId.DEFAULT);
    }

    private void waitUntilFinished(final ProgramRuntimeService programRuntimeService, final ProgramId programId, long j) throws Exception {
        Tasks.waitFor(false, new Callable<Boolean>() { // from class: co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(SchedulerTestBase.this.isProgramRunning(programRuntimeService, programId));
            }
        }, j, TimeUnit.SECONDS, 50L, TimeUnit.MILLISECONDS);
    }

    private void waitForRuns(final Store store2, final ProgramId programId, int i, long j) throws Exception {
        Tasks.waitFor(Integer.valueOf(i), new Callable<Integer>() { // from class: co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(store2.getRuns(programId, ProgramRunStatus.COMPLETED, 0L, Long.MAX_VALUE, 100).size());
            }
        }, j, TimeUnit.SECONDS, 50L, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isProgramRunning(ProgramRuntimeService programRuntimeService, final ProgramId programId) {
        return !Iterables.isEmpty(Iterables.filter(programRuntimeService.listAll(ProgramType.values()), new Predicate<ProgramRuntimeService.RuntimeInfo>() { // from class: co.cask.cdap.internal.app.runtime.schedule.SchedulerTestBase.4
            public boolean apply(ProgramRuntimeService.RuntimeInfo runtimeInfo) {
                return runtimeInfo.getProgramId().equals(programId);
            }
        }));
    }
}
