package co.cask.cdap.app.runtime.spark;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.metadata.MetadataReader;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.security.store.SecureStore;
import co.cask.cdap.api.security.store.SecureStoreManager;
import co.cask.cdap.api.spark.dynamic.SparkInterpreter;
import co.cask.cdap.app.guice.ClusterMode;
import co.cask.cdap.app.guice.DistributedArtifactManagerModule;
import co.cask.cdap.app.guice.DistributedProgramContainerModule;
import co.cask.cdap.app.guice.UnsupportedPluginFinder;
import co.cask.cdap.app.program.DefaultProgram;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.FilterClassLoader;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.common.service.ServiceDiscoverable;
import co.cask.cdap.data.ProgramContextAware;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.metadata.writer.MetadataPublisher;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.internal.app.runtime.BasicProgramContext;
import co.cask.cdap.internal.app.runtime.ProgramClassLoader;
import co.cask.cdap.internal.app.runtime.ProgramRunners;
import co.cask.cdap.internal.app.runtime.SystemArguments;
import co.cask.cdap.internal.app.runtime.artifact.PluginFinder;
import co.cask.cdap.internal.app.runtime.plugin.PluginInstantiator;
import co.cask.cdap.internal.app.runtime.workflow.NameMappedDatasetFramework;
import co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramInfo;
import co.cask.cdap.logging.appender.LogAppenderInitializer;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.security.spi.authentication.AuthenticationContext;
import co.cask.cdap.security.spi.authorization.AuthorizationEnforcer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.api.ServiceAnnouncer;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkRuntimeContextProvider.class */
public final class SparkRuntimeContextProvider {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRuntimeContextProvider.class);
    static final String CCONF_FILE_NAME = "cConf.xml";
    static final String HCONF_FILE_NAME = "hConf.xml";
    static final String PROGRAM_JAR_EXPANDED_NAME = "program.expanded.jar";
    static final String PROGRAM_JAR_NAME = "program.jar";
    static final String EXECUTOR_CLASSLOADER_NAME = "org.apache.spark.repl.ExecutorClassLoader";
    private static volatile SparkRuntimeContext sparkRuntimeContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkRuntimeContextProvider$LogAppenderService.class */
    public static final class LogAppenderService extends AbstractService {
        private final ProgramOptions programOptions;
        private final LogAppenderInitializer initializer;

        private LogAppenderService(LogAppenderInitializer logAppenderInitializer, ProgramOptions programOptions) {
            this.initializer = logAppenderInitializer;
            this.programOptions = programOptions;
        }

        protected void doStart() {
            try {
                this.initializer.initialize();
                SystemArguments.setLogLevel(this.programOptions.getUserArguments(), this.initializer);
                notifyStarted();
            } catch (Throwable th) {
                notifyFailed(th);
            }
        }

        protected void doStop() {
            try {
                this.initializer.close();
                notifyStopped();
            } catch (Throwable th) {
                notifyFailed(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkRuntimeContextProvider$SparkServiceAnnouncer.class */
    public static final class SparkServiceAnnouncer extends AbstractIdleService implements ServiceAnnouncer {
        private final ZKClient zkClient;
        private ZKDiscoveryService discoveryService;

        @Inject
        SparkServiceAnnouncer(CConfiguration cConfiguration, ZKClient zKClient, ProgramId programId) {
            this.zkClient = ZKClients.namespace(zKClient, String.format("%s/%s", cConfiguration.get("twill.zookeeper.namespace"), ServiceDiscoverable.getName(programId)));
        }

        public Cancellable announce(String str, int i) {
            return announce(str, i, Bytes.EMPTY_BYTE_ARRAY);
        }

        public Cancellable announce(String str, int i, byte[] bArr) {
            return this.discoveryService.register(new Discoverable(str, new InetSocketAddress(SparkRuntimeContextProvider.access$200(), i), bArr));
        }

        protected void startUp() throws Exception {
            this.discoveryService = new ZKDiscoveryService(this.zkClient);
        }

        protected void shutDown() throws Exception {
            if (this.discoveryService != null) {
                this.discoveryService.close();
            }
        }
    }

    public static SparkRuntimeContext get() {
        if (sparkRuntimeContext != null) {
            return sparkRuntimeContext;
        }
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        if (EXECUTOR_CLASSLOADER_NAME.equals(contextClassLoader.getClass().getName())) {
            try {
                Method declaredMethod = contextClassLoader.getClass().getDeclaredMethod("parentLoader", new Class[0]);
                if (!declaredMethod.isAccessible()) {
                    declaredMethod.setAccessible(true);
                }
                contextClassLoader = ((ClassLoader) declaredMethod.invoke(contextClassLoader, new Object[0])).getParent();
            } catch (ClassCastException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                LOG.warn("Unable to get the CDAP runtime classloader from {}. Spark program may not be running correctly if {} is being used.", new Object[]{EXECUTOR_CLASSLOADER_NAME, SparkInterpreter.class.getName(), e});
            }
        }
        SparkClassLoader sparkClassLoader = (SparkClassLoader) ClassLoaders.find(contextClassLoader, SparkClassLoader.class);
        return sparkClassLoader != null ? sparkClassLoader.getRuntimeContext() : createIfNotExists();
    }

    private static synchronized SparkRuntimeContext createIfNotExists() {
        if (sparkRuntimeContext != null) {
            return sparkRuntimeContext;
        }
        try {
            CConfiguration createCConf = createCConf();
            Configuration createHConf = createHConf();
            SparkRuntimeContextConfig sparkRuntimeContextConfig = new SparkRuntimeContextConfig(createHConf);
            ProgramOptions programOptions = sparkRuntimeContextConfig.getProgramOptions();
            Preconditions.checkState(!SparkRuntimeContextConfig.isLocal(programOptions), "SparkContextProvider.getSparkContext should only be called in Spark executor process.");
            Program createProgram = createProgram(createCConf, sparkRuntimeContextConfig);
            Injector createInjector = createInjector(createCConf, createHConf, sparkRuntimeContextConfig.getProgramId(), programOptions);
            MetricsCollectionService metricsCollectionService = (MetricsCollectionService) createInjector.getInstance(MetricsCollectionService.class);
            SparkServiceAnnouncer sparkServiceAnnouncer = (SparkServiceAnnouncer) createInjector.getInstance(SparkServiceAnnouncer.class);
            final LinkedList linkedList = new LinkedList();
            linkedList.add(new LogAppenderService((LogAppenderInitializer) createInjector.getInstance(LogAppenderInitializer.class), programOptions));
            linkedList.add(createInjector.getInstance(ZKClientService.class));
            linkedList.add(metricsCollectionService);
            linkedList.add(sparkServiceAnnouncer);
            if (ProgramRunners.getClusterMode(programOptions) == ClusterMode.ON_PREMISE) {
                linkedList.add(createInjector.getInstance(KafkaClientService.class));
                linkedList.add(createInjector.getInstance(StreamCoordinatorClient.class));
            }
            Iterator it = linkedList.iterator();
            while (it.hasNext()) {
                ((Service) it.next()).startAndWait();
            }
            Runtime.getRuntime().addShutdownHook(new Thread() { // from class: co.cask.cdap.app.runtime.spark.SparkRuntimeContextProvider.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    System.out.println("Shutting down Spark runtime services");
                    Deque deque = linkedList;
                    deque.getClass();
                    Iterable<Service> iterable = deque::descendingIterator;
                    for (Service service : iterable) {
                        try {
                            service.stopAndWait();
                        } catch (Exception e) {
                            SparkRuntimeContextProvider.LOG.warn("Exception raised when stopping service {} during program termination.", service, e);
                        }
                    }
                    System.out.println("Spark runtime services shutdown completed");
                }
            });
            NameMappedDatasetFramework nameMappedDatasetFramework = (DatasetFramework) createInjector.getInstance(DatasetFramework.class);
            WorkflowProgramInfo workflowProgramInfo = sparkRuntimeContextConfig.getWorkflowProgramInfo();
            NameMappedDatasetFramework createFromWorkflowProgramInfo = workflowProgramInfo == null ? nameMappedDatasetFramework : NameMappedDatasetFramework.createFromWorkflowProgramInfo(nameMappedDatasetFramework, workflowProgramInfo, sparkRuntimeContextConfig.getApplicationSpecification());
            if (createFromWorkflowProgramInfo instanceof ProgramContextAware) {
                ((ProgramContextAware) createFromWorkflowProgramInfo).setContext(new BasicProgramContext(createProgram.getId().run(ProgramRunners.getRunId(programOptions))));
            }
            sparkRuntimeContext = new SparkRuntimeContext(sparkRuntimeContextConfig.getConfiguration(), createProgram, programOptions, createCConf, getHostname(), (TransactionSystemClient) createInjector.getInstance(TransactionSystemClient.class), createFromWorkflowProgramInfo, (DiscoveryServiceClient) createInjector.getInstance(DiscoveryServiceClient.class), metricsCollectionService, (StreamAdmin) createInjector.getInstance(StreamAdmin.class), sparkRuntimeContextConfig.getWorkflowProgramInfo(), createPluginInstantiator(createCConf, sparkRuntimeContextConfig, createProgram.getClassLoader()), (SecureStore) createInjector.getInstance(SecureStore.class), (SecureStoreManager) createInjector.getInstance(SecureStoreManager.class), (AuthorizationEnforcer) createInjector.getInstance(AuthorizationEnforcer.class), (AuthenticationContext) createInjector.getInstance(AuthenticationContext.class), (MessagingService) createInjector.getInstance(MessagingService.class), sparkServiceAnnouncer, (PluginFinder) createInjector.getInstance(PluginFinder.class), (LocationFactory) createInjector.getInstance(LocationFactory.class), (MetadataReader) createInjector.getInstance(MetadataReader.class), (MetadataPublisher) createInjector.getInstance(MetadataPublisher.class));
            LoggingContextAccessor.setLoggingContext(sparkRuntimeContext.getLoggingContext());
            return sparkRuntimeContext;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    private static CConfiguration createCConf() throws MalformedURLException {
        return CConfiguration.create(new File(CCONF_FILE_NAME), new File[0]);
    }

    private static Configuration createHConf() throws MalformedURLException {
        Configuration configuration = new Configuration();
        configuration.clear();
        configuration.addResource(new File(HCONF_FILE_NAME).toURI().toURL());
        return configuration;
    }

    private static String getHostname() {
        String str = System.getenv(ApplicationConstants.Environment.NM_HOST.key());
        if (str != null) {
            return str;
        }
        try {
            return InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
            throw Throwables.propagate(e);
        }
    }

    private static Program createProgram(CConfiguration cConfiguration, SparkRuntimeContextConfig sparkRuntimeContextConfig) throws IOException {
        File file = new File(PROGRAM_JAR_NAME);
        return new DefaultProgram(new ProgramDescriptor(sparkRuntimeContextConfig.getProgramId(), sparkRuntimeContextConfig.getApplicationSpecification()), Locations.toLocation(file), new ProgramClassLoader(cConfiguration, new File(PROGRAM_JAR_EXPANDED_NAME), new FilterClassLoader(SparkRuntimeContextProvider.class.getClassLoader(), SparkResourceFilters.SPARK_PROGRAM_CLASS_LOADER_FILTER)));
    }

    @Nullable
    private static PluginInstantiator createPluginInstantiator(CConfiguration cConfiguration, SparkRuntimeContextConfig sparkRuntimeContextConfig, ClassLoader classLoader) {
        String pluginArchive = sparkRuntimeContextConfig.getPluginArchive();
        if (pluginArchive == null) {
            return null;
        }
        return new PluginInstantiator(cConfiguration, classLoader, new File(pluginArchive));
    }

    @VisibleForTesting
    public static Injector createInjector(CConfiguration cConfiguration, Configuration configuration, ProgramId programId, ProgramOptions programOptions) {
        String option = programOptions.getArguments().getOption("runId");
        ArrayList arrayList = new ArrayList();
        arrayList.add(new DistributedProgramContainerModule(cConfiguration, configuration, programId.run(option), programOptions.getArguments()));
        arrayList.add(ProgramRunners.getClusterMode(programOptions) == ClusterMode.ON_PREMISE ? new DistributedArtifactManagerModule() : new AbstractModule() { // from class: co.cask.cdap.app.runtime.spark.SparkRuntimeContextProvider.2
            protected void configure() {
                bind(PluginFinder.class).to(UnsupportedPluginFinder.class);
            }
        });
        return Guice.createInjector(arrayList);
    }

    private SparkRuntimeContextProvider() {
    }

    static /* synthetic */ String access$200() {
        return getHostname();
    }
}
