package azkaban.execapp;

import azkaban.AzkabanCommonModule;
import azkaban.ServiceProvider;
import azkaban.execapp.event.JobCallbackManager;
import azkaban.execapp.jmx.JmxFlowRampManager;
import azkaban.execapp.jmx.JmxFlowRunnerManager;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.execapp.metric.NumFailedFlowMetric;
import azkaban.execapp.metric.NumFailedJobMetric;
import azkaban.execapp.metric.NumQueuedFlowMetric;
import azkaban.execapp.metric.NumRunningFlowMetric;
import azkaban.execapp.metric.NumRunningJobMetric;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jmx.JmxJettyServer;
import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
import azkaban.metrics.MetricsManager;
import azkaban.server.AzkabanServer;
import azkaban.server.IMBeanRegistrable;
import azkaban.server.MBeanRegistrationManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.StdOutErrRedirect;
import azkaban.utils.Utils;
import com.google.common.base.Preconditions;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.security.Permission;
import java.security.Policy;
import java.security.ProtectionDomain;
import java.time.Duration;
import java.util.Objects;
import java.util.TimeZone;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;

@Singleton
/* loaded from: input_file:azkaban/execapp/AzkabanExecutorServer.class */
public class AzkabanExecutorServer implements IMBeanRegistrable {
    public static final String JOBTYPE_PLUGIN_DIR = "azkaban.jobtype.plugin.dir";
    public static final String RAMPPOLICY_PLUGIN_DIR = "azkaban.ramppolicy.plugin.dir";
    public static final String METRIC_INTERVAL = "executor.metric.milisecinterval.";
    private static final String CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY = "jmx.attribute.processor.class";
    private static final Logger logger = Logger.getLogger(AzkabanExecutorServer.class);
    private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
    private static AzkabanExecutorServer app;
    private final MBeanRegistrationManager mbeanRegistrationManager = new MBeanRegistrationManager();
    private final ExecutorLoader executionLoader;
    private final FlowRunnerManager runnerManager;
    private final FlowRampManager rampManager;
    private final MetricsManager metricsManager;
    private final Props props;
    private final Server server;
    private final Context root;

    @Inject
    public AzkabanExecutorServer(Props props, ExecutorLoader executorLoader, FlowRunnerManager flowRunnerManager, FlowRampManager flowRampManager, MetricsManager metricsManager, @Named("ExecServer") Server server, @Named("root") Context context) {
        this.props = props;
        this.executionLoader = executorLoader;
        this.runnerManager = flowRunnerManager;
        this.rampManager = flowRampManager;
        this.metricsManager = metricsManager;
        this.server = server;
        this.root = context;
    }

    public static AzkabanExecutorServer getApp() {
        return app;
    }

    public static void main(String[] strArr) throws Exception {
        StdOutErrRedirect.redirectOutAndErrToLog();
        logger.info("Starting Jetty Azkaban Executor...");
        if (System.getSecurityManager() == null) {
            Policy.setPolicy(new Policy() { // from class: azkaban.execapp.AzkabanExecutorServer.1
                @Override // java.security.Policy
                public boolean implies(ProtectionDomain protectionDomain, Permission permission) {
                    return true;
                }
            });
            System.setSecurityManager(new SecurityManager());
        }
        Props loadProps = AzkabanServer.loadProps(strArr);
        if (loadProps == null) {
            logger.error("Azkaban Properties not loaded.");
            logger.error("Exiting Azkaban Executor Server...");
        } else {
            Injector createInjector = Guice.createInjector(new Module[]{new AzkabanCommonModule(loadProps), new AzkabanExecServerModule()});
            ServiceProvider.SERVICE_PROVIDER.setInjector(createInjector);
            launch((AzkabanExecutorServer) createInjector.getInstance(AzkabanExecutorServer.class));
        }
    }

    public static void launch(AzkabanExecutorServer azkabanExecutorServer) throws Exception {
        azkabanExecutorServer.start();
        setupTimeZone(azkabanExecutorServer.getAzkabanProps());
        app = azkabanExecutorServer;
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: azkaban.execapp.AzkabanExecutorServer.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    logTopMemoryConsumers();
                } catch (Exception e) {
                    AzkabanExecutorServer.logger.info("Exception when logging top memory consumers", e);
                }
                String host = AzkabanExecutorServer.app.getHost();
                int port = AzkabanExecutorServer.app.getPort();
                try {
                    AzkabanExecutorServer.logger.info(String.format("Removing executor(host: %s, port: %s) entry from database...", host, Integer.valueOf(port)));
                    AzkabanExecutorServer.app.getExecutorLoader().removeExecutor(host, port);
                } catch (ExecutorManagerException e2) {
                    AzkabanExecutorServer.logger.error(String.format("Exception when removing executor(host: %s, port: %s)", host, Integer.valueOf(port)), e2);
                }
                AzkabanExecutorServer.logger.warn("Shutting down executor...");
                try {
                    AzkabanExecutorServer.app.shutdownNow();
                    AzkabanExecutorServer.app.getFlowRunnerManager().deleteExecutionDirectory();
                } catch (Exception e3) {
                    AzkabanExecutorServer.logger.error("Error while shutting down http server.", e3);
                }
            }

            public void logTopMemoryConsumers() throws Exception, IOException {
                if (!new File("/bin/bash").exists() || !new File("/bin/ps").exists() || !new File("/usr/bin/head").exists()) {
                    return;
                }
                AzkabanExecutorServer.logger.info("logging top memory consumer");
                Process start = new ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head").start();
                start.waitFor();
                InputStream inputStream = start.getInputStream();
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
                while (true) {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        inputStream.close();
                        return;
                    }
                    AzkabanExecutorServer.logger.info(readLine);
                }
            }
        });
    }

    private static void setupTimeZone(Props props) {
        if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
            String string = props.getString(DEFAULT_TIMEZONE_ID);
            System.setProperty("user.timezone", string);
            TimeZone timeZone = TimeZone.getTimeZone(string);
            TimeZone.setDefault(timeZone);
            DateTimeZone.setDefault(DateTimeZone.forTimeZone(timeZone));
            logger.info("Setting timezone to " + string);
        }
    }

    private void start() throws Exception {
        this.root.setAttribute("azkaban_app", this);
        JmxJobMBeanManager.getInstance().initialize(this.props);
        configureJobCallback(this.props);
        configureMBeanServer();
        configureMetricReports();
        loadCustomJMXAttributeProcessor(this.props);
        initActive();
        try {
            this.server.start();
        } catch (Exception e) {
            logger.error(e);
            Utils.croak(e.getMessage(), 1);
        }
        insertExecutorEntryIntoDB();
        dumpPortToFile();
        logger.info("Started Executor Server on " + getExecutorHostPort());
        if (this.props.getBoolean("azkaban.is.metrics.enabled", false)) {
            startReportingExecMetrics();
        }
    }

    private void startReportingExecMetrics() {
        logger.info("starting reporting Executor Metrics");
        this.metricsManager.startReporting("AZ-EXEC", this.props);
    }

    private void initActive() throws ExecutorManagerException {
        int i = this.props.getInt("executor.port", -1);
        if (i == -1) {
            logger.info("executor.port wasn't set - free port will be picked automatically. Executor is started with active=false and must be activated separately.");
            return;
        }
        try {
            Executor fetchExecutor = this.executionLoader.fetchExecutor((String) Objects.requireNonNull(getHost()), i);
            if (fetchExecutor == null) {
                logger.info("This executor wasn't found in the DB. Setting active=false.");
                getFlowRunnerManager().setActiveInternal(false);
            } else {
                logger.info("This executor is already in the DB. Found active=" + fetchExecutor.isActive());
                getFlowRunnerManager().setActiveInternal(fetchExecutor.isActive());
            }
        } catch (ExecutorManagerException e) {
            logger.error("Error fetching executor entry from DB", e);
            throw e;
        }
    }

    private void insertExecutorEntryIntoDB() throws ExecutorManagerException {
        try {
            String str = (String) Objects.requireNonNull(getHost());
            int port = getPort();
            Preconditions.checkState(port != -1);
            Executor fetchExecutor = this.executionLoader.fetchExecutor(str, port);
            if (fetchExecutor == null) {
                logger.info("This executor wasn't found in the DB. Adding self.");
                this.executionLoader.addExecutor(str, port);
            } else {
                logger.info("This executor is already in the DB. Found: " + fetchExecutor);
            }
        } catch (ExecutorManagerException e) {
            logger.error("Error inserting executor entry into DB", e);
            throw e;
        }
    }

    private void dumpPortToFile() throws IOException {
        FileIOUtils.dumpNumberToFile(Paths.get(this.props.getString("executor.portfile", "executor.port"), new String[0]), getPort());
    }

    private void configureJobCallback(Props props) {
        boolean z = props.getBoolean("azkaban.executor.jobcallback.enabled", true);
        logger.info("Job callback enabled? " + z);
        if (z) {
            JobCallbackManager.initialize(props);
        }
    }

    private void configureMetricReports() throws MetricException {
        Props azkabanProps = getAzkabanProps();
        if (azkabanProps == null || !azkabanProps.getBoolean("executor.metric.reports", false)) {
            return;
        }
        logger.info("Starting to configure Metric Reports");
        MetricReportManager metricReportManager = MetricReportManager.getInstance();
        metricReportManager.addMetricEmitter(new InMemoryMetricEmitter(azkabanProps));
        logger.info("Adding number of failed flow metric");
        metricReportManager.addMetric(new NumFailedFlowMetric(metricReportManager, azkabanProps.getInt("executor.metric.milisecinterval.NumFailedFlowMetric", azkabanProps.getInt("executor.metric.milisecinterval.default"))));
        logger.info("Adding number of failed jobs metric");
        metricReportManager.addMetric(new NumFailedJobMetric(metricReportManager, azkabanProps.getInt("executor.metric.milisecinterval.NumFailedJobMetric", azkabanProps.getInt("executor.metric.milisecinterval.default"))));
        logger.info("Adding number of running Jobs metric");
        metricReportManager.addMetric(new NumRunningJobMetric(metricReportManager, azkabanProps.getInt("executor.metric.milisecinterval.NumRunningJobMetric", azkabanProps.getInt("executor.metric.milisecinterval.default"))));
        logger.info("Adding number of running flows metric");
        metricReportManager.addMetric(new NumRunningFlowMetric(this.runnerManager, metricReportManager, azkabanProps.getInt("executor.metric.milisecinterval.NumRunningFlowMetric", azkabanProps.getInt("executor.metric.milisecinterval.default"))));
        logger.info("Adding number of queued flows metric");
        metricReportManager.addMetric(new NumQueuedFlowMetric(this.runnerManager, metricReportManager, azkabanProps.getInt("executor.metric.milisecinterval.NumQueuedFlowMetric", azkabanProps.getInt("executor.metric.milisecinterval.default"))));
        logger.info("Completed configuring Metric Reports");
    }

    private void loadCustomJMXAttributeProcessor(Props props) {
        String str = props.get(CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY);
        if (str == null) {
            logger.info("No value for property: jmx.attribute.processor.class was found");
            return;
        }
        try {
            logger.info("jmxAttributeEmitter: " + str);
            Class.forName(str).getConstructors()[0].newInstance(props.toProperties());
        } catch (Exception e) {
            logger.error("Encountered error while loading and instantiating " + str, e);
            throw new IllegalStateException("Encountered error while loading and instantiating " + str, e);
        }
    }

    public ExecutorLoader getExecutorLoader() {
        return this.executionLoader;
    }

    public Props getAzkabanProps() {
        return this.props;
    }

    public FlowRunnerManager getFlowRunnerManager() {
        return this.runnerManager;
    }

    public FlowRampManager getFlowRampManager() {
        return this.rampManager;
    }

    public String getHost() {
        if (this.props.containsKey("azkaban.server.hostname")) {
            String string = this.props.getString("azkaban.server.hostname");
            if (!StringUtils.isEmpty(string)) {
                return string;
            }
        }
        String str = "unkownHost";
        try {
            str = InetAddress.getLocalHost().getCanonicalHostName();
        } catch (Exception e) {
            logger.error("Failed to fetch LocalHostName");
        }
        return str;
    }

    public int getPort() {
        Connector[] connectors = this.server.getConnectors();
        Preconditions.checkState(connectors.length >= 1, "Server must have at least 1 connector");
        return connectors[0].getLocalPort();
    }

    public String getExecutorHostPort() {
        return getHost() + ":" + getPort();
    }

    private void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            logger.error(e);
        }
    }

    public void shutdown() {
        logger.warn("Shutting down AzkabanExecutorServer...");
        new Thread(() -> {
            sleep(Duration.ofSeconds(2L));
            shutdownInternal();
        }, "shutdown").start();
    }

    private void shutdownInternal() {
        getFlowRampManager().shutdown();
        getFlowRunnerManager().shutdown();
        sleep(Duration.ofHours(1L));
        System.exit(0);
    }

    public void shutdownNow() throws Exception {
        this.server.stop();
        this.server.destroy();
        getFlowRampManager().shutdownNow();
        getFlowRunnerManager().shutdownNow();
        this.mbeanRegistrationManager.closeMBeans();
    }

    public void configureMBeanServer() {
        logger.info("Registering MBeans...");
        this.mbeanRegistrationManager.registerMBean("executorJetty", new JmxJettyServer(this.server));
        this.mbeanRegistrationManager.registerMBean("flowRunnerManager", new JmxFlowRunnerManager(this.runnerManager));
        this.mbeanRegistrationManager.registerMBean("flowRampManager", new JmxFlowRampManager(this.rampManager));
        this.mbeanRegistrationManager.registerMBean("jobJMXMBean", JmxJobMBeanManager.getInstance());
        if (JobCallbackManager.isInitialized()) {
            this.mbeanRegistrationManager.registerMBean("jobCallbackJMXMBean", JobCallbackManager.getInstance().getJmxJobCallbackMBean());
        }
    }

    public MBeanRegistrationManager getMBeanRegistrationManager() {
        return this.mbeanRegistrationManager;
    }
}
