package org.springframework.cloud.dataflow.tasklauncher.sink;

import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.dataflow.tasklauncher.LaunchRequest;
import org.springframework.cloud.dataflow.tasklauncher.TaskLauncherFunction;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.integration.util.DynamicPeriodicTrigger;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/cloud/dataflow/tasklauncher/sink/LaunchRequestConsumer.class */
public class LaunchRequestConsumer implements SmartLifecycle {
    private static final Log log = LogFactory.getLog(LaunchRequestConsumer.class);
    private static final int BACKOFF_MULTIPLE = 2;
    static final String TASK_PLATFORM_NAME = "spring.cloud.dataflow.task.platformName";
    private final PollableMessageSource input;
    private final DynamicPeriodicTrigger trigger;
    private final ConcurrentTaskScheduler taskScheduler;
    private final long initialPeriod;
    private final long maxPeriod;
    private final TaskLauncherFunction taskLauncherFunction;
    private ScheduledFuture<?> scheduledFuture;
    private final AtomicBoolean running = new AtomicBoolean();
    private final AtomicBoolean paused = new AtomicBoolean();
    private volatile boolean autoStart = true;

    public LaunchRequestConsumer(PollableMessageSource pollableMessageSource, DynamicPeriodicTrigger dynamicPeriodicTrigger, long j, TaskLauncherFunction taskLauncherFunction) {
        Assert.notNull(pollableMessageSource, "`input` cannot be null.");
        Assert.notNull(taskLauncherFunction, "`taskLauncherFunction` cannot be null.");
        this.taskLauncherFunction = taskLauncherFunction;
        this.input = pollableMessageSource;
        this.trigger = dynamicPeriodicTrigger;
        this.initialPeriod = dynamicPeriodicTrigger.getDuration().toMillis();
        this.maxPeriod = j;
        this.taskScheduler = new ConcurrentTaskScheduler();
    }

    ScheduledFuture<?> consume() {
        return this.taskScheduler.schedule(() -> {
            if (isRunning()) {
                if (!this.taskLauncherFunction.platformIsAcceptingNewTasks()) {
                    this.paused.set(true);
                    backoff("Polling paused");
                    return;
                }
                if (this.paused.compareAndSet(true, false)) {
                    log.info("Polling resumed");
                }
                if (!this.input.poll(message -> {
                    LaunchRequest launchRequest = (LaunchRequest) message.getPayload();
                    log.debug("Received a Task launch request - task name:  " + launchRequest.getTaskName());
                    this.taskLauncherFunction.apply(launchRequest);
                }, new ParameterizedTypeReference<LaunchRequest>() { // from class: org.springframework.cloud.dataflow.tasklauncher.sink.LaunchRequestConsumer.1
                })) {
                    backoff("No task launch request received");
                } else if (this.trigger.getDuration().toMillis() > this.initialPeriod) {
                    this.trigger.setDuration(Duration.ofMillis(this.initialPeriod));
                    log.info(String.format("Polling period reset to %d ms.", Long.valueOf(this.trigger.getDuration().toMillis())));
                }
            }
        }, this.trigger);
    }

    public boolean isAutoStartup() {
        return this.autoStart;
    }

    public void setAutoStartup(boolean z) {
        this.autoStart = z;
    }

    public synchronized void stop(Runnable runnable) {
        if (runnable != null) {
            runnable.run();
        }
        stop();
    }

    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.scheduledFuture = consume();
        }
    }

    public void stop() {
        if (this.running.getAndSet(false)) {
            this.scheduledFuture.cancel(false);
        }
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public boolean isPaused() {
        return this.paused.get();
    }

    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    private void backoff(String str) {
        Duration ofMillis;
        synchronized (this.trigger) {
            if (this.trigger.getDuration().compareTo(Duration.ZERO) > 0 && this.trigger.getDuration().compareTo(Duration.ofMillis(this.maxPeriod)) < 0) {
                Duration duration = this.trigger.getDuration();
                if (duration.multipliedBy(2L).compareTo(Duration.ofMillis(this.maxPeriod)) <= 0) {
                    if (duration.getSeconds() == 1) {
                        duration = Duration.ofSeconds(1L);
                    }
                    ofMillis = duration.multipliedBy(2L);
                } else {
                    ofMillis = Duration.ofMillis(this.maxPeriod);
                }
                if (this.trigger.getDuration().toMillis() < 1000) {
                    log.info(String.format(str + " - increasing polling period to %d ms.", Long.valueOf(ofMillis.toMillis())));
                } else {
                    log.info(String.format(str + "- increasing polling period to %d seconds.", Long.valueOf(ofMillis.getSeconds())));
                }
                this.trigger.setDuration(ofMillis);
            } else if (this.trigger.getDuration() == Duration.ofMillis(this.maxPeriod)) {
                log.info(str);
            }
        }
    }
}
