package io.confluent.kafkarest.integration;

import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafkarest.CloudKafkaRestResourceExtension;
import io.confluent.kafkarest.entities.v3.GetBrokerResponse;
import io.confluent.kafkarest.entities.v3.GetTopicResponse;
import io.confluent.kafkarest.entities.v3.ListBrokersResponse;
import io.confluent.kafkarest.entities.v3.ListTopicsResponse;
import java.io.File;
import java.io.FileWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.core.Response;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.CertStores;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:io/confluent/kafkarest/integration/CloudApiKeyIntegrationTest.class */
public class CloudApiKeyIntegrationTest extends ClusterTestHarness {
    private static final String LKC = Utils.LC_META_ABC.logicalClusterId();
    private static final String API_KEY = "api_key";
    private static final String API_SECRET = "api_secret";

    @Rule
    public final TemporaryFolder tempFolder;
    private File apiKeys;
    private CertStores serverCertStores;
    private int brokerPort;

    public CloudApiKeyIntegrationTest() {
        super(1, false);
        this.tempFolder = new TemporaryFolder();
    }

    @Before
    public void setUp() throws Exception {
        Utils.createLogicalClusterFile(Utils.LC_META_ABC, Paths.get(this.tempFolder.getRoot().getCanonicalPath(), new String[0]));
        this.serverCertStores = new CertStores(true, "localhost");
        this.brokerPort = choosePorts(1)[0];
        this.apiKeys = this.tempFolder.newFile();
        FileWriter fileWriter = new FileWriter(this.apiKeys);
        fileWriter.write("{  \"keys\": {    \"api_key\": {      \"sasl_mechanism\": \"PLAIN\",      \"hashed_secret\": \"api_secret\",      \"hash_function\": \"none\",      \"user_id\": \"1\",      \"logical_cluster_id\": \"" + LKC + "\",      \"service_account\": \"false\"    }  }}");
        fileWriter.flush();
        fileWriter.close();
        super.setUp();
    }

    public Properties overrideBrokerProperties(int i, Properties properties) {
        properties.put("multitenant.metadata.dir", this.tempFolder.getRoot().getAbsolutePath());
        properties.put("multitenant.metadata.class", PhysicalClusterMetadata.class.getName());
        properties.put("listeners", String.format("PLAINTEXT://localhost:0, SASL_SSL://localhost:%d", Integer.valueOf(this.brokerPort)));
        properties.put("listener.name.sasl_ssl.sasl.enabled.mechanisms", "PLAIN");
        properties.put("listener.name.sasl_ssl.principal.builder.class", MultiTenantPrincipalBuilder.class.getName());
        properties.put("listener.name.sasl_ssl.plain.sasl.jaas.config", "io.confluent.kafka.server.plugins.auth.FileBasedLoginModule required config_path=\"" + this.apiKeys.getPath() + "\" refresh_ms=\"1000\";");
        Map map = (Map) this.serverCertStores.keyStoreProps().entrySet().stream().filter(entry -> {
            return entry.getValue() != null;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        Map map2 = (Map) this.serverCertStores.trustStoreProps().entrySet().stream().filter(entry2 -> {
            return entry2.getValue() != null;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        properties.put(KafkaConfig.FailedAuthenticationDelayMsProp(), 0);
        properties.putAll(map);
        properties.putAll(map2);
        return properties;
    }

    protected void overrideKafkaRestConfigs(Properties properties) {
        properties.put("kafka.rest.resource.extension.class", CloudKafkaRestResourceExtension.class.getName());
        properties.put("bootstrap.servers", String.format("SASL_SSL://127.0.0.1:%d", Integer.valueOf(this.brokerPort)));
        Map trustingConfig = this.serverCertStores.getTrustingConfig(this.serverCertStores);
        properties.put("client.ssl.truststore.location", trustingConfig.get("ssl.truststore.location"));
        properties.put("client.ssl.endpoint.identification.algorithm", "");
        properties.put("client.ssl.truststore.password", ((Password) trustingConfig.get("ssl.truststore.password")).value());
    }

    private Invocation.Builder authenticatedRequest(String str) {
        return authenticatedRequest(str, API_KEY, API_SECRET);
    }

    private Invocation.Builder authenticatedRequest(String str, String str2, String str3) {
        return request(str).header("Authorization", String.format("Basic %s", Base64.getEncoder().encodeToString((str2 + ":" + str3).getBytes(StandardCharsets.ISO_8859_1))));
    }

    @Test
    public void testNoCredentialsUnauthorized() {
        Assert.assertEquals(401L, request(String.format("/v3/clusters/%s/brokers", LKC)).accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void testEmptyCredentialsUnauthorized() {
        Assert.assertEquals(401L, request(String.format("/v3/clusters/%s/brokers", LKC)).accept(new String[]{"application/json"}).header("Authorization", "Basic ").get().getStatus());
    }

    @Test
    public void testEmptyAuthorizationHeaderUnauthorized() {
        Assert.assertEquals(401L, request(String.format("/v3/clusters/%s/brokers", LKC)).accept(new String[]{"application/json"}).header("Authorization", "").get().getStatus());
    }

    @Test
    public void testListBrokersOk() {
        List list = (List) getBrokers().stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toList());
        Response response = authenticatedRequest(String.format("/v3/clusters/%s/brokers", LKC)).accept(new String[]{"application/json"}).get();
        Assert.assertEquals(200L, response.getStatus());
        Assert.assertEquals(list, (List) ((ListBrokersResponse) response.readEntity(ListBrokersResponse.class)).getValue().getData().stream().map((v0) -> {
            return v0.getBrokerId();
        }).collect(Collectors.toList()));
    }

    @Test
    public void testListBrokersUnauthorizedUser() {
        Assert.assertEquals(401L, authenticatedRequest(String.format("/v3/clusters/%s/brokers", LKC), "wrong_api", "wrong_secret").accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void testWrongApiKey() {
        Assert.assertEquals(401L, authenticatedRequest(String.format("/v3/clusters/%s/brokers", LKC), "wrong_key", "wrong_secret").accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void testListBrokersUnauthorizedLKC() {
        Assert.assertEquals(200L, authenticatedRequest(String.format("/v3/clusters/%s/brokers", "LKC-nope")).accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void testGetBrokerOk() {
        Integer valueOf = Integer.valueOf(((Node) getBrokers().get(0)).id());
        Response response = authenticatedRequest(String.format("/v3/clusters/%s/brokers/%d", LKC, valueOf)).accept(new String[]{"application/json"}).get();
        Assert.assertEquals(200L, response.getStatus());
        Assert.assertEquals(valueOf, Integer.valueOf(((GetBrokerResponse) response.readEntity(GetBrokerResponse.class)).getValue().getBrokerId()));
    }

    @Test
    public void testGetBrokerWrongKey() {
        Assert.assertEquals(401L, authenticatedRequest(String.format("/v3/clusters/%s/brokers/%d", LKC, Integer.valueOf(((Node) getBrokers().get(0)).id())), "wrong_key", "wrong_secret").accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void testListTopicsOk() {
        createTopic("test_listTopics", 1, (short) 1);
        Response response = authenticatedRequest(String.format("/v3/clusters/%s/topics", LKC)).accept(new String[]{"application/json"}).get();
        Assert.assertEquals(200L, response.getStatus());
        Assert.assertTrue(((List) ((ListTopicsResponse) response.readEntity(ListTopicsResponse.class)).getValue().getData().stream().map((v0) -> {
            return v0.getTopicName();
        }).collect(Collectors.toList())).contains("test_listTopics"));
    }

    @Test
    public void testListTopicsWrongKey() {
        createTopic("test_listTopics", 1, (short) 1);
        Assert.assertEquals(401L, authenticatedRequest(String.format("/v3/clusters/%s/topics", LKC), "wrong_key", "wrong_secret").accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void testGetTopicOk() {
        createTopic("test_getTopic", 1, (short) 1);
        Response response = authenticatedRequest(String.format("/v3/clusters/%s/topics/%s", LKC, "test_getTopic")).accept(new String[]{"application/json"}).get();
        Assert.assertEquals(200L, response.getStatus());
        Assert.assertEquals("test_getTopic", ((GetTopicResponse) response.readEntity(GetTopicResponse.class)).getValue().getTopicName());
    }

    @Test
    public void testGetTopicUnauthorized() {
        createTopic("test_getTopic", 1, (short) 1);
        Assert.assertEquals(401L, authenticatedRequest(String.format("/v3/clusters/%s/topics/%s", LKC, "test_getTopic"), "wrong_api", "wrong_secret").accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void testKrest3592Mitigation_nonAuthenticatedEmptyProduceReturnsUnauthenticated() {
        createTopic("foobar", 1, (short) 1);
        Assert.assertEquals(401L, authenticatedRequest(String.format("/v3/clusters/%s/topics/foobar/records", LKC), "wrong_api", "wrong_secret").accept(new String[]{"application/json"}).post(Entity.entity("{", "application/json")).getStatus());
    }

    @Test
    public void testKrest3592Mitigation_authenticatedEmptyProduceReturnsOk() {
        createTopic("foobar", 1, (short) 1);
        Assert.assertEquals(200L, authenticatedRequest(String.format("/v3/clusters/%s/topics/foobar/records", LKC)).accept(new String[]{"application/json"}).post(Entity.entity("{", "application/json")).getStatus());
    }
}
