package co.cask.cdap.app.runtime.spark.distributed;

import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.app.runtime.spark.SparkPackageUtils;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.HttpExceptionHandler;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.HttpResponder;
import co.cask.http.NettyHttpService;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.gson.Gson;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.twill.api.Command;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/distributed/SparkExecutionService.class */
public final class SparkExecutionService extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(SparkExecutionService.class);
    private static final Gson GSON = new Gson();
    private static final Type TOKEN_TYPE = new TypeToken<Map<String, String>>() { // from class: co.cask.cdap.app.runtime.spark.distributed.SparkExecutionService.1
    }.getType();
    private static final long SHUTDOWN_WAIT_SECONDS = 20;
    private final LocationFactory locationFactory;
    private final NettyHttpService httpServer;
    private final ProgramRunId programRunId;

    @Nullable
    private final WorkflowToken workflowToken;
    private final AtomicBoolean stopping = new AtomicBoolean();
    private final CountDownLatch stopLatch = new CountDownLatch(1);

    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/distributed/SparkExecutionService$SparkControllerHandler.class */
    public final class SparkControllerHandler extends AbstractHttpHandler {
        public SparkControllerHandler() {
        }

        @POST
        @Path("/v1/spark/{programName}/runs/{runId}/heartbeat")
        public synchronized void heartbeat(FullHttpRequest fullHttpRequest, HttpResponder httpResponder, @PathParam("programName") String str, @PathParam("runId") String str2) throws Exception {
            if (SparkExecutionService.this.stopLatch.await(0L, TimeUnit.SECONDS)) {
                throw new BadRequestException(String.format("Spark program '%s' is already stopped. Heartbeat is not accepted.", SparkExecutionService.this.programRunId));
            }
            validateRequest(str, str2);
            updateWorkflowToken(fullHttpRequest.content());
            if (!SparkExecutionService.this.stopping.get()) {
                httpResponder.sendStatus(HttpResponseStatus.OK);
            } else {
                Command.Builder.of("stop");
                httpResponder.sendJson(HttpResponseStatus.OK, SparkExecutionService.GSON.toJson(SparkCommand.STOP));
            }
        }

        @Path("/v1/spark/{programName}/runs/{runId}/completed")
        @PUT
        public synchronized void completed(FullHttpRequest fullHttpRequest, HttpResponder httpResponder, @PathParam("programName") String str, @PathParam("runId") String str2) throws Exception {
            validateRequest(str, str2);
            try {
                updateWorkflowToken(fullHttpRequest.content());
                httpResponder.sendStatus(HttpResponseStatus.OK);
                SparkExecutionService.this.stopLatch.countDown();
            } catch (Throwable th) {
                SparkExecutionService.this.stopLatch.countDown();
                throw th;
            }
        }

        @POST
        @Path("/v1/spark/{programName}/runs/{runId}/credentials")
        public void writeCredentials(FullHttpRequest fullHttpRequest, HttpResponder httpResponder, @PathParam("programName") String str, @PathParam("runId") String str2) throws Exception {
            CredentialsRequest credentialsRequest = (CredentialsRequest) SparkExecutionService.GSON.fromJson(fullHttpRequest.content().toString(StandardCharsets.UTF_8), CredentialsRequest.class);
            if (credentialsRequest == null || credentialsRequest.getUri() == null) {
                throw new BadRequestException("Expected request body a JSON object with an 'uri' field");
            }
            URI uri = credentialsRequest.getUri();
            URI uri2 = SparkExecutionService.this.locationFactory.getHomeLocation().toURI();
            if (!Objects.equals(uri2.getScheme(), uri.getScheme())) {
                throw new BadRequestException("Target URI expected to have '" + uri2.getScheme() + "' as the scheme");
            }
            if (!Objects.equals(uri.getAuthority(), uri2.getAuthority())) {
                throw new BadRequestException("Target URI expected to have '" + uri2.getAuthority() + "' as the authority");
            }
            Location create = SparkExecutionService.this.locationFactory.create(uri);
            Credentials removeSecretKeys = removeSecretKeys(new Credentials(UserGroupInformation.getCurrentUser().getCredentials()));
            DataOutputStream dataOutputStream = new DataOutputStream(create.getOutputStream("600"));
            Throwable th = null;
            try {
                removeSecretKeys.writeTokenStorageToStream(dataOutputStream);
                if (dataOutputStream != null) {
                    if (0 != 0) {
                        try {
                            dataOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        dataOutputStream.close();
                    }
                }
                SparkExecutionService.LOG.debug("Credentials written for {} of run {} to {}: {}", new Object[]{str, str2, uri, removeSecretKeys.getAllTokens()});
                httpResponder.sendStatus(HttpResponseStatus.OK);
            } catch (Throwable th3) {
                if (dataOutputStream != null) {
                    if (0 != 0) {
                        try {
                            dataOutputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        dataOutputStream.close();
                    }
                }
                throw th3;
            }
        }

        private void validateRequest(String str, String str2) throws Exception {
            if (!SparkExecutionService.this.programRunId.getProgram().equals(str)) {
                throw new BadRequestException(String.format("Request program name '%s' is not the same as the context program name '%s", str, SparkExecutionService.this.programRunId.getProgram()));
            }
            if (str2 == null || !SparkExecutionService.this.programRunId.getRun().equals(str2)) {
                throw new BadRequestException(String.format("Request runId '%s' is not the same as the context runId '%s'", str2, SparkExecutionService.this.programRunId.getRun()));
            }
        }

        private void updateWorkflowToken(ByteBuf byteBuf) {
            if (byteBuf.isReadable()) {
                if (SparkExecutionService.this.workflowToken == null) {
                    SparkExecutionService.LOG.warn("Spark program is not running inside Workflow. Ignore workflow token update: {}", SparkExecutionService.this.programRunId);
                    return;
                }
                try {
                    InputStreamReader inputStreamReader = new InputStreamReader((InputStream) new ByteBufInputStream(byteBuf), StandardCharsets.UTF_8);
                    Throwable th = null;
                    try {
                        try {
                            Map map = (Map) SparkExecutionService.GSON.fromJson(inputStreamReader, SparkExecutionService.TOKEN_TYPE);
                            WorkflowToken workflowToken = SparkExecutionService.this.workflowToken;
                            workflowToken.getClass();
                            map.forEach(workflowToken::put);
                            if (inputStreamReader != null) {
                                if (0 != 0) {
                                    try {
                                        inputStreamReader.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    inputStreamReader.close();
                                }
                            }
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                        }
                    } finally {
                    }
                } catch (IOException e) {
                    SparkExecutionService.LOG.warn("Exception when deocoding workflow token update request for {}", SparkExecutionService.this.programRunId, e);
                }
            }
        }

        private Credentials removeSecretKeys(Credentials credentials) {
            try {
                List list = (List) credentials.getClass().getMethod("getAllSecretKeys", new Class[0]).invoke(credentials, new Object[0]);
                Method method = credentials.getClass().getMethod("removeSecretKey", Text.class);
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    method.invoke(credentials, (Text) it.next());
                }
            } catch (Exception e) {
                try {
                    Field declaredField = credentials.getClass().getDeclaredField("secretKeysMap");
                    declaredField.setAccessible(true);
                    ((Map) declaredField.get(credentials)).clear();
                } catch (Exception e2) {
                    if (credentials.getSecretKey(new Text("sparkCookie")) != null) {
                        Properties sparkDefaultConf = SparkPackageUtils.getSparkDefaultConf();
                        boolean parseBoolean = Boolean.parseBoolean(sparkDefaultConf.getProperty("spark.authenticate"));
                        boolean parseBoolean2 = Boolean.parseBoolean(sparkDefaultConf.getProperty("spark.shuffle.service.enabled"));
                        if (parseBoolean && parseBoolean2) {
                            SparkExecutionService.LOG.warn("Unable to remove 'sparkCookie' from UGI credentials. The Spark program might fail.");
                        }
                    }
                }
            }
            return credentials;
        }
    }

    public SparkExecutionService(LocationFactory locationFactory, String str, ProgramRunId programRunId, @Nullable WorkflowToken workflowToken) {
        this.locationFactory = locationFactory;
        this.httpServer = NettyHttpService.builder(programRunId.getProgram() + "-spark-exec-service").setHttpHandlers(Collections.singletonList(new SparkControllerHandler())).setHost(str).setExceptionHandler(new HttpExceptionHandler()).build();
        this.programRunId = programRunId;
        this.workflowToken = workflowToken;
    }

    public URI getBaseURI() {
        InetSocketAddress bindAddress = this.httpServer.getBindAddress();
        if (bindAddress == null) {
            throw new IllegalStateException("SparkExecutionService hasn't been started");
        }
        return URI.create(String.format("http://%s:%d", bindAddress.getHostName(), Integer.valueOf(bindAddress.getPort())));
    }

    protected void startUp() throws Exception {
        this.httpServer.start();
    }

    protected void shutDown() throws Exception {
        this.stopping.set(true);
        if (!this.stopLatch.await(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
            LOG.warn("Timeout in waiting for Spark program to stop: {}", this.programRunId);
        }
        this.httpServer.stop();
    }

    public void shutdownNow() {
        this.stopLatch.countDown();
        stopAndWait();
    }
}
