package io.confluent.kafkarest.testing;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.cluster.EndPoint;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.QuorumTestHarness;
import kafka.utils.CoreUtils;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:io/confluent/kafkarest/testing/EmbeddedKafka.class */
public class EmbeddedKafka {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafka.class);
    private static final String KAFKA_HOST = "127.0.0.1";
    Path logDir;
    private final KafkaBroker kafka;

    public EmbeddedKafka(Properties properties, Path path, QuorumTestHarness quorumTestHarness) throws IOException {
        this.logDir = path;
        KafkaConfig kafkaConfig = new KafkaConfig(brokerConfigs(properties), true);
        log.debug("Starting embedded Kafka broker (with log.dirs={}) ...", path);
        this.kafka = quorumTestHarness.createBroker(kafkaConfig, Time.SYSTEM, true, Option.empty());
        log.debug("Startup of embedded Kafka broker completed: {}", this);
    }

    private Properties brokerConfigs(Properties properties) {
        Properties properties2 = new Properties();
        properties2.put(KafkaConfig.BrokerIdProp(), -1);
        properties2.put(KafkaConfig.NumPartitionsProp(), 1);
        properties2.put(KafkaConfig.AutoCreateTopicsEnableProp(), true);
        properties2.put(KafkaConfig.MessageMaxBytesProp(), 1000000);
        properties2.put(KafkaConfig.ControlledShutdownEnableProp(), true);
        properties2.put(KafkaConfig.ListenersProp(), String.format("%s://%s:%d", SecurityProtocol.PLAINTEXT, KAFKA_HOST, 0));
        properties2.putAll(properties);
        properties2.setProperty(KafkaConfig.LogDirProp(), this.logDir.toAbsolutePath().toString());
        return properties2;
    }

    public Integer brokerId() {
        return Integer.valueOf(this.kafka.config().brokerId());
    }

    public String clusterID() {
        return this.kafka.clusterId();
    }

    public String brokerConnect(String str) {
        Objects.requireNonNull(str);
        List<EndPoint> seqAsJavaList = JavaConverters.seqAsJavaList(this.kafka.advertisedListeners());
        for (EndPoint endPoint : seqAsJavaList) {
            if (str.equals(endPoint.listenerName().value())) {
                return String.format("%s://%s:%d", endPoint.securityProtocol(), endPoint.host(), Integer.valueOf(endPoint.port()));
            }
        }
        throw new IllegalArgumentException(String.format("Cannot find listener name %s in listeners %s", str, seqAsJavaList));
    }

    public void shutdown() {
        log.debug("Shutting down embedded Kafka broker {} ...", this);
        this.kafka.shutdown();
        this.kafka.awaitShutdown();
        log.debug("Removing logs.dir at {} ...", this.logDir);
        CoreUtils.delete(JavaConverters.asScalaBuffer(Collections.singletonList(this.logDir.toAbsolutePath().toString())));
        log.debug("Shutdown of embedded Kafka broker completed {}.", this);
    }

    public List<String> listeners() {
        return (List) JavaConverters.seqAsJavaList(this.kafka.config().listeners()).stream().map(endPoint -> {
            return endPoint.listenerName().value();
        }).collect(Collectors.toList());
    }

    public String toString() {
        return String.format("Kafka brokerId=%d, endpoints=%s", Integer.valueOf(this.kafka.config().brokerId()), Utils.mkString((Map) listeners().stream().collect(Collectors.toMap(Function.identity(), this::brokerConnect)), "", "", ":", ","));
    }
}
