package co.cask.cdap.internal.app.runtime.schedule.store;

import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.data.runtime.DataFabricModules;
import co.cask.cdap.data.runtime.DataSetServiceModules;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data2.datafabric.dataset.service.DatasetService;
import co.cask.cdap.data2.datafabric.dataset.service.executor.DatasetOpExecutor;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.explore.guice.ExploreClientModule;
import co.cask.cdap.internal.TempFolder;
import co.cask.cdap.internal.app.scheduler.LogPrintingJob;
import co.cask.cdap.metrics.guice.MetricsClientRuntimeModule;
import co.cask.cdap.store.guice.NamespaceStoreModule;
import co.cask.cdap.test.SlowTests;
import co.cask.tephra.TransactionExecutorFactory;
import co.cask.tephra.TransactionManager;
import com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.DirectSchedulerFactory;
import org.quartz.simpl.RAMJobStore;
import org.quartz.simpl.SimpleThreadPool;

@Category({SlowTests.class})
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/store/DatasetBasedTimeScheduleStoreTest.class */
public class DatasetBasedTimeScheduleStoreTest {
    private static final TempFolder TEMP_FOLDER = new TempFolder();
    private static Injector injector;
    private static Scheduler scheduler;
    private static TransactionExecutorFactory factory;
    private static DatasetFramework dsFramework;
    private static TransactionManager txService;
    private static DatasetOpExecutor dsOpsService;
    private static DatasetService dsService;
    private static final String DUMMY_SCHEDULER_NAME = "dummyScheduler";

    @BeforeClass
    public static void beforeClass() throws Exception {
        CConfiguration create = CConfiguration.create();
        create.set("local.data.dir", TEMP_FOLDER.newFolder("data").getAbsolutePath());
        injector = Guice.createInjector(new Module[]{new ConfigModule(create), new LocationRuntimeModule().getInMemoryModules(), new DiscoveryRuntimeModule().getInMemoryModules(), new MetricsClientRuntimeModule().getInMemoryModules(), new DataFabricModules().getInMemoryModules(), new DataSetsModules().getStandaloneModules(), new DataSetServiceModules().getInMemoryModules(), new ExploreClientModule(), new NamespaceStoreModule().getInMemoryModules()});
        txService = (TransactionManager) injector.getInstance(TransactionManager.class);
        txService.startAndWait();
        dsOpsService = (DatasetOpExecutor) injector.getInstance(DatasetOpExecutor.class);
        dsOpsService.startAndWait();
        dsService = (DatasetService) injector.getInstance(DatasetService.class);
        dsService.startAndWait();
        dsFramework = (DatasetFramework) injector.getInstance(DatasetFramework.class);
        factory = (TransactionExecutorFactory) injector.getInstance(TransactionExecutorFactory.class);
    }

    @AfterClass
    public static void afterClass() {
        dsService.stopAndWait();
        dsOpsService.stopAndWait();
        txService.stopAndWait();
    }

    public static void schedulerSetup(boolean z) throws SchedulerException {
        DatasetBasedTimeScheduleStore rAMJobStore;
        if (z) {
            rAMJobStore = new DatasetBasedTimeScheduleStore(factory, new ScheduleStoreTableUtil(dsFramework, (CConfiguration) injector.getInstance(CConfiguration.class)));
        } else {
            rAMJobStore = new RAMJobStore();
        }
        SimpleThreadPool simpleThreadPool = new SimpleThreadPool(10, 5);
        simpleThreadPool.initialize();
        DirectSchedulerFactory.getInstance().createScheduler(DUMMY_SCHEDULER_NAME, "1", simpleThreadPool, rAMJobStore);
        scheduler = DirectSchedulerFactory.getInstance().getScheduler(DUMMY_SCHEDULER_NAME);
        scheduler.start();
    }

    public static void schedulerTearDown() throws SchedulerException {
        scheduler.shutdown();
    }

    @Test
    public void testJobProperties() throws SchedulerException, UnsupportedTypeException, InterruptedException {
        schedulerSetup(true);
        JobDetail jobDetail = getJobDetail("mapreduce1");
        scheduler.scheduleJob(jobDetail, TriggerBuilder.newTrigger().withIdentity("g2").usingJobData(LogPrintingJob.KEY, LogPrintingJob.VALUE).startNow().withSchedule(CronScheduleBuilder.cronSchedule("0/1 * * * * ?")).build());
        TimeUnit.SECONDS.sleep(3L);
        scheduler.deleteJob(jobDetail.getKey());
        schedulerTearDown();
    }

    @Test
    public void testSchedulerWithoutPersistence() throws SchedulerException, UnsupportedTypeException {
        JobKey scheduleJobWithTrigger = scheduleJobWithTrigger(false);
        verifyJobAndTriggers(scheduleJobWithTrigger, 1, Trigger.TriggerState.NORMAL);
        schedulerTearDown();
        schedulerSetup(false);
        Assert.assertNull(scheduler.getJobDetail(scheduleJobWithTrigger));
        schedulerTearDown();
    }

    @Test
    public void testSchedulerWithPersistenceAcrossRestarts() throws SchedulerException, UnsupportedTypeException {
        JobKey scheduleJobWithTrigger = scheduleJobWithTrigger(true);
        verifyJobAndTriggers(scheduleJobWithTrigger, 1, Trigger.TriggerState.NORMAL);
        schedulerTearDown();
        schedulerSetup(true);
        verifyJobAndTriggers(scheduleJobWithTrigger, 1, Trigger.TriggerState.NORMAL);
        schedulerTearDown();
    }

    @Test
    public void testPausedTriggersAcrossRestarts() throws SchedulerException, UnsupportedTypeException {
        JobKey scheduleJobWithTrigger = scheduleJobWithTrigger(true);
        List triggersOfJob = scheduler.getTriggersOfJob(scheduleJobWithTrigger);
        Iterator it = triggersOfJob.iterator();
        while (it.hasNext()) {
            scheduler.pauseTrigger(((Trigger) it.next()).getKey());
        }
        schedulerTearDown();
        schedulerSetup(true);
        verifyJobAndTriggers(scheduleJobWithTrigger, 1, Trigger.TriggerState.PAUSED);
        Assert.assertTrue("Failed to delete the job", scheduler.deleteJob(scheduleJobWithTrigger));
        Assert.assertFalse("Trigger for the deleted job still exists", scheduler.checkExists(((Trigger) triggersOfJob.get(0)).getKey()));
        schedulerTearDown();
        schedulerSetup(true);
        Assert.assertFalse("Trigger for the deleted job still exists", scheduler.checkExists(((Trigger) triggersOfJob.get(0)).getKey()));
        schedulerTearDown();
    }

    @Test
    public void testStoreJobsAndTriggers() throws SchedulerException {
        schedulerSetup(true);
        HashMap hashMap = new HashMap();
        JobDetail jobDetail = getJobDetail("mapreduce1");
        hashMap.put(jobDetail, Sets.newHashSet(new Trigger[]{getTrigger("p1"), getTrigger("p2")}));
        JobDetail jobDetail2 = getJobDetail("mapreduce2");
        hashMap.put(jobDetail2, Sets.newHashSet(new Trigger[]{getTrigger("p3")}));
        scheduler.scheduleJobs(hashMap, true);
        verifyJobAndTriggers(jobDetail.getKey(), 2, Trigger.TriggerState.NORMAL);
        verifyJobAndTriggers(jobDetail2.getKey(), 1, Trigger.TriggerState.NORMAL);
        schedulerTearDown();
        schedulerSetup(true);
        verifyJobAndTriggers(jobDetail.getKey(), 2, Trigger.TriggerState.NORMAL);
        verifyJobAndTriggers(jobDetail2.getKey(), 1, Trigger.TriggerState.NORMAL);
        schedulerTearDown();
    }

    private void verifyJobAndTriggers(JobKey jobKey, int i, Trigger.TriggerState triggerState) throws SchedulerException {
        JobDetail jobDetail = scheduler.getJobDetail(jobKey);
        List<? extends Trigger> triggersOfJob = scheduler.getTriggersOfJob(jobKey);
        Assert.assertEquals(jobDetail.getKey().getName(), jobKey.getName());
        Assert.assertEquals(i, triggersOfJob.size());
        verifyTriggerState(triggersOfJob, triggerState);
    }

    private void verifyTriggerState(List<? extends Trigger> list, Trigger.TriggerState triggerState) throws SchedulerException {
        Iterator<? extends Trigger> it = list.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(triggerState, scheduler.getTriggerState(it.next().getKey()));
        }
    }

    private JobKey scheduleJobWithTrigger(boolean z) throws UnsupportedTypeException, SchedulerException {
        schedulerSetup(z);
        JobDetail jobDetail = getJobDetail("mapreduce1");
        scheduler.scheduleJob(jobDetail, getTrigger("p1"));
        return jobDetail.getKey();
    }

    private Trigger getTrigger(String str) {
        return TriggerBuilder.newTrigger().withIdentity(str).startNow().withSchedule(CronScheduleBuilder.cronSchedule("0 0/5 * * * ?")).build();
    }

    private JobDetail getJobDetail(String str) {
        return JobBuilder.newJob(LogPrintingJob.class).withIdentity(String.format("developer:application1:%s", str)).build();
    }

    @AfterClass
    public static void cleanup() throws SchedulerException, InterruptedException {
        schedulerTearDown();
        Thread.sleep(10000L);
    }
}
