package io.confluent.kafkarest.testing;

import io.confluent.kafkarest.KafkaRestApplication;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.io.TempDir;
import scala.Option;

@Tag("IntegrationTest")
/* loaded from: input_file:io/confluent/kafkarest/testing/ClusterTestHarness.class */
public abstract class ClusterTestHarness {
    private static EmbeddedZookeeper zookeeper;
    private static List<EmbeddedKafka> brokers = null;
    private static WebTarget kafkaRest;

    @BeforeAll
    public static void setUp(@TempDir Path path) throws Exception {
        zookeeper = new EmbeddedZookeeper();
        String format = String.format("127.0.0.1:%d", Integer.valueOf(zookeeper.port()));
        Client newClient = ClientBuilder.newClient();
        new KafkaRestApplication().configureBaseApplication(newClient);
        kafkaRest = newClient.target("http://localhost:" + findUnusedPort());
        brokers = new Vector();
        for (int i = 0; i < getBrokerCount(); i++) {
            brokers.add(new EmbeddedKafka(getBrokerProperties(format), path));
        }
    }

    public static int getBrokerCount() {
        return 1;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String clusterId() {
        return brokers().get(0).clusterID();
    }

    private static Properties getBrokerProperties(String str) {
        Option apply = Option.apply((Object) null);
        Properties createBrokerConfig = TestUtils.createBrokerConfig(-1, str, false, false, TestUtils.RandomPort(), Option.apply((Object) null), apply, Option.empty(), true, false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), Option.empty(), 1, false, 1, (short) 1, false);
        createBrokerConfig.setProperty("auto.create.topics.enable", "false");
        createBrokerConfig.setProperty("zookeeper.connect", str);
        createBrokerConfig.put("confluent.http.server.listeners", kafkaRest.getUri());
        return createBrokerConfig;
    }

    @AfterAll
    public static void tearDown() {
        Iterator<EmbeddedKafka> it = brokers.iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        zookeeper.shutdown();
    }

    protected abstract String constructPath(Object... objArr);

    /* JADX INFO: Access modifiers changed from: protected */
    public List<EmbeddedKafka> brokers() {
        return brokers;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Invocation.Builder restRequest(Object... objArr) {
        return kafkaRest.path(constructPath(objArr)).request().accept(new String[]{"application/json"});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void createTopic(String str) {
        createTopic(str, 1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void createTopic(String str, int i) {
        createTopic(str, i, (short) brokers.size());
    }

    protected void createTopic(String str, int i, short s) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers().get(0).brokerConnect("PLAINTEXT"));
        try {
            AdminClient.create(properties).createTopics(Collections.singletonList(new NewTopic(str, i, s))).all().get();
        } catch (InterruptedException | ExecutionException e) {
            Assertions.fail(String.format("Failed to create topic: %s", e.getMessage()));
        }
    }

    protected static int findUnusedPort() throws IOException {
        ServerSocket serverSocket = new ServerSocket(0);
        Throwable th = null;
        try {
            int localPort = serverSocket.getLocalPort();
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    serverSocket.close();
                }
            }
            return localPort;
        } catch (Throwable th3) {
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    serverSocket.close();
                }
            }
            throw th3;
        }
    }
}
