package io.confluent.http.server.integration;

import io.confluent.http.server.KafkaHttpServerConfig;
import io.confluent.http.server.KafkaHttpServerImpl;
import io.confluent.http.server.integration.fixtures.FooBarApplication;
import io.confluent.kafka.http.server.KafkaHttpServer;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:io/confluent/http/server/integration/KafkaHttpServerImplIntegrationTest.class */
public class KafkaHttpServerImplIntegrationTest {
    static final /* synthetic */ boolean $assertionsDisabled;

    @Test
    public void serverEnabled_httpServer_runsServerAndApplication_onHttpServerListener() throws Exception {
        String str = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster();
        embeddedKafkaCluster.startZooKeeper();
        Properties properties = new Properties();
        properties.put("confluent.http.server.listeners", str);
        embeddedKafkaCluster.startBrokers(1, properties);
        ((KafkaHttpServer) ((KafkaServer) embeddedKafkaCluster.brokers().get(0)).httpServer().get()).awaitStarted();
        Assert.assertEquals("foobar", (String) ClientBuilder.newClient().target(str).path("foobar").request().accept(new String[]{"text/plain"}).get().readEntity(String.class));
        embeddedKafkaCluster.shutdown();
    }

    @Test
    public void serverEnabled_getDynamicPort() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("listeners", "http://localhost:0");
        KafkaHttpServerImpl kafkaHttpServerImpl = new KafkaHttpServerImpl(Collections.singletonList(new FooBarApplication()), new KafkaHttpServerConfig(hashMap));
        kafkaHttpServerImpl.start();
        kafkaHttpServerImpl.awaitStarted();
        Assert.assertNotEquals(0L, kafkaHttpServerImpl.getPrimaryPort());
        kafkaHttpServerImpl.stop();
        kafkaHttpServerImpl.awaitStopped();
    }

    @Test
    public void serverEnabled_httpServer_shutdownServerAndApplication() throws Exception {
        String str = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster();
        embeddedKafkaCluster.startZooKeeper();
        Properties properties = new Properties();
        properties.put("confluent.http.server.listeners", str);
        embeddedKafkaCluster.startBrokers(1, properties);
        Assert.assertFalse(FooBarApplication.wasShutdown());
        embeddedKafkaCluster.shutdown();
        Assert.assertTrue(FooBarApplication.wasShutdown());
    }

    @Test
    public void serverEnabled_metadataServer_runsServerAndApplication_onMetadataServerListener() throws Exception {
        String str = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster();
        embeddedKafkaCluster.startZooKeeper();
        Properties properties = new Properties();
        properties.put("confluent.metadata.server.listeners", str);
        embeddedKafkaCluster.startBrokers(1, properties);
        ((KafkaHttpServerImpl) ((KafkaServer) embeddedKafkaCluster.brokers().get(0)).httpServer().get()).awaitStarted();
        Assert.assertEquals("foobar", (String) ClientBuilder.newClient().target(str).path("foobar").request().accept(new String[]{"text/plain"}).get().readEntity(String.class));
        embeddedKafkaCluster.shutdown();
    }

    @Test
    public void serverEnabled_httpServerAndMetadataServer_runsServerAndApplication_onMetadataServerListener() throws Exception {
        String str = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster();
        embeddedKafkaCluster.startZooKeeper();
        Properties properties = new Properties();
        properties.put("confluent.http.server.listeners", str);
        properties.put("confluent.metadata.server.listeners", str);
        embeddedKafkaCluster.startBrokers(1, properties);
        ((KafkaHttpServer) ((KafkaServer) embeddedKafkaCluster.brokers().get(0)).httpServer().get()).awaitStarted();
        Assert.assertEquals("foobar", (String) ClientBuilder.newClient().target(str).path("foobar").request().accept(new String[]{"text/plain"}).get().readEntity(String.class));
        embeddedKafkaCluster.shutdown();
    }

    @Test
    public void serverDisabled_doesNotRunServer() throws Exception {
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster();
        embeddedKafkaCluster.startZooKeeper();
        Properties properties = new Properties();
        properties.put("confluent.http.server.listeners", "");
        properties.put("confluent.metadata.server.listeners", "");
        embeddedKafkaCluster.startBrokers(1, properties);
        Assert.assertFalse(((KafkaServer) embeddedKafkaCluster.brokers().get(0)).httpServer().isDefined());
        embeddedKafkaCluster.shutdown();
    }

    @Test
    public void serverEnabled_applicationDisabled_doesNotRunServer() throws Exception {
        String str = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster();
        embeddedKafkaCluster.startZooKeeper();
        Properties properties = new Properties();
        properties.put("confluent.http.server.listeners", str);
        properties.put("foo.bar.enabled", "false");
        embeddedKafkaCluster.startBrokers(1, properties);
        Assert.assertFalse(((KafkaServer) embeddedKafkaCluster.brokers().get(0)).httpServer().isDefined());
        embeddedKafkaCluster.shutdown();
    }

    @Test
    public void serverEnabled_applicationException_stopBroker() throws IOException {
        String str = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster();
        embeddedKafkaCluster.startZooKeeper();
        Properties properties = new Properties();
        properties.put("confluent.http.server.listeners", str);
        properties.put("exceptional.enabled", "true");
        try {
            embeddedKafkaCluster.startBrokers(1, properties);
        } catch (Throwable th) {
            if (!$assertionsDisabled && !(th instanceof ConfigException)) {
                throw new AssertionError();
            }
            Assert.assertEquals("Failed on resource configuration", th.getMessage());
        }
        embeddedKafkaCluster.shutdown();
    }

    @Test
    public void serverEnabled_exposes_port() throws Exception {
        int findUnusedPort = findUnusedPort();
        String str = "http://localhost:" + findUnusedPort;
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster();
        embeddedKafkaCluster.startZooKeeper();
        Properties properties = new Properties();
        properties.put("confluent.http.server.listeners", str);
        properties.put("foo.bar.enabled", "true");
        embeddedKafkaCluster.startBrokers(1, properties);
        Assert.assertEquals(findUnusedPort, ((KafkaHttpServerImpl) ((KafkaServer) embeddedKafkaCluster.brokers().get(0)).httpServer().get()).getPrimaryPort());
        embeddedKafkaCluster.shutdown();
    }

    @Test
    public void serverEnabled_ReconfigurableApp() throws Exception {
        HashMap hashMap = new HashMap();
        String str = "http://localhost:" + findUnusedPort();
        EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster();
        embeddedKafkaCluster.startZooKeeper();
        Properties properties = new Properties();
        properties.put("confluent.http.server.listeners", str);
        properties.put("foo.bar.enabled", "true");
        properties.put("test.context", hashMap);
        embeddedKafkaCluster.startBrokers(1, properties);
        ((KafkaHttpServerImpl) ((KafkaServer) embeddedKafkaCluster.brokers().get(0)).httpServer().get()).awaitStarted();
        Properties properties2 = new Properties();
        properties2.put("bootstrap.servers", embeddedKafkaCluster.bootstrapServers());
        AdminClient create = AdminClient.create(properties2);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry("config-1", "value1"), AlterConfigOp.OpType.SET)));
        create.incrementalAlterConfigs(hashMap2, new AlterConfigsOptions()).all().get();
        WebTarget target = ClientBuilder.newClient().target(str);
        Assert.assertTrue(((Boolean) target.path("reconfigure/configs").request().get().readEntity(Boolean.TYPE)).booleanValue());
        Assert.assertTrue(((Boolean) target.path("reconfigure/validate").request().get().readEntity(Boolean.TYPE)).booleanValue());
        Assert.assertTrue(((Boolean) target.path("reconfigure/reconfigure").request().get().readEntity(Boolean.TYPE)).booleanValue());
        embeddedKafkaCluster.shutdown();
    }

    private static int findUnusedPort() throws IOException {
        ServerSocket serverSocket = new ServerSocket(0);
        Throwable th = null;
        try {
            int localPort = serverSocket.getLocalPort();
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    serverSocket.close();
                }
            }
            return localPort;
        } catch (Throwable th3) {
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    serverSocket.close();
                }
            }
            throw th3;
        }
    }

    static {
        $assertionsDisabled = !KafkaHttpServerImplIntegrationTest.class.desiredAssertionStatus();
    }
}
