package io.confluent.kafkarest.integration;

import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthBearerServerLoginCallbackHandler;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthBearerValidatorCallbackHandler;
import io.confluent.kafkarest.CloudKafkaRestResourceExtension;
import io.confluent.kafkarest.entities.v3.GetBrokerResponse;
import io.confluent.kafkarest.entities.v3.GetTopicResponse;
import io.confluent.kafkarest.entities.v3.ListBrokersResponse;
import io.confluent.kafkarest.entities.v3.ListTopicsResponse;
import io.confluent.kafkarest.testing.JwtProvider;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.core.Response;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.CertStores;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:io/confluent/kafkarest/integration/CloudOauthBearerIdpIntegrationTest.class */
/**
 * Integration tests that call the {@code /v3/clusters/{id}/brokers} and
 * {@code /v3/clusters/{id}/topics} REST endpoints with OAuth bearer ID tokens and a
 * {@code Confluent-Identity-Pool-Id} header.
 *
 * <p>Each test asserts either a 200 response (token minted for {@link #LKC}) or a 401 response
 * (missing, empty, expired, or mismatching credentials).
 *
 * <p>NOTE(review): {@link #setUp()}, {@link #overrideBrokerProperties(int, Properties)} and
 * {@link #overrideKafkaRestConfigs(Properties)} look like {@code ClusterTestHarness} hooks; the
 * superclass is not visible here, so {@code @Override} was not added — confirm and add it.
 */
public class CloudOauthBearerIdpIntegrationTest extends ClusterTestHarness {
    /** Logical cluster id that the authorized token is minted for. */
    private static final String LKC = Utils.LC_META_ABC.logicalClusterId();

    /** Expiration value passed to the JWT provider for tokens that should be accepted. */
    private static final int EXPIRATION = 100000;

    private static final String JSON = "application/json";
    private static final String AUTHORIZATION_HEADER = "Authorization";
    private static final String IDENTITY_POOL_HEADER = "Confluent-Identity-Pool-Id";
    private static final String IDENTITY_POOL_ID = "poolId";
    private static final String GET_TOPIC_NAME = "test_getTopic";
    private static final String LIST_TOPICS_NAME = "test_listTopics";

    @TempDir
    Path tempFolder;
    private JwtProvider jwtProvider;
    private CertStores serverCertStores;
    private int[] tokenPorts;
    private String authorizedID;
    private String unauthorizedID;

    /** Creates the harness with the arguments {@code (1, false)}. */
    public CloudOauthBearerIdpIntegrationTest() {
        super(1, false);
    }

    /**
     * Prepares the logical cluster file, JWT provider, server certificates, tokens and listener
     * port, then delegates to the harness set-up.
     *
     * <p>{@code super.setUp()} is called last on purpose: the configuration hooks in this class
     * read the fields initialized here (presumably the harness invokes them during set-up).
     *
     * @throws Exception if any of the fixtures cannot be created
     */
    @BeforeEach
    public void setUp() throws Exception {
        Utils.createLogicalClusterFile(Utils.LC_META_ABC, tempFolder);
        jwtProvider = new JwtProvider();
        serverCertStores = new CertStores(true, "localhost");
        authorizedID = newIDToken(LKC, EXPIRATION);
        unauthorizedID = newIDToken("LKC-INCORRECT", EXPIRATION);
        tokenPorts = choosePorts(1);
        super.setUp();
    }

    /**
     * Mints a JWS token through the JWT provider.
     *
     * @param clusterId value passed as the single element of the provider's array argument
     * @param expiration expiration value forwarded to the provider (negative in the expiry test)
     * @return the serialized JWS token
     * @throws Exception if the provider fails to create the token
     */
    private String newIDToken(String clusterId, int expiration) throws Exception {
        return jwtProvider
                .jws(Integer.valueOf(expiration), "Confluent", "Confluent", new String[] {clusterId})
                .getJwsToken();
    }

    /**
     * Adds multitenant metadata, a SASL_SSL/OAUTHBEARER listener and the server key/trust store
     * settings to the broker configuration.
     *
     * @param i index of the broker; selects the SASL_SSL port from the chosen ports
     * @param properties broker properties to extend in place
     * @return the same {@code properties} instance
     */
    public Properties overrideBrokerProperties(int i, Properties properties) {
        properties.put("multitenant.metadata.dir", tempFolder.toAbsolutePath().toString());
        properties.put("multitenant.metadata.class", PhysicalClusterMetadata.class.getName());
        properties.put(
                "listeners",
                String.format("PLAINTEXT://localhost:0, SASL_SSL://localhost:%d", tokenPorts[i]));
        properties.put("listener.name.sasl_ssl.sasl.enabled.mechanisms", "OAUTHBEARER");
        properties.put(
                "listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class",
                OAuthBearerServerLoginCallbackHandler.class.getName());
        properties.put(
                "listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class",
                OAuthBearerValidatorCallbackHandler.class.getName());
        properties.put(
                "listener.name.sasl_ssl.oauthbearer.sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=\""
                        + jwtProvider.publicFile().toPath()
                        + "\";");
        properties.put(KafkaConfig.FailedAuthenticationDelayMsProp(), 0);
        putNonNullEntries(properties, serverCertStores.keyStoreProps());
        putNonNullEntries(properties, serverCertStores.trustStoreProps());
        return properties;
    }

    /**
     * Copies every entry with a non-null value from {@code source} into {@code target}.
     *
     * <p>Null values are skipped because {@link Properties#put(Object, Object)} rejects them.
     */
    private static void putNonNullEntries(Properties target, Map<?, ?> source) {
        source.forEach((key, value) -> {
            if (value != null) {
                target.put(key, value);
            }
        });
    }

    /**
     * Points the REST server at the SASL_SSL listener of the first broker and configures its
     * client trust store from the server certificates.
     *
     * @param properties REST configuration to extend in place
     */
    protected void overrideKafkaRestConfigs(Properties properties) {
        properties.put(
                "kafka.rest.resource.extension.class",
                CloudKafkaRestResourceExtension.class.getName());
        properties.put(
                "bootstrap.servers", String.format("SASL_SSL://127.0.0.1:%d", tokenPorts[0]));
        Map<?, ?> trustingConfig = serverCertStores.getTrustingConfig(serverCertStores);
        properties.put(
                "client.ssl.truststore.location", trustingConfig.get("ssl.truststore.location"));
        properties.put("client.ssl.endpoint.identification.algorithm", "");
        properties.put(
                "client.ssl.truststore.password",
                ((Password) trustingConfig.get("ssl.truststore.password")).value());
    }

    /**
     * Builds a request for {@code path} carrying {@code token} as a bearer credential.
     *
     * @param path request path
     * @param token token placed after {@code "Bearer "} in the Authorization header
     * @return the request builder with the Authorization header set
     */
    protected Invocation.Builder authenticatedRequest(String path, String token) {
        return request(path).header(AUTHORIZATION_HEADER, String.format("Bearer %s", token));
    }

    /** Returns the brokers collection path for {@code clusterId}. */
    private static String brokersPath(String clusterId) {
        return String.format("/v3/clusters/%s/brokers", clusterId);
    }

    /** Returns the path of a single broker of {@link #LKC}. */
    private static String brokerPath(Integer brokerId) {
        return String.format("/v3/clusters/%s/brokers/%d", LKC, brokerId);
    }

    /** Returns the topics collection path of {@link #LKC}. */
    private static String topicsPath() {
        return String.format("/v3/clusters/%s/topics", LKC);
    }

    /** Returns the path of a single topic of {@link #LKC}. */
    private static String topicPath(String topicName) {
        return String.format("/v3/clusters/%s/topics/%s", LKC, topicName);
    }

    /** Issues a JSON GET with the identity-pool header on top of whatever {@code builder} holds. */
    private static Response send(Invocation.Builder builder) {
        return builder.accept(JSON).header(IDENTITY_POOL_HEADER, IDENTITY_POOL_ID).get();
    }

    /**
     * Issues the request and returns only its HTTP status.
     *
     * <p>The response is closed here because callers of this helper never read the entity.
     */
    private static int statusOf(Invocation.Builder builder) {
        Response response = send(builder);
        try {
            return response.getStatus();
        } finally {
            response.close();
        }
    }

    /** Returns the id of the first broker reported by the harness. */
    private Integer firstBrokerId() {
        return getBrokers().get(0).id();
    }

    /** A request without an Authorization header is rejected with 401. */
    @Test
    public void testNoCredentialsUnauthorized() {
        Assertions.assertEquals(401, statusOf(request(brokersPath(LKC))));
    }

    /** A bearer Authorization header with an empty token is rejected with 401. */
    @Test
    public void testEmptyCredentialsUnauthorized() {
        Assertions.assertEquals(
                401, statusOf(request(brokersPath(LKC)).header(AUTHORIZATION_HEADER, "Bearer ")));
    }

    /** An empty Authorization header is rejected with 401. */
    @Test
    public void testEmptyAuthorizationHeaderUnauthorized() {
        Assertions.assertEquals(
                401, statusOf(request(brokersPath(LKC)).header(AUTHORIZATION_HEADER, "")));
    }

    /** Listing brokers with the authorized token returns the ids reported by the harness. */
    @Test
    public void testListBrokersOk() {
        List<Integer> expectedIds =
                getBrokers().stream().map(Node::id).collect(Collectors.toList());

        Response response = send(authenticatedRequest(brokersPath(LKC), authorizedID));

        Assertions.assertEquals(200, response.getStatus());
        List<Integer> actualIds =
                response.readEntity(ListBrokersResponse.class).getValue().getData().stream()
                        .map(broker -> broker.getBrokerId())
                        .collect(Collectors.toList());
        Assertions.assertEquals(expectedIds, actualIds);
    }

    /** Listing brokers with a token minted for another cluster id is rejected with 401. */
    @Test
    public void testListBrokersUnauthorizedUser() {
        Assertions.assertEquals(
                401, statusOf(authenticatedRequest(brokersPath(LKC), unauthorizedID)));
    }

    /** A token minted with a random identifier and a negative expiration is rejected with 401. */
    @Test
    public void testNewSubjectExpiredToken() throws Exception {
        String expiredToken = newIDToken(UUID.randomUUID().toString(), -100);
        Assertions.assertEquals(
                401, statusOf(authenticatedRequest(brokersPath(LKC), expiredToken)));
    }

    /** Listing brokers of the cluster id {@code LKC-nope} is rejected with 401. */
    @Test
    public void testListBrokersUnauthorizedLKC() {
        Assertions.assertEquals(
                401, statusOf(authenticatedRequest(brokersPath("LKC-nope"), unauthorizedID)));
    }

    /** Fetching the first broker with the authorized token returns that broker. */
    @Test
    public void testGetBrokerOk() {
        Integer expectedId = firstBrokerId();

        Response response = send(authenticatedRequest(brokerPath(expectedId), authorizedID));

        Assertions.assertEquals(200, response.getStatus());
        Integer actualId = response.readEntity(GetBrokerResponse.class).getValue().getBrokerId();
        Assertions.assertEquals(expectedId, actualId);
    }

    /** Fetching a broker with a token minted for another cluster id is rejected with 401. */
    @Test
    public void testGetBrokerUnauthorized() {
        Assertions.assertEquals(
                401,
                statusOf(authenticatedRequest(brokerPath(firstBrokerId()), unauthorizedID)));
    }

    /** Fetching a freshly created topic with the authorized token returns that topic. */
    @Test
    public void testGetTopicOk() {
        createTopic(GET_TOPIC_NAME, 1, (short) 1);

        Response response = send(authenticatedRequest(topicPath(GET_TOPIC_NAME), authorizedID));

        Assertions.assertEquals(200, response.getStatus());
        Assertions.assertEquals(
                GET_TOPIC_NAME,
                response.readEntity(GetTopicResponse.class).getValue().getTopicName());
    }

    /** Fetching a topic with a token minted for another cluster id is rejected with 401. */
    @Test
    public void testGetTopicUnauthorized() {
        createTopic(GET_TOPIC_NAME, 1, (short) 1);
        Assertions.assertEquals(
                401,
                statusOf(authenticatedRequest(topicPath(GET_TOPIC_NAME), unauthorizedID)));
    }

    /** Listing topics with the authorized token includes a freshly created topic. */
    @Test
    public void testListTopicsOk() {
        createTopic(LIST_TOPICS_NAME, 1, (short) 1);

        Response response = send(authenticatedRequest(topicsPath(), authorizedID));

        Assertions.assertEquals(200, response.getStatus());
        List<String> topicNames =
                response.readEntity(ListTopicsResponse.class).getValue().getData().stream()
                        .map(topic -> topic.getTopicName())
                        .collect(Collectors.toList());
        Assertions.assertTrue(topicNames.contains(LIST_TOPICS_NAME));
    }

    /** Listing topics with a token minted for another cluster id is rejected with 401. */
    @Test
    public void testListTopicsUnauthorized() {
        createTopic(LIST_TOPICS_NAME, 1, (short) 1);
        Assertions.assertEquals(
                401, statusOf(authenticatedRequest(topicsPath(), unauthorizedID)));
    }
}
