package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.spark.Spark;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.CombineClassLoader;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.common.twill.HadoopClassExcluder;
import co.cask.cdap.common.twill.LocalLocationFactory;
import co.cask.cdap.common.utils.DirUtils;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.util.hbase.HBaseTableUtilFactory;
import co.cask.cdap.internal.app.runtime.LocalizationUtils;
import co.cask.cdap.internal.app.runtime.distributed.LocalizeResource;
import co.cask.cdap.internal.app.runtime.spark.metrics.SparkMetricsSink;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.gson.Gson;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.ClassAcceptor;
import org.apache.twill.api.RunId;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.ApplicationBundler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkRuntimeService.class */
public final class SparkRuntimeService extends AbstractExecutionThreadService {
    // Name of the dependency jar created by buildDependencyJar().
    private static final String CDAP_SPARK_JAR = "cdap-spark.jar";
    private static final Logger LOG = LoggerFactory.getLogger(SparkRuntimeService.class);
    // CDAP configuration; read for the local temp directory and saved to cConf.xml when not running locally.
    private final CConfiguration cConf;
    // Hadoop configuration used to build the SparkContextConfig in startUp().
    private final Configuration hConf;
    // The user Spark program whose beforeSubmit()/onFinish() hooks are invoked by this service.
    private final Spark spark;
    // Location of the program jar; copied into the temp directory when not running locally.
    private final Location programJarLocation;
    // Supplies the client-side context and creates the execution context.
    private final SparkContextFactory sparkContextFactory;
    // Performs the actual submission when run() invokes the submitSpark callable.
    private final SparkSubmitter sparkSubmitter;
    // Used to start, commit and invalidate the long transaction of the program run.
    private final TransactionSystemClient txClient;
    // Set to a placeholder future at the end of startUp() and replaced with the submission future in run().
    private final AtomicReference<ExecutionFuture<RunId>> submissionFuture = new AtomicReference<>();
    // Created in startUp(); called from run() to submit the Spark program.
    private Callable<ExecutionFuture<RunId>> submitSpark;
    // Created in startUp(); clears "spark." system properties and empties the temp directory.
    private Runnable cleanupTask;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the service; only stores its collaborators, no work is done here.
     *
     * @param cConfiguration CDAP configuration
     * @param configuration Hadoop configuration
     * @param spark the user Spark program instance
     * @param sparkContextFactory factory for the client and execution contexts
     * @param sparkSubmitter submitter used to launch the Spark program
     * @param location location of the program jar
     * @param transactionSystemClient client used to manage the program transaction
     */
    public SparkRuntimeService(CConfiguration cConfiguration, Configuration configuration, Spark spark, SparkContextFactory sparkContextFactory, SparkSubmitter sparkSubmitter, Location location, TransactionSystemClient transactionSystemClient) {
        // Configurations
        this.cConf = cConfiguration;
        this.hConf = configuration;
        // Program and its jar
        this.spark = spark;
        this.programJarLocation = location;
        // Runtime collaborators
        this.txClient = transactionSystemClient;
        this.sparkSubmitter = sparkSubmitter;
        this.sparkContextFactory = sparkContextFactory;
    }

    /**
     * Returns the service name: {@code "Spark - "} followed by the name from the
     * client context's specification.
     */
    protected String getServiceName() {
        String programName = this.sparkContextFactory.getClientContext().getSpecification().getName();
        return "Spark - " + programName;
    }

    /**
     * Prepares everything needed to submit the Spark program.
     *
     * <p>Runs the program's {@code beforeSubmit()} hook, creates a temporary directory, generates
     * the files to localize, starts a long transaction, creates the execution context and builds
     * the {@link #submitSpark} callable that {@link #run()} invokes.
     *
     * <p>If any step after the temporary directory is created fails, the transaction (if already
     * started) is invalidated, the execution context (if already created) is closed, the cleanup
     * task is run and the failure is rethrown.
     *
     * @throws Exception if any preparation step fails
     */
    protected void startUp() throws Exception {
        beforeSubmit();
        File tempDir = DirUtils.createTempDir(new File(this.cConf.get("local.data.dir"), this.cConf.get("app.temp.dir")).getAbsoluteFile());
        tempDir.mkdirs();
        this.cleanupTask = createCleanupTask(tempDir);

        // Tracked outside the try block so they can be released if a later step fails.
        Transaction transaction = null;
        ExecutionSparkContext createdContext = null;
        try {
            SparkContextConfig contextConfig = new SparkContextConfig(this.hConf);
            ClientSparkContext clientContext = this.sparkContextFactory.getClientContext();
            final File jobJar = generateJobJar(tempDir);
            final List<LocalizeResource> localizeResources = new ArrayList<>();
            File pluginArchive = clientContext.getPluginArchive();
            String logbackJarName = null;
            String metricsConfPath;
            if (contextConfig.isLocal()) {
                // Local mode: the metrics config is referenced by its absolute path in the temp directory.
                metricsConfPath = SparkMetricsSink.writeConfig(File.createTempFile("metrics", ".properties", tempDir)).getAbsolutePath();
            } else {
                localizeResources.add(new LocalizeResource(copyProgramJar(this.programJarLocation, tempDir), true));
                localizeResources.add(new LocalizeResource(buildDependencyJar(tempDir), true));
                localizeResources.add(new LocalizeResource(saveCConf(this.cConf, tempDir)));
                if (pluginArchive != null) {
                    localizeResources.add(new LocalizeResource(pluginArchive, true));
                }
                File logbackJar = createLogbackJar(tempDir);
                if (logbackJar != null) {
                    localizeResources.add(new LocalizeResource(logbackJar));
                    logbackJarName = logbackJar.getName();
                }
                // Non-local mode: the metrics config is written under "user.dir", localized,
                // and referenced by file name only.
                File metricsConf = SparkMetricsSink.writeConfig(File.createTempFile("metrics", ".properties", new File(System.getProperty("user.dir"))));
                metricsConfPath = metricsConf.getName();
                localizeResources.add(new LocalizeResource(metricsConf));
            }

            transaction = this.txClient.startLong();
            Map<String, File> localizedUserResources = localizeUserResources(contextConfig, clientContext, localizeResources, tempDir);
            final ExecutionSparkContext executionContext = this.sparkContextFactory.createExecutionContext(transaction, localizedUserResources);
            createdContext = executionContext;

            if (!contextConfig.isLocal()) {
                Configuration configuration = contextConfig.set(executionContext).getConfiguration();
                if (pluginArchive != null) {
                    // Copy before adding the plugin archive entry.
                    configuration = new Configuration(configuration);
                    configuration.set("cdap.program.plugin.archive", pluginArchive.getName());
                }
                configuration.set("cdap.spark.local.resources", new Gson().toJson(localizedUserResources.keySet()));
                localizeResources.add(new LocalizeResource(saveHConf(configuration, tempDir)));
            }

            final Map<String, String> submitConfigs = createSubmitConfigs(clientContext, tempDir, metricsConfPath, logbackJarName);
            this.submitSpark = new Callable<ExecutionFuture<RunId>>() {
                @Override
                public ExecutionFuture<RunId> call() throws Exception {
                    return SparkRuntimeService.this.sparkSubmitter.submit(executionContext, submitConfigs, localizeResources, jobJar, executionContext.getRunId());
                }
            };
            // Placeholder future until run() replaces it with the real submission future.
            this.submissionFuture.set(new SettableExecutionFuture(executionContext));
        } catch (Throwable th) {
            if (transaction != null) {
                // The program will never run; do not leave the long transaction in progress.
                try {
                    this.txClient.invalidate(transaction.getWritePointer());
                } catch (Throwable invalidateFailure) {
                    th.addSuppressed(invalidateFailure);
                }
            }
            Closeables.closeQuietly(createdContext);
            this.cleanupTask.run();
            throw th;
        }
    }

    /**
     * Submits the Spark program (if the service is still running), waits for it to complete and
     * finalizes the program transaction.
     *
     * <p>On successful completion the datasets are flushed and the transaction is committed. On any
     * failure, including a failure to flush datasets, the transaction is invalidated; the failure
     * is rethrown unless the submission future was cancelled. The execution context is always
     * closed before returning.
     *
     * @throws Exception if the program execution, the dataset flush or the transaction commit fails
     */
    protected void run() throws Exception {
        // If the placeholder future was already cancelled, propagate the cancellation to the
        // freshly submitted job.
        if (isRunning() && this.submissionFuture.getAndSet(this.submitSpark.call()).isCancelled()) {
            this.submissionFuture.get().cancel(true);
        }
        ExecutionFuture<RunId> jobCompletion = this.submissionFuture.get();
        ExecutionSparkContext context = jobCompletion.getSparkContext();
        Transaction transaction = context.getTransaction();
        try {
            // Block until the job completes.
            jobCompletion.get();
            try {
                context.flushDatasets();
            } catch (Exception e) {
                LOG.error("Failed to commit datasets", e);
                // Let the handler below invalidate the transaction; committing a transaction
                // whose datasets could not be flushed must not be attempted.
                throw e;
            }
            LOG.debug("Committing Spark Program transaction: {}", context);
            if (!this.txClient.commit(transaction)) {
                LOG.warn("Spark Job transaction failed to commit");
                throw new TransactionFailureException("Failed to commit transaction for Spark " + context);
            }
            LOG.debug("Spark Program transaction committed: {}", context);
        } catch (Exception e) {
            this.txClient.invalidate(transaction.getWritePointer());
            if (!jobCompletion.isCancelled()) {
                LOG.error("Spark program execution failure: {}", this.sparkContextFactory.getClientContext(), e);
                throw e;
            }
            // Cancellation is an expected way to stop; do not fail the service.
            LOG.debug("Spark program execution cancelled: {}", this.sparkContextFactory.getClientContext());
        } finally {
            Closeables.closeQuietly(context);
        }
    }

    /**
     * Invokes the program's {@code onFinish()} hook with the outcome of the submission future and
     * then runs the cleanup task.
     *
     * <p>The cleanup task runs exactly once, after {@code onFinish()}, even when {@code onFinish()}
     * throws.
     *
     * @throws Exception if the {@code onFinish()} hook fails
     */
    protected void shutDown() throws Exception {
        try {
            boolean succeeded = true;
            try {
                this.submissionFuture.get().get();
            } catch (Exception ignored) {
                // Any exception from the future (failure or cancellation) means the run did not
                // succeed; it is only used to derive the flag passed to onFinish().
                succeeded = false;
            }
            onFinish(succeeded);
        } finally {
            this.cleanupTask.run();
            LOG.debug("Spark program completed: {}", this.sparkContextFactory.getClientContext());
        }
    }

    /**
     * Handles a stop request by cancelling the future currently held in {@link #submissionFuture}.
     */
    protected void triggerShutdown() {
        ExecutionSparkContext context = this.submissionFuture.get().getSparkContext();
        LOG.debug("Stop requested for Spark Program {}", context);
        // Read the reference again rather than reusing the one above, so the future that is
        // current at this point is the one cancelled.
        this.submissionFuture.get().cancel(true);
    }

    /**
     * Returns an executor that runs each task on a new daemon thread named after this service,
     * with the client context's logging context set before the task runs.
     */
    protected Executor executor() {
        return new Executor() {
            @Override
            public void execute(final Runnable runnable) {
                Runnable withLoggingContext = new Runnable() {
                    @Override
                    public void run() {
                        LoggingContextAccessor.setLoggingContext(SparkRuntimeService.this.sparkContextFactory.getClientContext().getLoggingContext());
                        runnable.run();
                    }
                };
                Thread serviceThread = new Thread(withLoggingContext);
                serviceThread.setDaemon(true);
                serviceThread.setName(SparkRuntimeService.this.getServiceName());
                serviceThread.start();
            }
        };
    }

    /**
     * Calls the program's {@code beforeSubmit()} hook through {@code Transactions.execute} using
     * the client context's transaction context.
     *
     * <p>While the hook runs, the thread context class loader is a combination of the program's
     * class loader and this class's class loader; the previous one is restored afterwards.
     */
    private void beforeSubmit() throws TransactionFailureException, InterruptedException {
        String hookName = this.spark.getClass().getName() + ".beforeSubmit()";
        Callable<Void> hook = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                ClassLoader combined = new CombineClassLoader((ClassLoader) null, ImmutableList.of(SparkRuntimeService.this.spark.getClass().getClassLoader(), getClass().getClassLoader()));
                ClassLoader previous = ClassLoaders.setContextClassLoader(combined);
                try {
                    SparkRuntimeService.this.spark.beforeSubmit(SparkRuntimeService.this.sparkContextFactory.getClientContext());
                    return null;
                } finally {
                    ClassLoaders.setContextClassLoader(previous);
                }
            }
        };
        Transactions.execute(this.sparkContextFactory.getClientContext().getTransactionContext(), hookName, hook);
    }

    /**
     * Calls the program's {@code onFinish()} hook through {@code Transactions.execute} using the
     * client context's transaction context.
     *
     * <p>While the hook runs, the thread context class loader is a combination of the program's
     * class loader and this class's class loader; the previous one is restored afterwards.
     *
     * @param z value passed as the first argument of the program's {@code onFinish()}
     */
    private void onFinish(final boolean z) throws TransactionFailureException, InterruptedException {
        String hookName = this.spark.getClass().getName() + ".onFinish()";
        Callable<Void> hook = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                ClassLoader combined = new CombineClassLoader((ClassLoader) null, ImmutableList.of(SparkRuntimeService.this.spark.getClass().getClassLoader(), getClass().getClassLoader()));
                ClassLoader previous = ClassLoaders.setContextClassLoader(combined);
                try {
                    SparkRuntimeService.this.spark.onFinish(z, SparkRuntimeService.this.sparkContextFactory.getClientContext());
                    return null;
                } finally {
                    ClassLoaders.setContextClassLoader(previous);
                }
            }
        };
        Transactions.execute(this.sparkContextFactory.getClientContext().getTransactionContext(), hookName, hook);
    }

    /**
     * Builds the configuration map for the Spark submission.
     *
     * <p>Starts from every entry of the client context's Spark configuration, then sets the
     * executor extra class path, the metrics config, the Spark local directory and the executor
     * memory and cores.
     *
     * @param clientSparkContext source of the Spark configuration and the executor resources
     * @param file directory used as {@code spark.local.dir}
     * @param str value for {@code spark.metrics.conf}
     * @param str2 optional entry placed in front of the library path in the executor class path
     * @return the submit configurations
     */
    private Map<String, String> createSubmitConfigs(ClientSparkContext clientSparkContext, File file, String str, @Nullable String str2) {
        Map<String, String> configs = new HashMap<>();
        for (Tuple2<String, String> pair : clientSparkContext.getSparkConf().getAll()) {
            configs.put(pair._1(), pair._2());
        }

        String libClassPath = "$PWD/cdap-spark.jar/lib/*";
        String extraClassPath;
        if (str2 == null) {
            extraClassPath = libClassPath;
        } else {
            extraClassPath = str2 + File.pathSeparator + libClassPath;
        }
        configs.put("spark.executor.extraClassPath", extraClassPath);
        configs.put("spark.metrics.conf", str);
        configs.put("spark.local.dir", file.getAbsolutePath());

        String executorMemory = clientSparkContext.getExecutorResources().getMemoryMB() + "m";
        configs.put("spark.executor.memory", executorMemory);
        String executorCores = String.valueOf(clientSparkContext.getExecutorResources().getVirtualCores());
        configs.put("spark.executor.cores", executorCores);
        return configs;
    }

    /**
     * Creates the {@code cdap-spark.jar} bundle in the given directory.
     *
     * <p>The bundle is traced from {@code SparkProgramWrapper} and the HBase table util class.
     * Classes whose name starts with {@code org.apache.spark} or {@code scala}, or whose third
     * URL argument contains {@code spark-assembly}, are rejected; everything else is decided by
     * {@code HadoopClassExcluder}.
     *
     * @param file directory in which the jar is created
     * @return the created jar file
     */
    private File buildDependencyJar(File file) throws IOException {
        Location jarLocation = new LocalLocationFactory(file).create(CDAP_SPARK_JAR);
        final HadoopClassExcluder hadoopExcluder = new HadoopClassExcluder();
        ClassAcceptor acceptor = new ClassAcceptor() {
            public boolean accept(String str, URL url, URL url2) {
                boolean sparkProvided = str.startsWith("org.apache.spark") || str.startsWith("scala") || url2.toString().contains("spark-assembly");
                return !sparkProvided && hadoopExcluder.accept(str, url, url2);
            }
        };
        ApplicationBundler bundler = new ApplicationBundler(acceptor);
        bundler.createBundle(jarLocation, SparkProgramWrapper.class, new Class[]{HBaseTableUtilFactory.getHBaseTableUtilClass()});
        return new File(Locations.toURI(jarLocation));
    }

    /**
     * Copies the program jar to {@code program.jar} inside the given directory.
     *
     * @param location source location of the program jar
     * @param file target directory
     * @return the copied file
     */
    private File copyProgramJar(Location location, File file) throws IOException {
        File targetJar = new File(file, "program.jar");
        LOG.debug("Copy program jar from {} to {}", location, targetJar);
        Files.copy(Locations.newInputSupplier(location), targetJar);
        return targetJar;
    }

    /**
     * Creates an empty jar file named {@code emptyJob.jar} in the given directory.
     *
     * @param file directory in which the jar is created
     * @return the created jar file
     * @throws IOException if the file cannot be created or written
     */
    private File generateJobJar(File file) throws IOException {
        File jobJar = new File(file, "emptyJob.jar");
        // Both streams are declared as resources so the file stream is closed even if creating
        // or closing the jar stream fails.
        try (FileOutputStream fileOut = new FileOutputStream(jobJar);
             JarOutputStream jarOut = new JarOutputStream(fileOut)) {
            // No entries: closing the stream is what produces the empty jar.
        }
        return jobJar;
    }

    /**
     * Writes the given CDAP configuration as XML to {@code cConf.xml} in the given directory,
     * using UTF-8.
     *
     * @param cConfiguration configuration to serialize
     * @param file target directory
     * @return the written file
     * @throws IOException if the file cannot be written
     */
    private File saveCConf(CConfiguration cConfiguration, File file) throws IOException {
        File confFile = new File(file, "cConf.xml");
        // try-with-resources keeps the original write failure as the primary exception and
        // records a failure of close() as suppressed.
        try (BufferedWriter writer = Files.newWriter(confFile, Charsets.UTF_8)) {
            cConfiguration.writeXml(writer);
        }
        return confFile;
    }

    /**
     * Packages the {@code logback.xml} resource into a temporary jar file in the given directory.
     *
     * <p>The resource is looked up through the thread context class loader, falling back to this
     * class's class loader when the context class loader is {@code null}.
     *
     * @param file directory in which the jar is created
     * @return the created jar, or {@code null} if no {@code logback.xml} resource was found
     * @throws IOException if the jar cannot be written
     */
    @Nullable
    private File createLogbackJar(File file) throws IOException {
        ClassLoader classLoader = Objects.firstNonNull(Thread.currentThread().getContextClassLoader(), getClass().getClassLoader());
        // A null resource is permitted in try-with-resources; it is simply not closed.
        try (InputStream logbackStream = classLoader.getResourceAsStream("logback.xml")) {
            if (logbackStream == null) {
                LOG.warn("Could not find logback.xml for Spark!");
                return null;
            }
            File logbackJar = File.createTempFile("logback.xml", ".jar", file);
            // Declare the file stream separately so it is closed even if the jar stream
            // cannot be constructed.
            try (FileOutputStream fileOut = new FileOutputStream(logbackJar);
                 JarOutputStream jarOut = new JarOutputStream(fileOut)) {
                jarOut.putNextEntry(new JarEntry("logback.xml"));
                ByteStreams.copy(logbackStream, jarOut);
            }
            return logbackJar;
        }
    }

    /**
     * Writes the given Hadoop configuration as XML to {@code hConf.xml} in the given directory,
     * using UTF-8.
     *
     * @param configuration configuration to serialize
     * @param file target directory
     * @return the written file
     * @throws IOException if the file cannot be written
     */
    private File saveHConf(Configuration configuration, File file) throws IOException {
        File confFile = new File(file, "hConf.xml");
        // try-with-resources keeps the original write failure as the primary exception and
        // records a failure of close() as suppressed.
        try (BufferedWriter writer = Files.newWriter(confFile, Charsets.UTF_8)) {
            configuration.writeXml(writer);
        }
        return confFile;
    }

    /**
     * Localizes every resource requested by the client context into the given directory.
     *
     * <p>When the context config is not local, each localized file is also added to {@code list}
     * as a {@code LocalizeResource} whose URI is the file's URI with the resource name as the
     * fragment.
     *
     * @param sparkContextConfig consulted for local mode
     * @param clientSparkContext source of the resources to localize
     * @param list receives the additional resources when not running locally
     * @param file directory into which the resources are localized
     * @return map from resource name to the localized file
     */
    private Map<String, File> localizeUserResources(SparkContextConfig sparkContextConfig, ClientSparkContext clientSparkContext, List<LocalizeResource> list, File file) throws IOException {
        Map<String, File> localizedFiles = new HashMap<>();
        for (Map.Entry<String, LocalizeResource> entry : clientSparkContext.getResourcesToLocalize().entrySet()) {
            String resourceName = entry.getKey();
            File localFile = LocalizationUtils.localizeResource(resourceName, entry.getValue(), file);
            if (!sparkContextConfig.isLocal()) {
                URI fileUri = localFile.toURI();
                URI namedUri;
                try {
                    // Same URI, with the resource name carried as the fragment.
                    namedUri = new URI(fileUri.getScheme(), fileUri.getAuthority(), fileUri.getPath(), fileUri.getQuery(), resourceName);
                } catch (URISyntaxException e) {
                    throw Throwables.propagate(e);
                }
                list.add(new LocalizeResource(namedUri, entry.getValue().isArchive()));
            }
            localizedFiles.put(resourceName, localFile);
        }
        return localizedFiles;
    }

    /**
     * Creates the task that cleans up after a program run.
     *
     * <p>The task clears every system property whose name starts with {@code spark.} and then
     * deletes the contents of the given directory. A failure to delete is logged, including its
     * cause, and otherwise ignored so that cleanup never fails the service.
     *
     * @param file directory whose contents are deleted
     * @return the cleanup task
     */
    private Runnable createCleanupTask(final File file) {
        return new Runnable() {
            @Override
            public void run() {
                for (String str : Iterables.filter(System.getProperties().stringPropertyNames(), Predicates.containsPattern("^spark\\."))) {
                    SparkRuntimeService.LOG.debug("Removing Spark system property: {}", str);
                    System.clearProperty(str);
                }
                try {
                    DirUtils.deleteDirectoryContents(file);
                } catch (IOException e) {
                    // Best-effort cleanup: keep going, but pass the exception as the last
                    // argument so the cause is logged.
                    SparkRuntimeService.LOG.warn("Failed to cleanup directory {}", file, e);
                }
            }
        };
    }
}
