package io.confluent.kafkarest.integration;

import com.fasterxml.jackson.databind.node.TextNode;
import io.confluent.kafka.multitenant.BasePhysicalClusterMetadata;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthBearerServerLoginCallbackHandler;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthBearerValidatorCallbackHandler;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafkarest.CloudKafkaRestResourceExtension;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.v3.GetBrokerResponse;
import io.confluent.kafkarest.entities.v3.GetTopicResponse;
import io.confluent.kafkarest.entities.v3.ListBrokersResponse;
import io.confluent.kafkarest.entities.v3.ListTopicsResponse;
import io.confluent.kafkarest.entities.v3.ProduceRequest;
import io.confluent.kafkarest.testing.JwtProvider;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.core.Response;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.network.CertStores;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.jdk.javaapi.CollectionConverters;

@Tag("IntegrationTest")
/* loaded from: input_file:io/confluent/kafkarest/integration/CloudOauthBearerIdpIntegrationTest.class */
public class CloudOauthBearerIdpIntegrationTest extends CloudClusterTestHarness {
    private static final String LKC_METADATA_TOPIC = "_confluent-logical_clusters";
    private static final int EXPIRATION = 100000;
    private static final long PRODUCER_CACHE_VALIDITY_MS = 3000;
    private static final int PRODUCER_CACHE_CAPACITY = 2;
    private JwtProvider jwtProvider;
    private CertStores serverCertStores;
    private int[] tokenPorts;
    private String authorizedID;
    private String unauthorizedID;
    private String brokerUuid;
    private static final String LKC = Utils.LC_META_ABC.logicalClusterId();
    private static final String ORG_ID = Utils.LC_META_ABC.organizationId();
    private static final String key = "foo";
    private static final String value = "bar";
    private static final ProduceRequest produceRequest = ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(TextNode.valueOf(key)).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(TextNode.valueOf(value)).build()).setOriginalSize(0).build();

    public CloudOauthBearerIdpIntegrationTest() {
        super(1, false);
    }

    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.brokerUuid = UUID.randomUUID().toString();
        this.jwtProvider = new JwtProvider();
        this.serverCertStores = new CertStores(true, "localhost");
        this.authorizedID = newIDToken(EXPIRATION, ORG_ID, LKC);
        this.unauthorizedID = newIDToken(EXPIRATION, "ORG-INCORRECT", "LKC-INCORRECT");
        this.tokenPorts = choosePorts(1);
        super.setUp(testInfo);
        loadLkcMetadata();
    }

    private void loadLkcMetadata() throws InterruptedException {
        String logicalClusterId = Utils.LC_META_ABC.logicalClusterId();
        RecordHeaders createGoodSequenceIdRecordHeaders = KafkaTestUtils.createGoodSequenceIdRecordHeaders(1L);
        TestUtils.produceMessages(CollectionConverters.asScala(this.servers).toSeq(), CollectionConverters.asScala(Collections.singletonList(new ProducerRecord(LKC_METADATA_TOPIC, 0, Utils.LC_META_ABC.logicalClusterId().getBytes(), Utils.protoFromMetadata(Utils.LC_META_ABC).toByteArray(), createGoodSequenceIdRecordHeaders))).toSeq(), -1);
        BasePhysicalClusterMetadata basePhysicalClusterMetadata = BasePhysicalClusterMetadata.getInstance(this.brokerUuid);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return basePhysicalClusterMetadata.metadata(logicalClusterId) != null;
        }, 30000L, "Expected metadata to get consumed");
    }

    private String newIDToken(int i, String str, String str2) throws Exception {
        return this.jwtProvider.jws(Integer.valueOf(i), "Confluent", "Confluent", str, str2).getJwsToken();
    }

    public Properties overrideBrokerProperties(int i, Properties properties) {
        properties.put("broker.session.uuid", this.brokerUuid);
        properties.put("confluent.cdc.lkc.metadata.topic", LKC_METADATA_TOPIC);
        properties.put("multitenant.metadata.class", TopicBasedPhysicalClusterMetadata.class.getName());
        properties.put("auto.create.topics.enable", true);
        properties.put("listeners", String.format("PLAINTEXT://localhost:0, SASL_SSL://localhost:%d", Integer.valueOf(this.tokenPorts[i])));
        properties.put("listener.name.sasl_ssl.sasl.enabled.mechanisms", "OAUTHBEARER");
        properties.put("listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class", OAuthBearerServerLoginCallbackHandler.class.getName());
        properties.put("listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class", OAuthBearerValidatorCallbackHandler.class.getName());
        properties.put("listener.name.sasl_ssl.oauthbearer.sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=\"" + this.jwtProvider.publicFile().toPath() + "\";");
        Map map = (Map) this.serverCertStores.keyStoreProps().entrySet().stream().filter(entry -> {
            return entry.getValue() != null;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        Map map2 = (Map) this.serverCertStores.trustStoreProps().entrySet().stream().filter(entry2 -> {
            return entry2.getValue() != null;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        properties.put(KafkaConfig.FailedAuthenticationDelayMsProp(), 0);
        properties.putAll(map);
        properties.putAll(map2);
        return properties;
    }

    protected void overrideKafkaRestConfigs(Properties properties) {
        properties.put("kafka.rest.resource.extension.class", CloudKafkaRestResourceExtension.class.getName());
        properties.put("bootstrap.servers", String.format("SASL_SSL://127.0.0.1:%d", Integer.valueOf(this.tokenPorts[0])));
        Map trustingConfig = this.serverCertStores.getTrustingConfig(this.serverCertStores);
        properties.put("client.ssl.truststore.location", trustingConfig.get("ssl.truststore.location"));
        properties.put("client.ssl.endpoint.identification.algorithm", "");
        properties.put("client.ssl.truststore.password", ((Password) trustingConfig.get("ssl.truststore.password")).value());
        properties.put("kafka.producer.cache.enable", true);
        properties.put("kafka.producer.cache.validity.ms", Long.valueOf(PRODUCER_CACHE_VALIDITY_MS));
        properties.put("kafka.producer.cache.capacity", Integer.valueOf(PRODUCER_CACHE_CAPACITY));
    }

    protected Invocation.Builder authenticatedRequest(String str, String str2) {
        return request(str).header("Authorization", String.format("Bearer %s", str2));
    }

    @Test
    public void testNoCredentialsUnauthorized() {
        Assertions.assertEquals(401, request(String.format("/v3/clusters/%s/brokers", LKC)).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").get().getStatus());
    }

    @Test
    public void testEmptyCredentialsUnauthorized() {
        Assertions.assertEquals(401, request(String.format("/v3/clusters/%s/brokers", LKC)).accept(new String[]{"application/json"}).header("Authorization", "Bearer ").header("Confluent-Identity-Pool-Id", "poolId").get().getStatus());
    }

    @Test
    public void testEmptyAuthorizationHeaderUnauthorized() {
        Assertions.assertEquals(401, request(String.format("/v3/clusters/%s/brokers", LKC)).accept(new String[]{"application/json"}).header("Authorization", "").header("Confluent-Identity-Pool-Id", "poolId").get().getStatus());
    }

    @Test
    public void testListBrokersOk() {
        List list = (List) getBrokers().stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toList());
        Response response = authenticatedRequest(String.format("/v3/clusters/%s/brokers", LKC), this.authorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").get();
        Assertions.assertEquals(200, response.getStatus());
        Assertions.assertEquals(list, (List) ((ListBrokersResponse) response.readEntity(ListBrokersResponse.class)).getValue().getData().stream().map((v0) -> {
            return v0.getBrokerId();
        }).collect(Collectors.toList()));
    }

    @Test
    public void testListBrokersUnauthorizedUser() {
        Assertions.assertEquals(401, authenticatedRequest(String.format("/v3/clusters/%s/brokers", LKC), this.unauthorizedID).header("Confluent-Identity-Pool-Id", "poolId").accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void testNewSubjectExpiredToken() throws Exception {
        Assertions.assertEquals(401, authenticatedRequest(String.format("/v3/clusters/%s/brokers", LKC), newIDToken(-100, ORG_ID, LKC)).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").get().getStatus());
    }

    @Test
    public void testListBrokersUnauthorizedLKC() {
        Assertions.assertEquals(401, authenticatedRequest(String.format("/v3/clusters/%s/brokers", "LKC-nope"), this.unauthorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").get().getStatus());
    }

    @Test
    public void testGetBrokerOk() {
        Integer valueOf = Integer.valueOf(((Node) getBrokers().get(0)).id());
        Response response = authenticatedRequest(String.format("/v3/clusters/%s/brokers/%d", LKC, valueOf), this.authorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").get();
        Assertions.assertEquals(200, response.getStatus());
        Assertions.assertEquals(valueOf, Integer.valueOf(((GetBrokerResponse) response.readEntity(GetBrokerResponse.class)).getValue().getBrokerId()));
    }

    @Test
    public void testGetBrokerUnauthorized() {
        Assertions.assertEquals(401, authenticatedRequest(String.format("/v3/clusters/%s/brokers/%d", LKC, Integer.valueOf(((Node) getBrokers().get(0)).id())), this.unauthorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").get().getStatus());
    }

    @Test
    public void testGetTopicOk() {
        createTopic("test_getTopic", 1, (short) 1);
        Response response = authenticatedRequest(String.format("/v3/clusters/%s/topics/%s", LKC, "test_getTopic"), this.authorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").get();
        Assertions.assertEquals(200, response.getStatus());
        Assertions.assertEquals("test_getTopic", ((GetTopicResponse) response.readEntity(GetTopicResponse.class)).getValue().getTopicName());
    }

    @Test
    public void testGetTopicUnauthorized() {
        createTopic("test_getTopic", 1, (short) 1);
        Assertions.assertEquals(401, authenticatedRequest(String.format("/v3/clusters/%s/topics/%s", LKC, "test_getTopic"), this.unauthorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").get().getStatus());
    }

    @Test
    public void testListTopicsOk() {
        createTopic("test_listTopics", 1, (short) 1);
        Response response = authenticatedRequest(String.format("/v3/clusters/%s/topics", LKC), this.authorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").get();
        Assertions.assertEquals(200, response.getStatus());
        Assertions.assertEquals(200, response.getStatus());
        Assertions.assertTrue(((List) ((ListTopicsResponse) response.readEntity(ListTopicsResponse.class)).getValue().getData().stream().map((v0) -> {
            return v0.getTopicName();
        }).collect(Collectors.toList())).contains("test_listTopics"));
    }

    @Test
    public void testListTopicsUnauthorized() {
        createTopic("test_listTopics", 1, (short) 1);
        Assertions.assertEquals(401, authenticatedRequest(String.format("/v3/clusters/%s/topics", LKC), this.unauthorizedID).header("Confluent-Identity-Pool-Id", "poolId").accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void testProduce_EmptyCredentialsUnauthorized() {
        createTopic("foobar", 1, (short) 1);
        Assertions.assertEquals(401, request(String.format("/v3/clusters/%s/topics/foobar/records", LKC)).accept(new String[]{"application/json"}).header("Authorization", "Bearer ").header("Confluent-Identity-Pool-Id", "poolId").post(Entity.entity(produceRequest, "application/json")).getStatus());
    }

    @Test
    public void testProduce_EmptyAuthorizationHeaderUnauthorized() {
        createTopic("foobar", 1, (short) 1);
        Assertions.assertEquals(401, request(String.format("/v3/clusters/%s/topics/foobar/records", LKC)).accept(new String[]{"application/json"}).header("Authorization", "").header("Confluent-Identity-Pool-Id", "poolId").post(Entity.entity(produceRequest, "application/json")).getStatus());
    }

    @Test
    public void testProduce_ProduceOk() {
        createTopic("foobar", 1, (short) 1);
        Assertions.assertEquals(200, authenticatedRequest(String.format("/v3/clusters/%s/topics/foobar/records", LKC), this.authorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").post(Entity.entity(produceRequest, "application/json")).getStatus());
    }

    @Test
    public void testProduce_UnauthorizedUser() {
        createTopic("foobar", 1, (short) 1);
        Assertions.assertEquals(401, authenticatedRequest(String.format("/v3/clusters/%s/topics/foobar/records", LKC), this.unauthorizedID).header("Confluent-Identity-Pool-Id", "poolId").accept(new String[]{"application/json"}).post(Entity.entity(produceRequest, "application/json")).getStatus());
    }

    @Test
    public void testSecondProduceUsesCache() throws Exception {
        createTopic("foobar", 1, (short) 1);
        long nanoTime = System.nanoTime();
        Response post = authenticatedRequest(String.format("/v3/clusters/%s/topics/foobar/records", LKC), this.authorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").post(Entity.entity(produceRequest, "application/json"));
        double nanoTime2 = (System.nanoTime() - nanoTime) / 1000000.0d;
        System.out.println("request 1 time ms: " + nanoTime2);
        Assertions.assertEquals(200, post.getStatus());
        long nanoTime3 = System.nanoTime();
        Response post2 = authenticatedRequest(String.format("/v3/clusters/%s/topics/foobar/records", LKC), this.authorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").post(Entity.entity(produceRequest, "application/json"));
        double nanoTime4 = (System.nanoTime() - nanoTime3) / 1000000.0d;
        System.out.println("request 2 time ms: " + nanoTime4);
        Assertions.assertEquals(200, post2.getStatus());
        Assertions.assertTrue(nanoTime4 < nanoTime2);
    }

    @Test
    public void testMoreProducersThanCacheCapacity() throws Exception {
        createTopic("foobar", 1, (short) 1);
        String newIDToken = newIDToken(EXPIRATION, ORG_ID, LKC);
        String newIDToken2 = newIDToken(EXPIRATION, ORG_ID, LKC);
        long nanoTime = System.nanoTime();
        Response post = authenticatedRequest(String.format("/v3/clusters/%s/topics/foobar/records", LKC), this.authorizedID).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").post(Entity.entity(produceRequest, "application/json"));
        System.out.println("request 1 time ms: " + ((System.nanoTime() - nanoTime) / 1000000.0d));
        Assertions.assertEquals(200, post.getStatus());
        long nanoTime2 = System.nanoTime();
        Response post2 = authenticatedRequest(String.format("/v3/clusters/%s/topics/foobar/records", LKC), newIDToken).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").post(Entity.entity(produceRequest, "application/json"));
        System.out.println("request 2 time ms: " + ((System.nanoTime() - nanoTime2) / 1000000.0d));
        Assertions.assertEquals(200, post2.getStatus());
        long nanoTime3 = System.nanoTime();
        Response post3 = authenticatedRequest(String.format("/v3/clusters/%s/topics/foobar/records", LKC), newIDToken2).accept(new String[]{"application/json"}).header("Confluent-Identity-Pool-Id", "poolId").post(Entity.entity(produceRequest, "application/json"));
        System.out.println("request 3 time ms: " + ((System.nanoTime() - nanoTime3) / 1000000.0d));
        Assertions.assertEquals(200, post3.getStatus());
    }
}
