package co.cask.cdap.internal.app.runtime.schedule;

import co.cask.cdap.AppWithStreamSizeSchedule;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricType;
import co.cask.cdap.api.metrics.MetricValue;
import co.cask.cdap.api.schedule.SchedulableProgramType;
import co.cask.cdap.api.schedule.Schedule;
import co.cask.cdap.api.schedule.Schedules;
import co.cask.cdap.app.runtime.ProgramRuntimeService;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.app.store.StoreFactory;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.exception.NotFoundException;
import co.cask.cdap.config.PreferencesStore;
import co.cask.cdap.internal.app.namespace.NamespaceAdmin;
import co.cask.cdap.internal.app.namespace.NamespaceCannotBeDeletedException;
import co.cask.cdap.internal.app.runtime.schedule.Scheduler;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.test.XSlowTests;
import co.cask.cdap.test.internal.AppFabricTestHelper;
import com.google.common.collect.ImmutableMap;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category({XSlowTests.class})
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/StreamSizeSchedulerPollingTest.class */
public class StreamSizeSchedulerPollingTest extends SchedulerTestBase {
    private static StreamSizeScheduler streamSizeScheduler;
    private static Store store;
    private static MetricStore metricStore;
    private static NamespaceAdmin namespaceAdmin;
    private static ProgramRuntimeService runtimeService;
    private static final String SCHEDULE_NAME_1 = "SampleSchedule1";

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    private static final Id.Application APP_ID = new Id.Application(Constants.DEFAULT_NAMESPACE_ID, "AppWithStreamSizeSchedule");
    private static final Id.Program PROGRAM_ID = new Id.Program(APP_ID, ProgramType.WORKFLOW, "SampleWorkflow");
    private static final SchedulableProgramType PROGRAM_TYPE = SchedulableProgramType.WORKFLOW;
    private static final Id.Stream STREAM_ID = Id.Stream.from(Constants.DEFAULT_NAMESPACE_ID, "stream");
    private static final String SCHEDULE_NAME_2 = "SampleSchedule2";
    private static final Schedule UPDATE_SCHEDULE_2 = Schedules.createDataSchedule(SCHEDULE_NAME_2, "Every 1M", Schedules.Source.STREAM, STREAM_ID.getName(), 1);

    @BeforeClass
    public static void setup() throws Exception {
        CConfiguration create = CConfiguration.create();
        create.setLong("stream.size.schedule.polling.delay", 1L);
        ((PreferencesStore) AppFabricTestHelper.getInjector(create).getInstance(PreferencesStore.class)).setProperties(Constants.DEFAULT_NAMESPACE_ID.getId(), APP_ID.getId(), ImmutableMap.of("concurrent.runs.enabled", "true"));
        streamSizeScheduler = (StreamSizeScheduler) AppFabricTestHelper.getInjector(create).getInstance(StreamSizeScheduler.class);
        store = ((StoreFactory) AppFabricTestHelper.getInjector(create).getInstance(StoreFactory.class)).create();
        metricStore = (MetricStore) AppFabricTestHelper.getInjector(create).getInstance(MetricStore.class);
        namespaceAdmin = (NamespaceAdmin) AppFabricTestHelper.getInjector(create).getInstance(NamespaceAdmin.class);
        namespaceAdmin.createNamespace(Constants.DEFAULT_NAMESPACE_META);
        runtimeService = (ProgramRuntimeService) AppFabricTestHelper.getInjector(create).getInstance(ProgramRuntimeService.class);
    }

    @Test
    public void testStreamSizeSchedule() throws Exception {
        AppFabricTestHelper.deployApplication(AppWithStreamSizeSchedule.class);
        Assert.assertEquals(Scheduler.ScheduleState.SUSPENDED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1));
        Assert.assertEquals(Scheduler.ScheduleState.SUSPENDED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2));
        streamSizeScheduler.resumeSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1);
        streamSizeScheduler.resumeSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        Assert.assertEquals(Scheduler.ScheduleState.SCHEDULED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1));
        Assert.assertEquals(Scheduler.ScheduleState.SCHEDULED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2));
        Assert.assertEquals(0L, store.getRuns(PROGRAM_ID, ProgramRunStatus.ALL, Long.MIN_VALUE, Long.MAX_VALUE, 100).size());
        metricStore.add(new MetricValue(ImmutableMap.of("ns", STREAM_ID.getNamespaceId(), "str", STREAM_ID.getName()), "collect.bytes", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), 1048576L, MetricType.COUNTER));
        waitForRuns(PROGRAM_ID, 1);
        TimeUnit.SECONDS.sleep(5L);
        Assert.assertEquals(1L, store.getRuns(PROGRAM_ID, ProgramRunStatus.ALL, Long.MIN_VALUE, Long.MAX_VALUE, 100).size());
        streamSizeScheduler.suspendSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1);
        streamSizeScheduler.suspendSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        Assert.assertEquals(Scheduler.ScheduleState.SUSPENDED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1));
        Assert.assertEquals(Scheduler.ScheduleState.SUSPENDED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2));
        metricStore.add(new MetricValue(ImmutableMap.of("ns", STREAM_ID.getNamespaceId(), "str", STREAM_ID.getName()), "collect.bytes", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), 1048576L, MetricType.COUNTER));
        TimeUnit.SECONDS.sleep(5L);
        Assert.assertEquals(1L, store.getRuns(PROGRAM_ID, ProgramRunStatus.ALL, Long.MIN_VALUE, Long.MAX_VALUE, 100).size());
        streamSizeScheduler.resumeSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1);
        Assert.assertEquals(Scheduler.ScheduleState.SCHEDULED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1));
        waitForRuns(PROGRAM_ID, 2);
        streamSizeScheduler.resumeSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        Assert.assertEquals(Scheduler.ScheduleState.SCHEDULED, streamSizeScheduler.scheduleState(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2));
        waitForRuns(PROGRAM_ID, 3);
        streamSizeScheduler.updateSchedule(PROGRAM_ID, PROGRAM_TYPE, UPDATE_SCHEDULE_2);
        metricStore.add(new MetricValue(ImmutableMap.of("ns", STREAM_ID.getNamespaceId(), "str", STREAM_ID.getName()), "collect.bytes", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), 1048576L, MetricType.COUNTER));
        waitForRuns(PROGRAM_ID, 5);
        streamSizeScheduler.suspendSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_1);
        streamSizeScheduler.suspendSchedule(PROGRAM_ID, PROGRAM_TYPE, SCHEDULE_NAME_2);
        streamSizeScheduler.deleteSchedules(PROGRAM_ID, PROGRAM_TYPE);
        waitUntilFinished(runtimeService, PROGRAM_ID, 10);
    }

    private void waitForRuns(Id.Program program, int i) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() < currentTimeMillis + TimeUnit.SECONDS.toMillis(5L)) {
            try {
                Assert.assertEquals(i, store.getRuns(program, ProgramRunStatus.ALL, Long.MIN_VALUE, Long.MAX_VALUE, 100).size());
                return;
            } catch (Throwable th) {
                TimeUnit.MILLISECONDS.sleep(100L);
            }
        }
        Assert.fail("Time out");
    }

    @AfterClass
    public static void tearDown() throws NotFoundException, NamespaceCannotBeDeletedException {
        namespaceAdmin.deleteNamespace(Constants.DEFAULT_NAMESPACE_ID);
    }
}
