package io.streamnative.pulsar.handlers.kop.security;

import io.streamnative.pulsar.handlers.kop.SaslAuth;
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
import io.streamnative.pulsar.handlers.kop.utils.SaslUtils;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.utils.Utils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.AuthData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server-side SASL authenticator for Kafka requests handled by KoP.
 *
 * <p>The authenticator is a small state machine driven by {@link #authenticate}:
 * <ul>
 *   <li>{@code HANDSHAKE_OR_VERSIONS_REQUEST}: the initial state; accepts either an ApiVersions
 *       request (moves to {@code HANDSHAKE_REQUEST}) or a SaslHandshake request (moves to
 *       {@code AUTHENTICATE});</li>
 *   <li>{@code HANDSHAKE_REQUEST}: accepts only a SaslHandshake request;</li>
 *   <li>{@code AUTHENTICATE}: accepts only a SaslAuthenticate request and moves to
 *       {@code COMPLETE} when the credentials are accepted;</li>
 *   <li>{@code COMPLETE}: further calls to {@link #authenticate} are ignored.</li>
 * </ul>
 *
 * <p>The mutable fields are accessed without any synchronization, so an instance must not be shared
 * between threads without external coordination (presumably one instance is used per client
 * connection; confirm against the caller).
 */
public class SaslAuthenticator {
    private static final Logger log = LoggerFactory.getLogger(SaslAuthenticator.class);

    /** The only SASL mechanism accepted by {@link #handleHandshakeRequest}. */
    private static final String PLAIN_MECHANISM = "PLAIN";

    private final AuthenticationService authenticationService;
    private final PulsarAdmin admin;
    private final Set<String> allowedMechanisms;
    private State state = State.HANDSHAKE_OR_VERSIONS_REQUEST;
    // The three fields below are populated only after a successful authentication.
    private AuthenticationState authState = null;
    private AuthenticationDataSource authDataSource = null;
    private String authRole = null;

    /**
     * Signals that a request arrived while the authenticator was in a state that does not accept it.
     *
     * <p>Note that this type shadows {@code java.lang.IllegalStateException} inside this class and is a
     * checked {@link AuthenticationException}.
     */
    public static class IllegalStateException extends AuthenticationException {
        /**
         * @param message  description of what was received
         * @param actual   the state the authenticator was in
         * @param expected the state that would have accepted the request
         */
        public IllegalStateException(String message, State actual, State expected) {
            super(message + " actual state: " + actual + " expected state: " + expected);
        }
    }

    /** Steps of the SASL exchange, in the order they are normally traversed. */
    public enum State {
        HANDSHAKE_OR_VERSIONS_REQUEST,
        HANDSHAKE_REQUEST,
        AUTHENTICATE,
        COMPLETE
    }

    /** Signals that the client asked for a SASL mechanism that is not in the allowed set. */
    public static class UnsupportedSaslMechanismException extends AuthenticationException {
        /**
         * @param mechanism the mechanism name requested by the client
         */
        public UnsupportedSaslMechanismException(String mechanism) {
            super("SASL mechanism '" + mechanism + "' requested by client is not supported");
        }
    }

    /**
     * Creates an authenticator backed by the broker's authentication service and admin client.
     *
     * @param pulsarService the broker service to obtain the authentication service and admin client from
     * @param set           the SASL mechanisms that clients are allowed to request
     * @throws PulsarServerException if the admin client cannot be obtained
     */
    public SaslAuthenticator(PulsarService pulsarService, Set<String> set) throws PulsarServerException {
        this.authenticationService = pulsarService.getBrokerService().getAuthenticationService();
        this.admin = pulsarService.getAdminClient();
        this.allowedMechanisms = set;
    }

    /**
     * Processes one request of the SASL exchange according to the current state.
     *
     * <p>When the state is {@code COMPLETE} the request is ignored and {@code completableFuture} is
     * left untouched.
     *
     * @param requestHeader     header of the incoming request
     * @param abstractRequest   body of the incoming request
     * @param completableFuture future that receives the response to send back to the client
     * @throws AuthenticationException if the request is not acceptable in the current state
     */
    public void authenticate(RequestHeader requestHeader, AbstractRequest abstractRequest,
                             CompletableFuture<AbstractResponse> completableFuture)
            throws AuthenticationException {
        switch (this.state) {
            case HANDSHAKE_OR_VERSIONS_REQUEST:
            case HANDSHAKE_REQUEST:
                handleKafkaRequest(requestHeader, abstractRequest, completableFuture);
                return;
            case AUTHENTICATE:
                handleAuthenticate(requestHeader, abstractRequest, completableFuture);
                return;
            default:
                // COMPLETE: nothing left to do.
                return;
        }
    }

    /**
     * @return {@code true} once a SaslAuthenticate request has been accepted
     */
    public boolean complete() {
        return this.state == State.COMPLETE;
    }

    /** Returns the authenticator to its initial state and forgets any authenticated principal. */
    public void reset() {
        this.state = State.HANDSHAKE_OR_VERSIONS_REQUEST;
        this.authState = null;
        this.authDataSource = null;
        this.authRole = null;
    }

    private void setState(State state) {
        this.state = state;
        if (log.isDebugEnabled()) {
            log.debug("Set SaslAuthenticator's state to {}", state);
        }
    }

    /**
     * Handles a request received before the authenticate step: either ApiVersions or SaslHandshake.
     *
     * <p>SaslHandshake v0 is answered with an error response and rejected.
     *
     * @throws AuthenticationException if the request is of any other type, or is a SaslHandshake v0
     */
    private void handleKafkaRequest(RequestHeader requestHeader, AbstractRequest abstractRequest,
                                    CompletableFuture<AbstractResponse> completableFuture)
            throws AuthenticationException {
        ApiKeys apiKey = requestHeader.apiKey();
        if (apiKey == ApiKeys.API_VERSIONS) {
            handleApiVersionsRequest((ApiVersionsRequest) abstractRequest, completableFuture);
            return;
        }
        if (apiKey != ApiKeys.SASL_HANDSHAKE) {
            throw new AuthenticationException(
                    "Unexpected Kafka request of type " + apiKey + " during SASL handshake");
        }
        if (requestHeader.apiVersion() >= 1) {
            handleHandshakeRequest((SaslHandshakeRequest) abstractRequest, completableFuture);
            return;
        }
        AuthenticationException unsupportedVersion =
                new AuthenticationException("KoP doesn't support SaslHandshake v0");
        completableFuture.complete(abstractRequest.getErrorResponse(unsupportedVersion));
        throw unsupportedVersion;
    }

    /**
     * Handles a SaslAuthenticate request: parses the SASL bytes, resolves the authentication provider
     * named in them, authenticates the supplied data, and finally checks that the resulting role has
     * an entry in the permissions of the namespace given as the SASL username.
     *
     * <p>On success the authenticated state, data source and role are stored and the state becomes
     * {@code COMPLETE}. On a credential or permission failure the future is completed with
     * {@code SASL_AUTHENTICATION_FAILED}, the state stays {@code AUTHENTICATE} and the stored
     * principal fields are left untouched.
     *
     * @throws AuthenticationException if the request is not a SaslAuthenticate request, its version is
     *                                 unsupported, or no provider exists for the requested auth method
     */
    private void handleAuthenticate(RequestHeader requestHeader, AbstractRequest abstractRequest,
                                    CompletableFuture<AbstractResponse> completableFuture)
            throws AuthenticationException {
        ApiKeys apiKey = requestHeader.apiKey();
        short apiVersion = requestHeader.apiVersion();
        if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
            AuthenticationException unexpectedRequest = new AuthenticationException(
                    "Unexpected Kafka request of type " + apiKey + " during SASL authentication");
            completableFuture.complete(abstractRequest.getErrorResponse(unexpectedRequest));
            throw unexpectedRequest;
        }
        if (!apiKey.isVersionSupported(apiVersion)) {
            throw new AuthenticationException(
                    "Version " + apiVersion + " is not supported for apiKey " + apiKey);
        }
        try {
            SaslAuth saslAuth = SaslUtils.parseSaslAuthBytes(
                    Utils.toArray(((SaslAuthenticateRequest) abstractRequest).saslAuthBytes()));
            AuthenticationProvider provider =
                    this.authenticationService.getAuthenticationProvider(saslAuth.getAuthMethod());
            if (provider == null) {
                AuthenticationException noProvider = new AuthenticationException(
                        "No AuthenticationProvider found for method " + saslAuth.getAuthMethod());
                completableFuture.complete(abstractRequest.getErrorResponse(noProvider));
                throw noProvider;
            }
            try {
                // Work on locals so that a rejected attempt never leaves a principal visible
                // through the getters; the fields are written only once the attempt is accepted.
                AuthenticationState candidateState = provider.newAuthState(
                        AuthData.of(saslAuth.getAuthData().getBytes(StandardCharsets.UTF_8)),
                        (SocketAddress) null, (SSLSession) null);
                AuthenticationDataSource candidateDataSource = candidateState.getAuthDataSource();
                String role = candidateState.getAuthRole();
                // The SASL username is used as the namespace whose permissions are consulted.
                String namespace = saslAuth.getUsername();
                try {
                    if (this.admin.namespaces().getPermissions(namespace).containsKey(role)) {
                        this.authState = candidateState;
                        this.authDataSource = candidateDataSource;
                        this.authRole = role;
                        completableFuture.complete(
                                new SaslAuthenticateResponse(Errors.NONE, OffsetMetadata.NO_METADATA));
                        setState(State.COMPLETE);
                    } else {
                        completableFuture.complete(new SaslAuthenticateResponse(
                                Errors.SASL_AUTHENTICATION_FAILED,
                                "Role: " + role + " is not allowed on namespace " + namespace));
                    }
                } catch (PulsarAdminException e) {
                    // Keep the cause in the log; the client only receives the message text.
                    log.warn("Failed to get permissions of {}", namespace, e);
                    completableFuture.complete(new SaslAuthenticateResponse(
                            Errors.SASL_AUTHENTICATION_FAILED,
                            "Failed to get permissions of " + namespace + ": " + e.getMessage()));
                }
            } catch (AuthenticationException e) {
                if (log.isDebugEnabled()) {
                    log.debug("SASL authentication failed", e);
                }
                completableFuture.complete(
                        new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
            }
        } catch (IOException e) {
            if (log.isDebugEnabled()) {
                log.debug("Failed to process SASL authenticate request", e);
            }
            completableFuture.complete(
                    new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
        }
    }

    /**
     * Answers an ApiVersions request. A supported request version advances the state to
     * {@code HANDSHAKE_REQUEST}; an unsupported one is answered with {@code UNSUPPORTED_VERSION}
     * and leaves the state unchanged.
     *
     * @throws AuthenticationException (as {@link IllegalStateException}) if an ApiVersions request was
     *                                 already accepted
     */
    private void handleApiVersionsRequest(ApiVersionsRequest apiVersionsRequest,
                                          CompletableFuture<AbstractResponse> completableFuture)
            throws AuthenticationException {
        if (this.state != State.HANDSHAKE_OR_VERSIONS_REQUEST) {
            throw new IllegalStateException(
                    "Receive ApiVersions request", this.state, State.HANDSHAKE_OR_VERSIONS_REQUEST);
        }
        if (apiVersionsRequest.hasUnsupportedRequestVersion()) {
            completableFuture.complete(
                    apiVersionsRequest.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception()));
            return;
        }
        completableFuture.complete(ApiVersionsResponse.defaultApiVersionsResponse());
        setState(State.HANDSHAKE_REQUEST);
    }

    /**
     * Answers a SaslHandshake request. The mechanism must be {@code PLAIN} and must also be part of
     * the allowed mechanisms; on success the state advances to {@code AUTHENTICATE}.
     *
     * @throws AuthenticationException if the mechanism is not {@code PLAIN}, or (as
     *                                 {@link UnsupportedSaslMechanismException}) if it is missing or
     *                                 not allowed
     */
    private void handleHandshakeRequest(SaslHandshakeRequest saslHandshakeRequest,
                                        CompletableFuture<AbstractResponse> completableFuture)
            throws AuthenticationException {
        String mechanism = saslHandshakeRequest.mechanism();
        if (mechanism != null && !PLAIN_MECHANISM.equals(mechanism)) {
            AuthenticationException notPlain =
                    new AuthenticationException("KoP only support PLAIN mechanism");
            completableFuture.complete(saslHandshakeRequest.getErrorResponse(notPlain));
            throw notPlain;
        }
        // A missing mechanism is rejected without querying the set: Set.contains(null) throws
        // NullPointerException on set implementations that do not permit null elements.
        if (mechanism == null || !this.allowedMechanisms.contains(mechanism)) {
            if (log.isDebugEnabled()) {
                log.debug("SASL mechanism '{}' requested by client is not supported", mechanism);
            }
            completableFuture.complete(
                    new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, this.allowedMechanisms));
            throw new UnsupportedSaslMechanismException(mechanism);
        }
        if (log.isDebugEnabled()) {
            log.debug("Using SASL mechanism '{}' provided by client", mechanism);
        }
        completableFuture.complete(new SaslHandshakeResponse(Errors.NONE, this.allowedMechanisms));
        setState(State.AUTHENTICATE);
    }

    /**
     * @return the authentication state of the accepted principal, or {@code null} if no
     *         authentication has succeeded since construction or the last {@link #reset()}
     */
    public AuthenticationState getAuthState() {
        return this.authState;
    }

    /**
     * @return the authentication data source of the accepted principal, or {@code null} if no
     *         authentication has succeeded since construction or the last {@link #reset()}
     */
    public AuthenticationDataSource getAuthDataSource() {
        return this.authDataSource;
    }

    /**
     * @return the role of the accepted principal, or {@code null} if no authentication has
     *         succeeded since construction or the last {@link #reset()}
     */
    public String getAuthRole() {
        return this.authRole;
    }
}
