package co.cask.cdap.scheduler;

import co.cask.cdap.AppWithFrequentScheduledWorkflows;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.AlreadyExistsException;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.ConflictException;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.internal.app.runtime.schedule.ProgramSchedule;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleStatus;
import co.cask.cdap.internal.app.runtime.schedule.trigger.PartitionTrigger;
import co.cask.cdap.internal.app.runtime.schedule.trigger.TimeTrigger;
import co.cask.cdap.internal.app.services.http.AppFabricTestBase;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ScheduleId;
import co.cask.cdap.proto.id.TopicId;
import co.cask.cdap.proto.id.WorkflowId;
import co.cask.cdap.test.XSlowTests;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.tephra.TransactionFailureException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/scheduler/CoreSchedulerServiceTest.class */
public class CoreSchedulerServiceTest extends AppFabricTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(CoreSchedulerServiceTest.class);
    private static final NamespaceId NS_ID = new NamespaceId("schedtest");
    private static final ApplicationId APP1_ID = NS_ID.app("app1");
    private static final ApplicationId APP2_ID = NS_ID.app("app2");
    private static final WorkflowId PROG1_ID = APP1_ID.workflow("wf1");
    private static final WorkflowId PROG2_ID = APP2_ID.workflow("wf2");
    private static final WorkflowId PROG11_ID = APP1_ID.workflow("wf11");
    private static final ScheduleId PSCHED1_ID = APP1_ID.schedule("psched1");
    private static final ScheduleId PSCHED2_ID = APP2_ID.schedule("psched2");
    private static final ScheduleId TSCHED1_ID = APP1_ID.schedule("tsched1");
    private static final ScheduleId TSCHED11_ID = APP1_ID.schedule("tsched11");
    private static final DatasetId DS1_ID = NS_ID.dataset("pfs1");
    private static final DatasetId DS2_ID = NS_ID.dataset("pfs2");
    private static final ApplicationId APP_ID = NamespaceId.DEFAULT.app(AppWithFrequentScheduledWorkflows.NAME);
    private static final ProgramId WORKFLOW_1 = APP_ID.program(ProgramType.WORKFLOW, "SomeWorkflow");
    private static final ProgramId WORKFLOW_2 = APP_ID.program(ProgramType.WORKFLOW, "AnotherWorkflow");
    private static final ProgramId SCHEDULED_WORKFLOW_1 = APP_ID.program(ProgramType.WORKFLOW, AppWithFrequentScheduledWorkflows.SCHEDULED_WORKFLOW_1);
    private static final ProgramId SCHEDULED_WORKFLOW_2 = APP_ID.program(ProgramType.WORKFLOW, AppWithFrequentScheduledWorkflows.SCHEDULED_WORKFLOW_2);

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    private static final Gson GSON = new Gson();
    private static MessagingService messagingService;
    private static Store store;
    private static TopicId dataEventTopic;
    private static Scheduler scheduler;

    @BeforeClass
    public static void beforeClass() throws Throwable {
        AppFabricTestBase.beforeClass();
        scheduler = (Scheduler) getInjector().getInstance(Scheduler.class);
        if (scheduler instanceof Service) {
            scheduler.startAndWait();
        }
    }

    @AfterClass
    public static void afterClass() throws Exception {
        AppFabricTestBase.afterClass();
        if (scheduler instanceof Service) {
            scheduler.stopAndWait();
        }
    }

    @Test
    public void addListDeleteSchedules() throws Exception {
        Assert.assertTrue(scheduler.listSchedules(APP1_ID).isEmpty());
        Assert.assertTrue(scheduler.listSchedules(PROG1_ID).isEmpty());
        ProgramSchedule programSchedule = new ProgramSchedule("tsched1", "one time schedule", PROG1_ID, ImmutableMap.of("prop1", "nn"), new TimeTrigger("* * ? * 1"), ImmutableList.of());
        scheduler.addSchedule(programSchedule);
        Assert.assertEquals(programSchedule, scheduler.getSchedule(TSCHED1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule), scheduler.listSchedules(PROG1_ID));
        ProgramSchedule programSchedule2 = new ProgramSchedule("psched1", "one partition schedule", PROG1_ID, ImmutableMap.of("prop3", "abc"), new PartitionTrigger(DS1_ID, 1), ImmutableList.of());
        ProgramSchedule programSchedule3 = new ProgramSchedule("tsched11", "two times schedule", PROG11_ID, ImmutableMap.of("prop2", "xx"), new TimeTrigger("* * ? * 1,2"), ImmutableList.of());
        ProgramSchedule programSchedule4 = new ProgramSchedule("psched2", "two partition schedule", PROG2_ID, ImmutableMap.of("propper", "popper"), new PartitionTrigger(DS2_ID, 2), ImmutableList.of());
        scheduler.addSchedules(ImmutableList.of(programSchedule2, programSchedule3, programSchedule4));
        Assert.assertEquals(programSchedule2, scheduler.getSchedule(PSCHED1_ID));
        Assert.assertEquals(programSchedule3, scheduler.getSchedule(TSCHED11_ID));
        Assert.assertEquals(programSchedule4, scheduler.getSchedule(PSCHED2_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule2, programSchedule), scheduler.listSchedules(PROG1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule3), scheduler.listSchedules(PROG11_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule4), scheduler.listSchedules(PROG2_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule2, programSchedule, programSchedule3), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule4), scheduler.listSchedules(APP2_ID));
        scheduler.deleteSchedule(TSCHED1_ID);
        verifyNotFound(scheduler, TSCHED1_ID);
        Assert.assertEquals(ImmutableList.of(programSchedule2), scheduler.listSchedules(PROG1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule3), scheduler.listSchedules(PROG11_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule4), scheduler.listSchedules(PROG2_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule2, programSchedule3), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule4), scheduler.listSchedules(APP2_ID));
        try {
            scheduler.deleteSchedules(ImmutableList.of(TSCHED1_ID, TSCHED11_ID));
            Assert.fail("expected NotFoundException");
        } catch (NotFoundException e) {
        }
        Assert.assertEquals(ImmutableList.of(programSchedule2), scheduler.listSchedules(PROG1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule3), scheduler.listSchedules(PROG11_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule4), scheduler.listSchedules(PROG2_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule2, programSchedule3), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule4), scheduler.listSchedules(APP2_ID));
        try {
            scheduler.addSchedules(ImmutableList.of(programSchedule, programSchedule3));
            Assert.fail("expected AlreadyExistsException");
        } catch (AlreadyExistsException e2) {
        }
        Assert.assertEquals(ImmutableList.of(programSchedule2), scheduler.listSchedules(PROG1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule3), scheduler.listSchedules(PROG11_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule4), scheduler.listSchedules(PROG2_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule2, programSchedule3), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule4), scheduler.listSchedules(APP2_ID));
        scheduler.addSchedule(programSchedule);
        scheduler.deleteSchedules(APP1_ID);
        verifyNotFound(scheduler, TSCHED1_ID);
        verifyNotFound(scheduler, PSCHED1_ID);
        verifyNotFound(scheduler, TSCHED11_ID);
        Assert.assertEquals(ImmutableList.of(), scheduler.listSchedules(PROG1_ID));
        Assert.assertEquals(ImmutableList.of(), scheduler.listSchedules(PROG11_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule4), scheduler.listSchedules(PROG2_ID));
        Assert.assertEquals(ImmutableList.of(), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(programSchedule4), scheduler.listSchedules(PROG2_ID));
    }

    private static void verifyNotFound(Scheduler scheduler2, ScheduleId scheduleId) {
        try {
            scheduler2.getSchedule(scheduleId);
            Assert.fail("expected NotFoundException");
        } catch (NotFoundException e) {
        }
    }

    @Test
    @Category({XSlowTests.class})
    public void testRunScheduledJobs() throws Exception {
        messagingService = (MessagingService) getInjector().getInstance(MessagingService.class);
        dataEventTopic = NamespaceId.SYSTEM.topic(((CConfiguration) getInjector().getInstance(CConfiguration.class)).get("data.event.topic"));
        store = (Store) getInjector().getInstance(Store.class);
        deploy(AppWithFrequentScheduledWorkflows.class);
        enableSchedule(AppWithFrequentScheduledWorkflows.ONE_MIN_SCHEDULE_1);
        enableSchedule(AppWithFrequentScheduledWorkflows.ONE_MIN_SCHEDULE_2);
        enableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_1);
        enableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < 5; i++) {
            testNewPartition(i + 1);
        }
        int runs = getRuns(WORKFLOW_1);
        int runs2 = getRuns(WORKFLOW_2);
        disableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_1);
        disableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
        publishNotification(dataEventTopic, WORKFLOW_1, AppWithFrequentScheduledWorkflows.DATASET_NAME1);
        publishNotification(dataEventTopic, WORKFLOW_2, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        publishNotification(dataEventTopic, WORKFLOW_2, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        if (TimeUnit.SECONDS.toMillis(75L) - currentTimeMillis2 > 0) {
            Thread.sleep(TimeUnit.SECONDS.toMillis(75L) - currentTimeMillis2);
        }
        Assert.assertTrue(0 < getRuns(SCHEDULED_WORKFLOW_1));
        Assert.assertTrue(0 < getRuns(SCHEDULED_WORKFLOW_2));
        Assert.assertEquals(runs, getRuns(WORKFLOW_1));
        Assert.assertEquals(runs2, getRuns(WORKFLOW_2));
        enableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
        testScheduleUpdate("disable");
        testScheduleUpdate("update");
        testScheduleUpdate("delete");
    }

    private void testScheduleUpdate(String str) throws Exception {
        int runs = getRuns(WORKFLOW_2);
        ScheduleId schedule = APP_ID.schedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
        publishNotification(dataEventTopic, WORKFLOW_2, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        TimeUnit.SECONDS.sleep(5L);
        Assert.assertEquals(runs, getRuns(WORKFLOW_2));
        if ("disable".equals(str)) {
            disableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
            enableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
        } else {
            ProgramSchedule schedule2 = scheduler.getSchedule(schedule);
            ProgramSchedule programSchedule = new ProgramSchedule(schedule2.getName(), schedule2.getDescription(), schedule2.getProgramId(), ImmutableMap.builder().putAll(schedule2.getProperties()).put(str, str).build(), schedule2.getTrigger(), schedule2.getConstraints());
            if ("update".equals(str)) {
                scheduler.updateSchedule(programSchedule);
                Assert.assertEquals(ProgramScheduleStatus.SCHEDULED, scheduler.getScheduleStatus(schedule));
            } else if ("delete".equals(str)) {
                scheduler.deleteSchedule(schedule);
                scheduler.addSchedule(programSchedule);
                enableSchedule(schedule.getSchedule());
            } else {
                Assert.fail("invalid howToUpdate: " + str);
            }
        }
        publishNotification(dataEventTopic, WORKFLOW_2, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        TimeUnit.SECONDS.sleep(10L);
        Assert.assertEquals(runs, getRuns(WORKFLOW_2));
        publishNotification(dataEventTopic, WORKFLOW_2, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        waitForCompleteRuns(runs + 1, WORKFLOW_2);
    }

    private void enableSchedule(String str) throws NotFoundException, ConflictException {
        ScheduleId schedule = APP_ID.schedule(str);
        scheduler.enableSchedule(schedule);
        Assert.assertEquals(ProgramScheduleStatus.SCHEDULED, scheduler.getScheduleStatus(schedule));
    }

    private void disableSchedule(String str) throws NotFoundException, ConflictException {
        ScheduleId schedule = APP_ID.schedule(str);
        scheduler.disableSchedule(schedule);
        Assert.assertEquals(ProgramScheduleStatus.SUSPENDED, scheduler.getScheduleStatus(schedule));
    }

    private void testNewPartition(int i) throws Exception {
        publishNotification(dataEventTopic, WORKFLOW_1, AppWithFrequentScheduledWorkflows.DATASET_NAME1);
        publishNotification(dataEventTopic, WORKFLOW_2, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        publishNotification(dataEventTopic, WORKFLOW_2, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        try {
            waitForCompleteRuns(i, WORKFLOW_1);
            waitForCompleteRuns(i, WORKFLOW_2);
            LOG.info("WORKFLOW_1 runRecords: {}", Integer.valueOf(getRuns(WORKFLOW_1)));
            LOG.info("WORKFLOW_2 runRecords: {}", Integer.valueOf(getRuns(WORKFLOW_2)));
        } catch (Throwable th) {
            LOG.info("WORKFLOW_1 runRecords: {}", Integer.valueOf(getRuns(WORKFLOW_1)));
            LOG.info("WORKFLOW_2 runRecords: {}", Integer.valueOf(getRuns(WORKFLOW_2)));
            throw th;
        }
    }

    private void waitForCompleteRuns(int i, final ProgramId programId) throws InterruptedException, ExecutionException, TimeoutException {
        Tasks.waitFor(Integer.valueOf(i), new Callable<Integer>() { // from class: co.cask.cdap.scheduler.CoreSchedulerServiceTest.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Integer call() throws Exception {
                return Integer.valueOf(CoreSchedulerServiceTest.this.getRuns(programId));
            }
        }, 10L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int getRuns(ProgramId programId) {
        return store.getRuns(programId, ProgramRunStatus.ALL, 0L, Long.MAX_VALUE, Integer.MAX_VALUE).size();
    }

    private void publishNotification(TopicId topicId, ProgramId programId, String str) throws TopicNotFoundException, IOException, TransactionFailureException, AlreadyExistsException, BadRequestException {
        messagingService.publish(StoreRequestBuilder.of(topicId).addPayloads(new String[]{GSON.toJson(Notification.forPartitions(programId.getNamespaceId().dataset(str), ImmutableList.of(PartitionKey.builder().addIntField("part1", 1).build())))}).build());
    }
}
