package io.streamnative.pulsar.handlers.kop.security;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import lombok.NonNull;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseUtils;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler;
import org.apache.kafka.common.utils.Utils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.class */
public class SaslAuthenticator {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SaslAuthenticator.class);
    /** Zero-length buffer used as the SASL payload when the SASL server returns no challenge bytes. */
    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
    /** Shared across all instances; only assigned while it is still null (see setCurrentAuthenticationService). */
    private static volatile AuthenticationService authenticationService = null;
    /** Admin client handed to the PLAIN SASL server. */
    private final PulsarAdmin admin;
    /** Mechanism names a client may request in a SASL handshake; also echoed back in handshake responses. */
    private final Set<String> allowedMechanisms;
    /** Proxy roles read from the KoP configuration and handed to the PLAIN SASL server. */
    private final Set<String> proxyRoles;
    /** Callback handler for OAUTHBEARER; null when OAUTHBEARER is not among the allowed mechanisms. */
    private final AuthenticateCallbackHandler oauth2CallbackHandler;
    /** SASL server for the mechanism chosen in the handshake; null before the handshake and after reset(). */
    private SaslServer saslServer;
    /** Session built from the SASL authorization id once a token has been evaluated. */
    private Session session;
    /** Set to true when the handshake request version is >= 1; when false, tokens are read as raw bytes. */
    private boolean enableKafkaSaslAuthenticateHeaders;
    /** Current position in the authentication state machine. */
    private State state = State.HANDSHAKE_OR_VERSIONS_REQUEST;
    /** Serialized failure response kept until sendAuthenticationFailureResponse() writes and clears it. */
    private ByteBuf authenticationFailureResponse = null;
    /** Channel context of the most recent authenticate() call. */
    private ChannelHandlerContext ctx = null;

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator$IllegalStateException.class */
    /**
     * Authentication failure raised when a request arrives while the authenticator
     * is in a state that does not accept it.
     */
    public static class IllegalStateException extends AuthenticationException {
        /**
         * @param str    description of the offending event
         * @param state  state the authenticator was actually in
         * @param state2 state that was required for the event
         */
        public IllegalStateException(String str, State state, State state2) {
            super(String.format("%s actual state: %s expected state: %s", str, state, state2));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator$State.class */
    /** States of the SASL authentication exchange tracked by the enclosing authenticator. */
    public enum State {
        /** Initial state, also restored by reset(); the only state in which an ApiVersions request is accepted. */
        HANDSHAKE_OR_VERSIONS_REQUEST,
        /** Entered after a supported ApiVersions request has been answered. */
        HANDSHAKE_REQUEST,
        /** Entered after a handshake succeeded and a SASL server was created; incoming bytes are SASL tokens. */
        AUTHENTICATE,
        /** Entered once the SASL server reports completion. */
        COMPLETE
    }

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator$UnsupportedSaslMechanismException.class */
    /**
     * Authentication failure raised when the client asks for a SASL mechanism
     * that is not in the set of allowed mechanisms.
     */
    public static class UnsupportedSaslMechanismException extends AuthenticationException {
        /**
         * @param str name of the mechanism the client requested
         */
        public UnsupportedSaslMechanismException(String str) {
            super(String.format("SASL mechanism '%s' requested by client is not supported", str));
        }
    }

    /**
     * Serializes a failure response and stores it so that it can be written later by
     * {@link #sendAuthenticationFailureResponse()}.
     *
     * @param header   header of the request being answered
     * @param request  the request being answered
     * @param response explicit response body, or null to derive one from {@code cause}
     * @param cause    exception used to derive the response when {@code response} is null
     */
    private void buildResponseOnAuthenticateFailure(RequestHeader header, AbstractRequest request, AbstractResponse response, Exception cause) {
        final ByteBuf serialized = buildKafkaResponse(header, request, response, cause);
        this.authenticationFailureResponse = serialized;
    }

    /**
     * Writes the stored failure response, if any, to the channel and then forgets it.
     * Does nothing when no failure response is pending.
     */
    public void sendAuthenticationFailureResponse() {
        final ByteBuf pending = this.authenticationFailureResponse;
        if (pending != null) {
            sendKafkaResponse(pending);
            this.authenticationFailureResponse = null;
        }
    }

    /**
     * Records the shared authentication service the first time one is offered;
     * later calls leave the already stored service untouched.
     *
     * @param candidate service to store when none has been stored yet
     */
    private static void setCurrentAuthenticationService(AuthenticationService candidate) {
        if (authenticationService != null) {
            return;
        }
        authenticationService = candidate;
    }

    public SaslAuthenticator(PulsarService pulsarService, Set<String> set, KafkaServiceConfiguration kafkaServiceConfiguration) throws PulsarServerException {
        setCurrentAuthenticationService(pulsarService.getBrokerService().getAuthenticationService());
        this.admin = pulsarService.getAdminClient();
        this.allowedMechanisms = set;
        this.proxyRoles = kafkaServiceConfiguration.getProxyRoles();
        this.oauth2CallbackHandler = set.contains(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM) ? createOauth2CallbackHandler(kafkaServiceConfiguration) : null;
        this.enableKafkaSaslAuthenticateHeaders = false;
    }

    /**
     * Creates an authenticator from an explicitly supplied admin client and authentication service.
     *
     * @param pulsarAdmin               admin client handed to the PLAIN SASL server
     * @param authenticationService2    authentication service to register if none is registered yet
     * @param set                       SASL mechanism names clients are allowed to use
     * @param kafkaServiceConfiguration KoP configuration (proxy roles, OAuth2 settings)
     * @throws PulsarServerException declared for callers; nothing in this constructor's body throws it directly
     */
    public SaslAuthenticator(PulsarAdmin pulsarAdmin, AuthenticationService authenticationService2, Set<String> set, KafkaServiceConfiguration kafkaServiceConfiguration) throws PulsarServerException {
        setCurrentAuthenticationService(authenticationService2);
        this.proxyRoles = kafkaServiceConfiguration.getProxyRoles();
        this.admin = pulsarAdmin;
        this.allowedMechanisms = set;
        // The OAuth2 callback handler is only needed when OAUTHBEARER is allowed.
        if (set.contains(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM)) {
            this.oauth2CallbackHandler = createOauth2CallbackHandler(kafkaServiceConfiguration);
        } else {
            this.oauth2CallbackHandler = null;
        }
        this.enableKafkaSaslAuthenticateHeaders = false;
    }

    /**
     * Feeds one inbound message into the authentication state machine.
     *
     * <p>If the SASL server already reports completion, the state is set to COMPLETE and the
     * message is not processed. Otherwise the message is treated as a Kafka request while in
     * one of the handshake states, or as a SASL token while in the AUTHENTICATE state.
     *
     * @param channelHandlerContext context of the channel the message arrived on; remembered for later writes
     * @param byteBuf               the inbound message; must have at least one readable byte
     * @param biConsumer            callback passed through to the request/token handlers for parse latency
     * @param biConsumer2           callback passed through to the request/token handlers for request latency
     * @throws AuthenticationException if the handlers reject the message
     */
    public void authenticate(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, BiConsumer<Long, Throwable> biConsumer, BiConsumer<String, Long> biConsumer2) throws AuthenticationException {
        Preconditions.checkArgument(byteBuf.readableBytes() > 0);
        log.info("Authenticate {} {} {}", channelHandlerContext, this.saslServer, this.state);
        this.ctx = channelHandlerContext;

        final boolean alreadyComplete = this.saslServer != null && this.saslServer.isComplete();
        if (alreadyComplete) {
            setState(State.COMPLETE);
            return;
        }

        final State current = this.state;
        if (current == State.HANDSHAKE_OR_VERSIONS_REQUEST || current == State.HANDSHAKE_REQUEST) {
            handleKafkaRequest(channelHandlerContext, byteBuf, biConsumer, biConsumer2);
        } else if (current == State.AUTHENTICATE) {
            handleSaslToken(channelHandlerContext, byteBuf, biConsumer, biConsumer2);
            if (this.saslServer.isComplete()) {
                setState(State.COMPLETE);
            }
        }
        // Any other state: nothing to do.
    }

    /**
     * @return true when the state machine has reached the COMPLETE state
     */
    public boolean complete() {
        return State.COMPLETE == this.state;
    }

    /**
     * @return the stored session when a SASL server exists and authentication is complete;
     *         null otherwise
     */
    public Session session() {
        final boolean authenticated = this.saslServer != null && complete();
        return authenticated ? this.session : null;
    }

    /**
     * Returns the authenticator to its initial state and releases the SASL server, if any.
     *
     * <p>Disposing the SASL server is best-effort: a failure is logged rather than propagated,
     * and the server reference is cleared regardless so a new handshake starts from scratch.
     */
    public void reset() {
        this.state = State.HANDSHAKE_OR_VERSIONS_REQUEST;
        if (this.saslServer == null) {
            return;
        }
        try {
            this.saslServer.dispose();
        } catch (SaslException e) {
            // Cleanup must not break the reset, but the failure should not vanish silently.
            log.warn("Failed to dispose SASL server while resetting the authenticator", e);
        } finally {
            this.saslServer = null;
        }
    }

    /**
     * Moves the state machine to the given state and reports the transition at debug level.
     *
     * @param state the state to switch to
     */
    private void setState(State state) {
        if (log.isDebugEnabled()) {
            log.debug("Set SaslAuthenticator's state to {}", state);
        }
        this.state = state;
    }

    /**
     * Creates and configures the callback handler used for the OAUTHBEARER mechanism.
     *
     * <p>When the configuration names a handler class, that class is loaded and instantiated
     * through its no-argument constructor; otherwise the unsecured validator handler is used.
     * The handler is then configured with a single JAAS entry whose options are the
     * configuration's OAuth2 properties, converted to strings.
     *
     * @param kafkaServiceConfiguration KoP configuration supplying the handler class name and OAuth2 properties
     * @return the configured callback handler, never null
     * @throws NullPointerException if {@code kafkaServiceConfiguration} is null
     * @throws RuntimeException     if the configured class cannot be loaded, instantiated,
     *                              or is not an {@link AuthenticateCallbackHandler}
     */
    @NonNull
    private AuthenticateCallbackHandler createOauth2CallbackHandler(@NonNull KafkaServiceConfiguration kafkaServiceConfiguration) {
        if (kafkaServiceConfiguration == null) {
            throw new NullPointerException("config is marked @NonNull but is null");
        }

        AuthenticateCallbackHandler handler;
        final String handlerClassName = kafkaServiceConfiguration.getKopOauth2AuthenticateCallbackHandler();
        if (handlerClassName != null) {
            try {
                handler = (AuthenticateCallbackHandler) Class.forName(handlerClassName).newInstance();
            } catch (ClassCastException e) {
                throw new RuntimeException("Failed to cast " + handlerClassName + ": " + e.getMessage(), e);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("Failed to load class " + handlerClassName + ": " + e.getMessage(), e);
            } catch (IllegalAccessException | InstantiationException e) {
                throw new RuntimeException("Failed to create new instance of " + handlerClassName + ": " + e.getMessage(), e);
            }
        } else {
            handler = new OAuthBearerUnsecuredValidatorCallbackHandler();
        }

        // Copy the OAuth2 properties into the JAAS options; without this the handler
        // would be configured with an empty option map.
        final Properties oauth2Properties = kafkaServiceConfiguration.getKopOauth2Properties();
        final HashMap<String, String> jaasOptions = new HashMap<>();
        oauth2Properties.forEach((key, value) -> jaasOptions.put(key.toString(), value.toString()));

        final AppConfigurationEntry jaasEntry = new AppConfigurationEntry(
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule",
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                jaasOptions);
        handler.configure(null, OAuthBearerLoginModule.OAUTHBEARER_MECHANISM, Collections.singletonList(jaasEntry));
        return handler;
    }

    /**
     * Instantiates the SASL server for the requested mechanism and stores it.
     *
     * @param mechanism mechanism name chosen in the handshake
     * @throws AuthenticationException  if the mechanism is neither PLAIN nor OAUTHBEARER
     * @throws IllegalArgumentException if OAUTHBEARER is requested but no callback handler was created
     */
    private void createSaslServer(String mechanism) throws AuthenticationException {
        if (mechanism.equals("PLAIN")) {
            this.saslServer = new PlainSaslServer(authenticationService, this.admin, this.proxyRoles);
            return;
        }

        final boolean isOauthBearer = mechanism.equals(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM);
        if (!isOauthBearer) {
            throw new AuthenticationException("KoP doesn't support '" + mechanism + "' mechanism");
        }
        if (this.oauth2CallbackHandler == null) {
            throw new IllegalArgumentException("No OAuth2CallbackHandler found when mechanism is OAUTHBEARER");
        }
        this.saslServer = new OAuthBearerSaslServer(this.oauth2CallbackHandler);
    }

    /**
     * @param requestHeader header of the incoming request
     * @return true when the request is an ApiVersions request whose version is not supported
     */
    private static boolean isUnsupportedApiVersionsRequest(RequestHeader requestHeader) {
        if (requestHeader.apiKey() != ApiKeys.API_VERSIONS) {
            return false;
        }
        return !ApiKeys.API_VERSIONS.isVersionSupported(requestHeader.apiVersion());
    }

    /**
     * Builds the request object for the given header.
     *
     * <p>An ApiVersions request with an unsupported version is not parsed from the payload;
     * a version-0 ApiVersions request carrying the unsupported version is created instead.
     *
     * @param header  parsed request header
     * @param payload buffer positioned at the request body
     * @return the parsed (or substituted) request
     */
    private AbstractRequest parseRequest(RequestHeader header, ByteBuffer payload) {
        if (!isUnsupportedApiVersionsRequest(header)) {
            final ApiKeys key = header.apiKey();
            final short version = header.apiVersion();
            return AbstractRequest.parseRequest(key, version, key.parseRequest(version, payload));
        }
        return new ApiVersionsRequest((short) 0, Short.valueOf(header.apiVersion()));
    }

    /**
     * Handles a Kafka request received before SASL token exchange starts.
     *
     * <p>Only ApiVersions and SaslHandshake requests are accepted. A successful handshake
     * creates the SASL server for the chosen mechanism and moves the state to AUTHENTICATE.
     *
     * <p>On a handshake failure a response is stored for
     * {@link #sendAuthenticationFailureResponse()}. If the handshake handler already prepared
     * a specific failure response during this call (for example the unsupported-mechanism
     * response listing the allowed mechanisms), that response is kept; a generic error
     * response derived from the exception is built only when none was prepared.
     *
     * @param channelHandlerContext channel context used for writing responses
     * @param byteBuf               buffer holding the request header and body
     * @param biConsumer            invoked with the timestamp taken before parsing (nanoseconds) and a null error
     * @param biConsumer2           passed to the request handlers, which invoke it with the API key name and
     *                              the timestamp taken after parsing (nanoseconds)
     * @throws AuthenticationException if the request type is unexpected or the handshake fails
     */
    private void handleKafkaRequest(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, BiConsumer<Long, Throwable> biConsumer, BiConsumer<String, Long> biConsumer2) throws AuthenticationException {
        final long parseStartNanos = MathUtils.nowInNano();
        final ByteBuffer nioBuffer = byteBuf.nioBuffer();
        final RequestHeader header = RequestHeader.parse(nioBuffer);
        final ApiKeys apiKey = header.apiKey();
        final AbstractRequest request = parseRequest(header, nioBuffer);
        biConsumer.accept(Long.valueOf(parseStartNanos), null);

        if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE) {
            throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
        }
        if (log.isDebugEnabled()) {
            log.debug("Handling Kafka request header {}, body {}", header, request);
        }

        final long processStartNanos = MathUtils.nowInNano();
        if (apiKey == ApiKeys.API_VERSIONS) {
            handleApiVersionsRequest(channelHandlerContext, header, (ApiVersionsRequest) request, Long.valueOf(processStartNanos), biConsumer2);
            return;
        }

        // Remember what was pending before the handshake so we can tell whether the
        // handshake handler stored its own, more specific failure response.
        final ByteBuf pendingBeforeHandshake = this.authenticationFailureResponse;
        try {
            createSaslServer(handleHandshakeRequest(channelHandlerContext, header, (SaslHandshakeRequest) request, Long.valueOf(processStartNanos), biConsumer2));
            setState(State.AUTHENTICATE);
        } catch (AuthenticationException e) {
            // Reference comparison is intentional: only fall back to the generic response
            // when this call did not already prepare one.
            if (this.authenticationFailureResponse == pendingBeforeHandshake) {
                this.authenticationFailureResponse = buildKafkaResponse(header, request, null, e);
            }
            throw e;
        }
    }

    /**
     * Serializes a response and schedules it to be written and flushed on the channel's event loop.
     *
     * @param channelContext context whose channel receives the response
     * @param header         header of the request being answered
     * @param request        the request being answered
     * @param response       explicit response body, or null to derive one from {@code cause}
     * @param cause          exception used to derive the response when {@code response} is null
     */
    private static void sendKafkaResponse(ChannelHandlerContext channelContext, RequestHeader header, AbstractRequest request, AbstractResponse response, Exception cause) {
        final ByteBuf serialized = buildKafkaResponse(header, request, response, cause);
        final Runnable writeTask = () -> channelContext.channel().writeAndFlush(serialized);
        channelContext.channel().eventLoop().execute(writeTask);
    }

    /**
     * Schedules an already serialized response to be written and flushed on the event loop
     * of the remembered channel context.
     *
     * @param serialized the bytes to write
     */
    private void sendKafkaResponse(ByteBuf serialized) {
        final Runnable writeTask = () -> this.ctx.channel().writeAndFlush(serialized);
        this.ctx.channel().eventLoop().execute(writeTask);
    }

    /**
     * Serializes a response for the given request.
     *
     * <p>When no explicit response is supplied but an exception is, the request's own error
     * response for that exception is used. An ApiVersions request with an unsupported version
     * is answered using the oldest ApiVersions version.
     *
     * @param header   header of the request being answered
     * @param request  the request being answered
     * @param response explicit response body, may be null
     * @param cause    exception to turn into an error response when {@code response} is null, may be null
     * @return the serialized response
     */
    private static ByteBuf buildKafkaResponse(RequestHeader header, AbstractRequest request, AbstractResponse response, Exception cause) {
        final short requestedVersion = header.apiVersion();
        final ApiKeys apiKey = header.apiKey();

        AbstractResponse body = response;
        if (body == null && cause != null) {
            body = request.getErrorResponse(cause);
        }

        final boolean unsupportedApiVersions =
                apiKey == ApiKeys.API_VERSIONS && !ApiKeys.API_VERSIONS.isVersionSupported(requestedVersion);
        final short responseVersion = unsupportedApiVersions ? ApiKeys.API_VERSIONS.oldestVersion() : requestedVersion;
        return ResponseUtils.serializeResponse(responseVersion, header.toResponseHeader(), body);
    }

    /**
     * Copies the remaining bytes of {@code byteBuffer} into a new buffer, preceded by their
     * count as a 4-byte integer.
     *
     * <p>The writer index is marked before the two writes and reset to that mark afterwards.
     * NOTE(review): the mark/reset pair looks deliberate; confirm the intent before changing it.
     *
     * @param byteBuffer source bytes; its position is advanced by the copy
     * @return the newly allocated buffer
     */
    @VisibleForTesting
    public static ByteBuf sizePrefixed(ByteBuffer byteBuffer) {
        final int payloadSize = byteBuffer.remaining();
        final ByteBuf framed = Unpooled.buffer(Integer.BYTES + payloadSize);
        framed.markWriterIndex();
        framed.writeInt(payloadSize);
        framed.writeBytes(byteBuffer);
        framed.resetWriterIndex();
        return framed;
    }

    /**
     * Processes one client message received while the authenticator is in the AUTHENTICATE state.
     *
     * <p>Two wire formats are handled, selected by {@code enableKafkaSaslAuthenticateHeaders}:
     * <ul>
     *   <li>Flag off: the whole buffer is taken as a raw SASL token. A non-null server reply is
     *       written size-prefixed, and the session is created only after that write succeeds.
     *       A {@link SaslException} is logged at debug level only; no response is written and
     *       nothing is rethrown.</li>
     *   <li>Flag on: the buffer is parsed as a Kafka request that must be SASL_AUTHENTICATE.
     *       On success the session is created and a SaslAuthenticateResponse is sent. On a
     *       {@link SaslException} a failure response is built and sent immediately, and the
     *       exception is not rethrown.</li>
     * </ul>
     *
     * @param channelHandlerContext channel context used for writing responses
     * @param byteBuf               buffer holding the token or the Kafka request
     * @param biConsumer            invoked (flag-on path only) with the timestamp taken before parsing and a null error
     * @param biConsumer2           invoked (flag-on path only) with the API key name and the timestamp taken after parsing
     * @throws AuthenticationException if the request is not SASL_AUTHENTICATE or its version is unsupported
     */
    private void handleSaslToken(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, BiConsumer<Long, Throwable> biConsumer, BiConsumer<String, Long> biConsumer2) throws AuthenticationException {
        long nowInNano = MathUtils.nowInNano();
        ByteBuffer nioBuffer = byteBuf.nioBuffer();
        if (!this.enableKafkaSaslAuthenticateHeaders) {
            // Raw-token path: no Kafka request header wraps the SASL bytes.
            try {
                byte[] bArr = new byte[nioBuffer.remaining()];
                nioBuffer.get(bArr, 0, bArr.length);
                byte[] evaluateResponse = this.saslServer.evaluateResponse(bArr);
                if (evaluateResponse != null) {
                    // The session is assigned from the write listener, i.e. only once the reply was written.
                    channelHandlerContext.channel().writeAndFlush(sizePrefixed(ByteBuffer.wrap(evaluateResponse))).addListener(future -> {
                        if (!future.isSuccess()) {
                            log.error("[{}] Failed to write {}", channelHandlerContext.channel(), future.cause());
                            return;
                        }
                        // No request header is available on this path, so a fixed client id is used.
                        this.session = new Session(new KafkaPrincipal("User", this.saslServer.getAuthorizationID()), "old-clientId");
                        if (log.isDebugEnabled()) {
                            log.debug("Send sasl response to SASL_HANDSHAKE v0 old client {} successfully, session {}", channelHandlerContext.channel(), this.session);
                        }
                    });
                }
                return;
            } catch (SaslException e) {
                // Failure on this path is only logged; the client receives no response from here.
                if (log.isDebugEnabled()) {
                    log.debug("Authenticate failed for SASL_HANDSHAKE v0 old client, reason {}", e.getMessage());
                    return;
                }
                return;
            }
        }
        // Header path: the token is wrapped in a Kafka request.
        RequestHeader parse = RequestHeader.parse(nioBuffer);
        ApiKeys apiKey = parse.apiKey();
        short apiVersion = parse.apiVersion();
        // NOTE(review): the body is parsed before the version-support check below — confirm this order is intended.
        AbstractRequest parseRequest = AbstractRequest.parseRequest(apiKey, apiVersion, apiKey.parseRequest(apiVersion, nioBuffer));
        biConsumer.accept(Long.valueOf(nowInNano), null);
        long nowInNano2 = MathUtils.nowInNano();
        if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
            // Store the failure response for sendAuthenticationFailureResponse() and reject the request.
            AuthenticationException authenticationException = new AuthenticationException("Unexpected Kafka request of type " + apiKey + " during SASL authentication");
            biConsumer2.accept(apiKey.name, Long.valueOf(nowInNano2));
            buildResponseOnAuthenticateFailure(parse, parseRequest, null, authenticationException);
            throw authenticationException;
        }
        if (!apiKey.isVersionSupported(apiVersion)) {
            // No failure response is stored on this branch.
            throw new AuthenticationException("Version " + ((int) apiVersion) + " is not supported for apiKey " + apiKey);
        }
        SaslAuthenticateRequest saslAuthenticateRequest = (SaslAuthenticateRequest) parseRequest;
        try {
            byte[] evaluateResponse2 = this.saslServer.evaluateResponse(Utils.toArray(saslAuthenticateRequest.saslAuthBytes()));
            // A null server reply is sent as an empty payload.
            ByteBuffer wrap = evaluateResponse2 == null ? EMPTY_BUFFER : ByteBuffer.wrap(evaluateResponse2);
            this.session = new Session(new KafkaPrincipal("User", this.saslServer.getAuthorizationID()), parse.clientId());
            biConsumer2.accept(apiKey.name, Long.valueOf(nowInNano2));
            sendKafkaResponse(channelHandlerContext, parse, parseRequest, new SaslAuthenticateResponse(Errors.NONE, null, wrap), null);
            if (log.isDebugEnabled()) {
                log.debug("Authenticate successfully for client, header {}, request {}, session {}", parse, saslAuthenticateRequest, this.session);
            }
        } catch (SaslException e2) {
            // The failure response is built and written right away; the exception is not propagated.
            biConsumer2.accept(apiKey.name, Long.valueOf(nowInNano2));
            buildResponseOnAuthenticateFailure(parse, parseRequest, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e2.getMessage()), null);
            sendAuthenticationFailureResponse();
            // NOTE(review): the same message is logged at debug and at error level — confirm both are wanted.
            if (log.isDebugEnabled()) {
                log.debug("Authenticate failed for client, header {}, request {}, reason {}", parse, saslAuthenticateRequest, e2.getMessage());
            }
            log.error("Authenticate failed for client, header {}, request {}, reason {}", parse, saslAuthenticateRequest, e2.getMessage());
        }
    }

    /**
     * Answers an ApiVersions request.
     *
     * <p>A request with a supported version receives the default ApiVersions response and moves
     * the state to HANDSHAKE_REQUEST. A request with an unsupported version receives an
     * UNSUPPORTED_VERSION error response and leaves the state unchanged.
     *
     * @param channelContext    channel context used for writing the response
     * @param header            header of the ApiVersions request
     * @param request           the ApiVersions request
     * @param processStartNanos timestamp passed to {@code latencyRecorder}
     * @param latencyRecorder   invoked with the API key name and {@code processStartNanos} before responding
     * @throws AuthenticationException if the authenticator is not in the HANDSHAKE_OR_VERSIONS_REQUEST state
     */
    private void handleApiVersionsRequest(ChannelHandlerContext channelContext, RequestHeader header, ApiVersionsRequest request, Long processStartNanos, BiConsumer<String, Long> latencyRecorder) throws AuthenticationException {
        if (this.state != State.HANDSHAKE_OR_VERSIONS_REQUEST) {
            throw new IllegalStateException("Receive ApiVersions request", this.state, State.HANDSHAKE_OR_VERSIONS_REQUEST);
        }

        if (!request.hasUnsupportedRequestVersion()) {
            final ApiVersionsResponse supportedVersions = ApiVersionsResponse.defaultApiVersionsResponse();
            latencyRecorder.accept(header.apiKey().name, processStartNanos);
            sendKafkaResponse(channelContext, header, request, supportedVersions, null);
            setState(State.HANDSHAKE_REQUEST);
            return;
        }

        latencyRecorder.accept(header.apiKey().name, processStartNanos);
        sendKafkaResponse(channelContext, header, request, request.getErrorResponse(0, (Throwable) Errors.UNSUPPORTED_VERSION.exception()), null);
    }

    /**
     * Handles a SaslHandshake request and returns the mechanism the client selected.
     *
     * <p>A handshake version of 1 or higher switches the authenticator to header-wrapped SASL
     * tokens. If the mechanism is allowed, a success response listing the allowed mechanisms
     * is sent. If it is not, an UNSUPPORTED_SASL_MECHANISM response is stored as the pending
     * failure response and an exception is thrown.
     *
     * @param channelContext    channel context used for writing responses
     * @param header            header of the handshake request
     * @param handshake         the handshake request
     * @param processStartNanos timestamp passed to {@code latencyRecorder}
     * @param latencyRecorder   invoked with the API key name and {@code processStartNanos} before responding
     * @return the mechanism requested by the client
     * @throws AuthenticationException if the mechanism is null or not allowed
     */
    @NonNull
    private String handleHandshakeRequest(ChannelHandlerContext channelContext, RequestHeader header, SaslHandshakeRequest handshake, Long processStartNanos, BiConsumer<String, Long> latencyRecorder) throws AuthenticationException {
        final String requestedMechanism = handshake.mechanism();
        if (requestedMechanism == null) {
            final AuthenticationException missingMechanism = new AuthenticationException("client's mechanism is null");
            latencyRecorder.accept(header.apiKey().name, processStartNanos);
            sendKafkaResponse(channelContext, header, handshake, null, missingMechanism);
            throw missingMechanism;
        }

        if (header.apiVersion() >= 1) {
            this.enableKafkaSaslAuthenticateHeaders = true;
        }

        if (!this.allowedMechanisms.contains(requestedMechanism)) {
            if (log.isDebugEnabled()) {
                log.debug("SASL mechanism '{}' requested by client is not supported", requestedMechanism);
            }
            latencyRecorder.accept(header.apiKey().name, processStartNanos);
            final SaslHandshakeResponse rejection = new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, this.allowedMechanisms);
            buildResponseOnAuthenticateFailure(header, handshake, rejection, null);
            throw new UnsupportedSaslMechanismException(requestedMechanism);
        }

        if (log.isDebugEnabled()) {
            log.debug("Using SASL mechanism '{}' provided by client", requestedMechanism);
        }
        latencyRecorder.accept(header.apiKey().name, processStartNanos);
        sendKafkaResponse(channelContext, header, handshake, new SaslHandshakeResponse(Errors.NONE, this.allowedMechanisms), null);
        return requestedMechanism;
    }

    /**
     * @return the shared authentication service, or null if none has been registered yet
     */
    public static AuthenticationService getAuthenticationService() {
        return SaslAuthenticator.authenticationService;
    }
}
