package org.apache.pulsar.io.batchdiscovery;

import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.pulsar.io.core.BatchSourceTriggerer;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-batch-discovery-triggerers-2.10.0-rc-202202222205.jar:org/apache/pulsar/io/batchdiscovery/CronTriggerer.class */
/**
 * A {@link BatchSourceTriggerer} that fires on a cron schedule.
 *
 * <p>The cron expression is read from the configuration map under {@link #CRON_KEY}.
 * On every firing the consumer passed to {@link #start(Consumer)} is invoked with the
 * literal string {@code "CRON"}.
 *
 * <p>Expected lifecycle: {@link #init(Map, SourceContext)}, then {@link #start(Consumer)},
 * then {@link #stop()}.
 */
public class CronTriggerer implements BatchSourceTriggerer {
    private static final Logger log = LoggerFactory.getLogger(CronTriggerer.class);

    /** Configuration key under which the cron expression must be supplied. */
    public static final String CRON_KEY = "__CRON__";

    private String cronExpression;
    private ThreadPoolTaskScheduler scheduler;

    /**
     * Reads the cron expression from the configuration and prepares (but does not start)
     * the scheduler.
     *
     * @param map           triggerer configuration; must contain a String under {@link #CRON_KEY}
     * @param sourceContext context of the owning source; its tenant, namespace and source name
     *                      are used to build the scheduler thread name prefix
     * @throws NullPointerException     if {@code map} or {@code sourceContext} is null
     * @throws IllegalArgumentException if {@link #CRON_KEY} is absent or its value is not a String
     */
    @Override
    public void init(Map<String, Object> map, SourceContext sourceContext) {
        // Same exception type as before for a null config, but now with a diagnosable message.
        Objects.requireNonNull(map, "Cron Trigger configuration map must not be null");
        Objects.requireNonNull(sourceContext, "SourceContext must not be null");
        if (!map.containsKey(CRON_KEY)) {
            throw new IllegalArgumentException("Cron Trigger is not provided with Cron String");
        }
        // Validate the type here so a misconfiguration fails in init with a clear message
        // instead of a bare ClassCastException, or a null expression surfacing only in start().
        Object cronValue = map.get(CRON_KEY);
        if (!(cronValue instanceof String)) {
            throw new IllegalArgumentException("Cron Trigger requires a String value for " + CRON_KEY
                    + " but got: " + (cronValue == null ? "null" : cronValue.getClass().getName()));
        }
        this.cronExpression = (String) cronValue;

        this.scheduler = new ThreadPoolTaskScheduler();
        this.scheduler.setThreadNamePrefix(String.format("%s/%s/%s-cron-triggerer-",
                sourceContext.getTenant(), sourceContext.getNamespace(), sourceContext.getSourceName()));
        log.info("Initialized CronTrigger with expression: {}", this.cronExpression);
    }

    /**
     * Initializes the scheduler and schedules the trigger task with the configured cron expression.
     *
     * @param consumer callback invoked with {@code "CRON"} each time the cron trigger fires
     * @throws NullPointerException  if {@code consumer} is null
     * @throws IllegalStateException if {@link #init(Map, SourceContext)} has not been called
     */
    @Override
    public void start(Consumer<String> consumer) {
        // Fail on the calling thread rather than on the scheduler thread at each firing.
        Objects.requireNonNull(consumer, "Trigger consumer must not be null");
        if (this.scheduler == null) {
            throw new IllegalStateException("CronTriggerer.init must be called before start");
        }
        this.scheduler.initialize();
        this.scheduler.schedule(() -> consumer.accept("CRON"), new CronTrigger(this.cronExpression));
    }

    /**
     * Shuts down the scheduler if one was created by {@link #init(Map, SourceContext)};
     * otherwise does nothing.
     */
    @Override
    public void stop() {
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
    }
}
