package co.cask.cdap.app.runtime.spark;

import co.cask.cdap.app.runtime.spark.distributed.SparkExecutionClient;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.internal.app.runtime.workflow.BasicWorkflowToken;
import co.cask.cdap.proto.id.ProgramRunId;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import scala.Function0;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.collection.mutable.StringBuilder;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SparkMainWrapper.scala */
/* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkMainWrapper$$anonfun$startHeartbeat$2.class */
public class SparkMainWrapper$$anonfun$startHeartbeat$2 extends AbstractFunction1<String, Object> implements Serializable {
    public static final long serialVersionUID = 0;
    private final SparkRuntimeContext runtimeContext$2;
    public final Function0 stopFunc$1;

    public final Cancellable apply(String str) {
        final ProgramRunId run = this.runtimeContext$2.getProgram().getId().toEntityId().run(this.runtimeContext$2.getRunId().getId());
        final SparkExecutionClient sparkExecutionClient = new SparkExecutionClient(URI.create(str), run);
        final BasicWorkflowToken basicWorkflowToken = (BasicWorkflowToken) Option$.MODULE$.apply(this.runtimeContext$2.getWorkflowInfo()).map(new SparkMainWrapper$$anonfun$startHeartbeat$2$$anonfun$3(this)).orNull(Predef$.MODULE$.conforms());
        final ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory(new StringBuilder().append("heartbeat-").append(run.getRun()).toString()));
        final Option<SparkCredentialsUpdater> co$cask$cdap$app$runtime$spark$SparkMainWrapper$$createCredentialsUpdater = SparkMainWrapper$.MODULE$.co$cask$cdap$app$runtime$spark$SparkMainWrapper$$createCredentialsUpdater(this.runtimeContext$2.getConfiguration(), sparkExecutionClient);
        SparkMainWrapper$.MODULE$.co$cask$cdap$app$runtime$spark$SparkMainWrapper$$heartbeat(sparkExecutionClient, this.stopFunc$1, SparkMainWrapper$.MODULE$.co$cask$cdap$app$runtime$spark$SparkMainWrapper$$heartbeat$default$3());
        if (SparkRuntimeEnv$.MODULE$.isStopped()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            co$cask$cdap$app$runtime$spark$SparkMainWrapper$$createCredentialsUpdater.foreach(new SparkMainWrapper$$anonfun$startHeartbeat$2$$anonfun$apply$1(this));
            newSingleThreadScheduledExecutor.schedule(new Runnable(this, sparkExecutionClient, basicWorkflowToken, newSingleThreadScheduledExecutor) { // from class: co.cask.cdap.app.runtime.spark.SparkMainWrapper$$anonfun$startHeartbeat$2$$anon$2
                private final AtomicInteger failureCount;
                private final /* synthetic */ SparkMainWrapper$$anonfun$startHeartbeat$2 $outer;
                private final SparkExecutionClient client$1;
                private final BasicWorkflowToken workflowToken$1;
                private final ScheduledExecutorService executor$1;

                public AtomicInteger failureCount() {
                    return this.failureCount;
                }

                @Override // java.lang.Runnable
                public void run() {
                    try {
                        SparkMainWrapper$.MODULE$.co$cask$cdap$app$runtime$spark$SparkMainWrapper$$heartbeat(this.client$1, this.$outer.stopFunc$1, this.workflowToken$1);
                        failureCount().set(0);
                        if (SparkRuntimeEnv$.MODULE$.isStopped()) {
                            return;
                        }
                        this.executor$1.schedule(this, SparkMainWrapper$.MODULE$.co$cask$cdap$app$runtime$spark$SparkMainWrapper$$HEARTBEAT_INTERVAL_SECONDS(), TimeUnit.SECONDS);
                    } catch (BadRequestException e) {
                        SparkMainWrapper$.MODULE$.co$cask$cdap$app$runtime$spark$SparkMainWrapper$$LOG().error("Invalid spark program heartbeat. Terminating the execution.", e);
                        this.$outer.stopFunc$1.apply$mcV$sp();
                    } catch (Throwable th) {
                        if (failureCount().getAndIncrement() < 10) {
                            SparkMainWrapper$.MODULE$.co$cask$cdap$app$runtime$spark$SparkMainWrapper$$LOG().warn("Failed to make heartbeat for {} times", BoxesRunTime.boxToInteger(failureCount().get()), th);
                        } else {
                            SparkMainWrapper$.MODULE$.co$cask$cdap$app$runtime$spark$SparkMainWrapper$$LOG().error("Failed to make heartbeat for {} times. Terminating the execution", BoxesRunTime.boxToInteger(failureCount().get()));
                            this.$outer.stopFunc$1.apply$mcV$sp();
                        }
                    }
                }

                {
                    if (this == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = this;
                    this.client$1 = sparkExecutionClient;
                    this.workflowToken$1 = basicWorkflowToken;
                    this.executor$1 = newSingleThreadScheduledExecutor;
                    this.failureCount = new AtomicInteger();
                }
            }, SparkMainWrapper$.MODULE$.co$cask$cdap$app$runtime$spark$SparkMainWrapper$$HEARTBEAT_INTERVAL_SECONDS(), TimeUnit.SECONDS);
        }
        return new Cancellable(this, run, sparkExecutionClient, basicWorkflowToken, newSingleThreadScheduledExecutor, co$cask$cdap$app$runtime$spark$SparkMainWrapper$$createCredentialsUpdater) { // from class: co.cask.cdap.app.runtime.spark.SparkMainWrapper$$anonfun$startHeartbeat$2$$anon$3
            private final ProgramRunId programRunId$1;
            private final SparkExecutionClient client$1;
            private final BasicWorkflowToken workflowToken$1;
            private final ScheduledExecutorService executor$1;
            private final Option credentialsUpdater$1;

            public void cancel() {
                this.credentialsUpdater$1.foreach(new SparkMainWrapper$$anonfun$startHeartbeat$2$$anon$3$$anonfun$cancel$1(this));
                this.executor$1.shutdownNow();
                this.executor$1.awaitTermination(5L, TimeUnit.SECONDS);
                this.client$1.completed(this.workflowToken$1);
                SparkMainWrapper$.MODULE$.co$cask$cdap$app$runtime$spark$SparkMainWrapper$$LOG().info("Spark program execution completed: {}", new Object[]{this.programRunId$1});
            }

            {
                this.programRunId$1 = run;
                this.client$1 = sparkExecutionClient;
                this.workflowToken$1 = basicWorkflowToken;
                this.executor$1 = newSingleThreadScheduledExecutor;
                this.credentialsUpdater$1 = co$cask$cdap$app$runtime$spark$SparkMainWrapper$$createCredentialsUpdater;
            }
        };
    }

    public SparkMainWrapper$$anonfun$startHeartbeat$2(SparkRuntimeContext sparkRuntimeContext, Function0 function0) {
        this.runtimeContext$2 = sparkRuntimeContext;
        this.stopFunc$1 = function0;
    }
}
