package co.cask.cdap.internal.app.runtime.batch;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.data.batch.BatchReadable;
import co.cask.cdap.api.data.batch.BatchWritable;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.batch.OutputFormatProvider;
import co.cask.cdap.api.data.batch.Split;
import co.cask.cdap.api.data.stream.StreamBatchReadable;
import co.cask.cdap.api.dataset.DataSetException;
import co.cask.cdap.api.mapreduce.MapReduce;
import co.cask.cdap.api.mapreduce.MapReduceSpecification;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.lang.CombineClassLoader;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.data.stream.StreamInputFormat;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.HBaseTableUtilFactory;
import co.cask.cdap.internal.app.runtime.batch.dataset.DataSetInputFormat;
import co.cask.cdap.internal.app.runtime.batch.dataset.DataSetOutputFormat;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.tephra.DefaultTransactionExecutor;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.inject.ProvisionException;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.ApplicationBundler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/MapReduceRuntimeService.class */
final class MapReduceRuntimeService extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(MapReduceRuntimeService.class);
    private final CConfiguration cConf;
    private final Configuration hConf;
    private final MapReduce mapReduce;
    private final MapReduceSpecification specification;
    private final Location programJarLocation;
    private final BasicMapReduceContext context;
    private final LocationFactory locationFactory;
    private final StreamAdmin streamAdmin;
    private final TransactionSystemClient txClient;
    private Job job;
    private Transaction transaction;
    private Runnable cleanupTask;
    private volatile boolean stopRequested;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MapReduceRuntimeService(CConfiguration cConfiguration, Configuration configuration, MapReduce mapReduce, MapReduceSpecification mapReduceSpecification, BasicMapReduceContext basicMapReduceContext, Location location, LocationFactory locationFactory, StreamAdmin streamAdmin, TransactionSystemClient transactionSystemClient) {
        this.cConf = cConfiguration;
        this.hConf = configuration;
        this.mapReduce = mapReduce;
        this.specification = mapReduceSpecification;
        this.programJarLocation = location;
        this.context = basicMapReduceContext;
        this.locationFactory = locationFactory;
        this.streamAdmin = streamAdmin;
        this.txClient = transactionSystemClient;
    }

    protected String getServiceName() {
        return "MapReduceRunner-" + this.specification.getName();
    }

    protected void startUp() throws Exception {
        Job job = Job.getInstance(new Configuration(this.hConf));
        Configuration configuration = job.getConfiguration();
        Resources mapperResources = this.specification.getMapperResources();
        Resources reducerResources = this.specification.getReducerResources();
        if (mapperResources != null) {
            configuration.setInt("mapreduce.map.memory.mb", mapperResources.getMemoryMB());
            configuration.set("mapreduce.map.java.opts", "-Xmx" + ((int) (mapperResources.getMemoryMB() * 0.8d)) + "m");
            setVirtualCores(configuration, mapperResources.getVirtualCores(), "MAP");
        }
        if (reducerResources != null) {
            configuration.setInt("mapreduce.reduce.memory.mb", reducerResources.getMemoryMB());
            configuration.set("mapreduce.reduce.java.opts", "-Xmx" + ((int) (reducerResources.getMemoryMB() * 0.8d)) + "m");
            setVirtualCores(configuration, reducerResources.getVirtualCores(), "REDUCE");
        }
        configuration.setBoolean("mapreduce.user.classpath.first", true);
        configuration.setBoolean("mapreduce.job.user.classpath.first", true);
        if (UserGroupInformation.isSecurityEnabled()) {
            configuration.unset("mapreduce.jobhistory.address");
            configuration.setBoolean("mapreduce.job.am-access-disabled", true);
            Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
            LOG.info("Running in secure mode; adding all user credentials: {}", credentials.getAllTokens());
            job.getCredentials().addAll(credentials);
        }
        job.getConfiguration().setClassLoader(new CombineClassLoader((ClassLoader) Objects.firstNonNull(Thread.currentThread().getContextClassLoader(), ClassLoader.getSystemClassLoader()), ImmutableList.of(this.context.getProgram().getClassLoader())));
        this.context.setJob(job);
        beforeSubmit();
        setInputDatasetIfNeeded(job);
        setOutputDatasetIfNeeded(job);
        MapperWrapper.wrap(job);
        ReducerWrapper.wrap(job);
        Location buildJobJar = buildJobJar(this.context);
        try {
            try {
                Location copyProgramJar = copyProgramJar();
                try {
                    job.setJar(buildJobJar.toURI().toString());
                    job.addFileToClassPath(new Path(copyProgramJar.toURI()));
                    MapReduceContextConfig mapReduceContextConfig = new MapReduceContextConfig(job);
                    Transaction startLong = this.txClient.startLong();
                    try {
                        mapReduceContextConfig.set(this.context, this.cConf, startLong, copyProgramJar.getName());
                        LOG.info("Submitting MapReduce Job: {}", this.context);
                        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
                        Thread.currentThread().setContextClassLoader(job.getConfiguration().getClassLoader());
                        try {
                            job.submit();
                            Thread.currentThread().setContextClassLoader(contextClassLoader);
                            this.job = job;
                            this.transaction = startLong;
                            this.cleanupTask = createCleanupTask(buildJobJar, copyProgramJar);
                        } catch (Throwable th) {
                            Thread.currentThread().setContextClassLoader(contextClassLoader);
                            throw th;
                        }
                    } catch (Throwable th2) {
                        Transactions.invalidateQuietly(this.txClient, startLong);
                        throw Throwables.propagate(th2);
                    }
                } catch (Throwable th3) {
                    Locations.deleteQuietly(copyProgramJar);
                    throw Throwables.propagate(th3);
                }
            } catch (Throwable th4) {
                Locations.deleteQuietly(buildJobJar);
                throw Throwables.propagate(th4);
            }
        } catch (Throwable th5) {
            LOG.error("Exception when submitting MapReduce Job: {}", this.context, th5);
            throw Throwables.propagate(th5);
        }
    }

    protected void run() throws Exception {
        MapReduceMetricsWriter mapReduceMetricsWriter = new MapReduceMetricsWriter(this.job, this.context);
        while (!this.job.isComplete()) {
            mapReduceMetricsWriter.reportStats();
            TimeUnit.SECONDS.sleep(1L);
        }
        LOG.info("MapReduce Job is complete, status: {}, job: {}", Boolean.valueOf(this.job.isSuccessful()), this.context);
        mapReduceMetricsWriter.reportStats();
        TimeUnit.SECONDS.sleep(2L);
        if (this.stopRequested) {
            return;
        }
        Preconditions.checkState(this.job.isSuccessful(), "MapReduce execution failure: %s", new Object[]{this.job.getStatus()});
    }

    protected void shutDown() throws Exception {
        boolean isSuccessful = this.job.isSuccessful();
        try {
            if (isSuccessful) {
                LOG.info("Committing MapReduce Job transaction: {}", this.context);
                if (!this.txClient.commit(this.transaction)) {
                    LOG.warn("MapReduce Job transaction failed to commit");
                    throw new TransactionFailureException("Failed to commit transaction for MapReduce " + this.context.toString());
                }
            } else {
                this.txClient.invalidate(this.transaction.getWritePointer());
            }
            try {
                onFinish(isSuccessful);
                this.context.close();
                this.cleanupTask.run();
            } finally {
            }
        } catch (Throwable th) {
            try {
                onFinish(isSuccessful);
                this.context.close();
                this.cleanupTask.run();
                throw th;
            } finally {
            }
        }
    }

    protected void triggerShutdown() {
        try {
            this.stopRequested = true;
            if (this.job != null && !this.job.isComplete()) {
                this.job.killJob();
            }
        } catch (IOException e) {
            LOG.error("Failed to kill MapReduce job {}", this.context, e);
            throw Throwables.propagate(e);
        }
    }

    protected Executor executor() {
        return new Executor() { // from class: co.cask.cdap.internal.app.runtime.batch.MapReduceRuntimeService.1
            @Override // java.util.concurrent.Executor
            public void execute(final Runnable runnable) {
                Thread thread = new Thread(new Runnable() { // from class: co.cask.cdap.internal.app.runtime.batch.MapReduceRuntimeService.1.1
                    @Override // java.lang.Runnable
                    public void run() {
                        LoggingContextAccessor.setLoggingContext(MapReduceRuntimeService.this.context.getLoggingContext());
                        runnable.run();
                    }
                });
                thread.setDaemon(true);
                thread.setName(MapReduceRuntimeService.this.getServiceName());
                thread.start();
            }
        };
    }

    private void setVirtualCores(Configuration configuration, int i, String str) {
        try {
            configuration.setInt(Job.class.getField(str + "_CPU_VCORES").get(null).toString(), i);
        } catch (Exception e) {
        }
    }

    private void beforeSubmit() throws TransactionFailureException, InterruptedException {
        createTransactionExecutor().execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.batch.MapReduceRuntimeService.2
            public void apply() throws Exception {
                ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
                Thread.currentThread().setContextClassLoader(MapReduceRuntimeService.this.mapReduce.getClass().getClassLoader());
                try {
                    MapReduceRuntimeService.this.mapReduce.beforeSubmit(MapReduceRuntimeService.this.context);
                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                } catch (Throwable th) {
                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                    throw th;
                }
            }
        });
    }

    private void onFinish(final boolean z) throws TransactionFailureException, InterruptedException {
        createTransactionExecutor().execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.batch.MapReduceRuntimeService.3
            public void apply() throws Exception {
                MapReduceRuntimeService.this.mapReduce.onFinish(z, MapReduceRuntimeService.this.context);
            }
        });
    }

    private TransactionExecutor createTransactionExecutor() {
        return new DefaultTransactionExecutor(this.txClient, this.context.getDatasetInstantiator().getTransactionAware());
    }

    private void setInputDatasetIfNeeded(Job job) throws IOException {
        String inputDatasetName = this.context.getInputDatasetName();
        if (inputDatasetName == null) {
            inputDatasetName = this.context.getSpecification().getInputDataSet();
        }
        if (inputDatasetName == null) {
            return;
        }
        if (inputDatasetName.startsWith("stream://")) {
            configureStreamInput(job, new StreamBatchReadable(URI.create(inputDatasetName)));
            return;
        }
        LOG.debug("Using Dataset {} as input for MapReduce Job", inputDatasetName);
        BatchReadable dataset = this.context.getDataset(inputDatasetName);
        if (dataset instanceof BatchReadable) {
            BatchReadable batchReadable = dataset;
            List<Split> inputDataSelection = this.context.getInputDataSelection();
            if (inputDataSelection == null) {
                inputDataSelection = batchReadable.getSplits();
            }
            this.context.setInput(inputDatasetName, inputDataSelection);
            DataSetInputFormat.setInput(job, inputDatasetName);
            return;
        }
        InputFormatProvider inputFormatProvider = (InputFormatProvider) dataset;
        Class inputFormatClass = inputFormatProvider.getInputFormatClass();
        if (inputFormatClass == null) {
            throw new DataSetException("Input dataset '" + inputDatasetName + "' provided null as the input format");
        }
        job.setInputFormatClass(inputFormatClass);
        Map inputFormatConfiguration = inputFormatProvider.getInputFormatConfiguration();
        if (inputFormatConfiguration != null) {
            for (Map.Entry entry : inputFormatConfiguration.entrySet()) {
                job.getConfiguration().set((String) entry.getKey(), (String) entry.getValue());
            }
        }
    }

    private void setOutputDatasetIfNeeded(Job job) {
        String outputDatasetName = this.context.getOutputDatasetName();
        if (outputDatasetName == null) {
            outputDatasetName = this.context.getSpecification().getOutputDataSet();
        }
        if (outputDatasetName == null) {
            return;
        }
        LOG.debug("Using Dataset {} as output for MapReduce Job", outputDatasetName);
        OutputFormatProvider dataset = this.context.getDataset(outputDatasetName);
        if (dataset instanceof BatchWritable) {
            DataSetOutputFormat.setOutput(job, outputDatasetName);
            return;
        }
        OutputFormatProvider outputFormatProvider = dataset;
        Class outputFormatClass = outputFormatProvider.getOutputFormatClass();
        if (outputFormatClass == null) {
            throw new DataSetException("Output dataset '" + outputDatasetName + "' provided null as the output format");
        }
        job.setOutputFormatClass(outputFormatClass);
        Map outputFormatConfiguration = outputFormatProvider.getOutputFormatConfiguration();
        if (outputFormatConfiguration != null) {
            for (Map.Entry entry : outputFormatConfiguration.entrySet()) {
                job.getConfiguration().set((String) entry.getKey(), (String) entry.getValue());
            }
        }
    }

    private void configureStreamInput(Job job, StreamBatchReadable streamBatchReadable) throws IOException {
        StreamConfig config = this.streamAdmin.getConfig(streamBatchReadable.getStreamName());
        Location createGenerationLocation = StreamUtils.createGenerationLocation(config.getLocation(), StreamUtils.getGeneration(config));
        StreamInputFormat.setTTL(job, config.getTTL());
        StreamInputFormat.setStreamPath(job, createGenerationLocation.toURI());
        StreamInputFormat.setTimeRange(job, streamBatchReadable.getStartTime(), streamBatchReadable.getEndTime());
        String decoderType = streamBatchReadable.getDecoderType();
        if (decoderType == null) {
            setStreamEventDecoder(job);
        } else {
            StreamInputFormat.setDecoderClassName(job, decoderType);
        }
        job.setInputFormatClass(StreamInputFormat.class);
        LOG.info("Using Stream as input from {}", createGenerationLocation.toURI());
    }

    private void setStreamEventDecoder(Job job) throws IOException {
        if (!setStreamEventDecoder(job, "mapreduce.job.map.class", Mapper.class) && !setStreamEventDecoder(job, "mapreduce.job.reduce.class", Reducer.class)) {
            throw new IOException("Failed to consume StreamEvent without Mapper/Reducer");
        }
    }

    private boolean setStreamEventDecoder(Job job, String str, Class<?> cls) throws IOException {
        Class cls2 = job.getConfiguration().getClass(str, (Class) null, cls);
        if (cls2 == null) {
            return false;
        }
        TypeToken.TypeSet types = TypeToken.of(cls2).getTypes();
        for (TypeToken typeToken : Iterables.concat(types.classes(), types.interfaces())) {
            if (cls.equals(typeToken.getRawType()) && ParameterizedType.class.isAssignableFrom(typeToken.getType().getClass())) {
                Type[] actualTypeArguments = ((ParameterizedType) typeToken.getType()).getActualTypeArguments();
                if (actualTypeArguments.length >= 2 && LongWritable.class.equals(actualTypeArguments[0])) {
                    try {
                        StreamInputFormat.inferDecoderClass(job.getConfiguration(), actualTypeArguments[1]);
                        return true;
                    } catch (IllegalArgumentException e) {
                        LOG.debug("Failed to set decoder", e);
                    }
                }
            }
        }
        throw new IOException("Failed to determine decoder for consuming StreamEvent from " + cls2);
    }

    private Location buildJobJar(BasicMapReduceContext basicMapReduceContext) throws IOException {
        Class decoderClass;
        ApplicationBundler applicationBundler = new ApplicationBundler(ImmutableList.of("org.apache.hadoop", "org.apache.spark"), ImmutableList.of("org.apache.hadoop.hbase", "org.apache.hadoop.hive"));
        Id.Program id = basicMapReduceContext.getProgram().getId();
        Location create = this.locationFactory.create(String.format("%s.%s.%s.%s.%s.jar", ProgramType.MAPREDUCE.name().toLowerCase(), id.getAccountId(), id.getApplicationId(), id.getId(), basicMapReduceContext.getRunId().getId()));
        LOG.debug("Creating Job jar: {}", create.toURI());
        HashSet newHashSet = Sets.newHashSet();
        newHashSet.add(MapReduce.class);
        newHashSet.add(MapperWrapper.class);
        newHashSet.add(ReducerWrapper.class);
        Job job = (Job) basicMapReduceContext.getHadoopJob();
        try {
            Class inputFormatClass = job.getInputFormatClass();
            LOG.info("InputFormat class: {} {}", inputFormatClass, inputFormatClass.getClassLoader());
            newHashSet.add(inputFormatClass);
            if (StreamInputFormat.class.isAssignableFrom(inputFormatClass) && (decoderClass = StreamInputFormat.getDecoderClass(job.getConfiguration())) != null) {
                newHashSet.add(decoderClass);
            }
        } catch (Throwable th) {
            LOG.info("InputFormat class not found: {}", th.getMessage(), th);
        }
        try {
            Class outputFormatClass = job.getOutputFormatClass();
            LOG.info("OutputFormat class: {} {}", outputFormatClass, outputFormatClass.getClassLoader());
            newHashSet.add(outputFormatClass);
        } catch (Throwable th2) {
            LOG.info("OutputFormat class not found: {}", th2.getMessage(), th2);
        }
        try {
            newHashSet.add(((HBaseTableUtil) new HBaseTableUtilFactory().get()).getClass());
        } catch (ProvisionException e) {
            LOG.warn("Not including HBaseTableUtil classes in submitted Job Jar since they are not available");
        }
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(job.getConfiguration().getClassLoader());
        applicationBundler.createBundle(create, newHashSet);
        Thread.currentThread().setContextClassLoader(contextClassLoader);
        LOG.info("Built MapReduce Job Jar at {}", create.toURI());
        return create;
    }

    private Location copyProgramJar() throws IOException {
        Id.Program id = this.context.getProgram().getId();
        Location create = this.locationFactory.create(String.format("%s.%s.%s.%s.%s.program.jar", ProgramType.MAPREDUCE.name().toLowerCase(), id.getAccountId(), id.getApplicationId(), id.getId(), this.context.getRunId().getId()));
        ByteStreams.copy(Locations.newInputSupplier(this.programJarLocation), Locations.newOutputSupplier(create));
        LOG.info("Copied Program Jar to {}, source: {}", create.toURI(), this.programJarLocation.toURI());
        return create;
    }

    private Runnable createCleanupTask(final Location... locationArr) {
        return new Runnable() { // from class: co.cask.cdap.internal.app.runtime.batch.MapReduceRuntimeService.4
            @Override // java.lang.Runnable
            public void run() {
                for (Location location : locationArr) {
                    Locations.deleteQuietly(location);
                }
            }
        };
    }
}
