package io.confluent.kafkarest.testing;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/* loaded from: input_file:io/confluent/kafkarest/testing/EmbeddedKafka.class */
public class EmbeddedKafka {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafka.class);
    private final File logDir;
    public final TemporaryFolder tmpFolder = new TemporaryFolder();
    private final KafkaServer kafka;

    public EmbeddedKafka(Properties properties, MockTime mockTime) throws IOException {
        this.tmpFolder.create();
        this.logDir = this.tmpFolder.newFolder();
        KafkaConfig kafkaConfig = new KafkaConfig(brokerConfigs(properties), true);
        log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", this.logDir, kafkaConfig.zkConnect());
        this.kafka = TestUtils.createServer(kafkaConfig, mockTime);
        log.debug("Startup of embedded Kafka broker completed: {}", this);
    }

    private Properties brokerConfigs(Properties properties) {
        Properties properties2 = new Properties();
        properties2.put(KafkaConfig$.MODULE$.BrokerIdProp(), -1);
        properties2.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1");
        properties2.put(KafkaConfig$.MODULE$.PortProp(), 0);
        properties2.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
        properties2.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
        properties2.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
        properties2.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
        properties2.putAll(properties);
        properties2.setProperty(KafkaConfig$.MODULE$.LogDirProp(), this.logDir.getAbsolutePath());
        return properties2;
    }

    public Integer brokerId() {
        return Integer.valueOf(this.kafka.config().brokerId());
    }

    public String clusterID() {
        return this.kafka.clusterId();
    }

    public String brokerConnect(String str) {
        return this.kafka.config().hostName() + ":" + this.kafka.boundPort(new ListenerName(str));
    }

    public String zkConnect() {
        return this.kafka.config().zkConnect();
    }

    public void shutdown() {
        log.debug("Shutting down embedded Kafka broker {} ...", this);
        this.kafka.shutdown();
        this.kafka.awaitShutdown();
        log.debug("Removing logs.dir at {} ...", this.logDir);
        CoreUtils.delete(JavaConverters.asScalaBuffer(Collections.singletonList(this.logDir.getAbsolutePath())));
        this.tmpFolder.delete();
        log.debug("Shutdown of embedded Kafka broker completed {}.", this);
    }

    public List<String> listeners() {
        return (List) JavaConverters.seqAsJavaList(this.kafka.config().listeners()).stream().map(endPoint -> {
            return endPoint.listenerName().value();
        }).collect(Collectors.toList());
    }

    public String toString() {
        return String.format("Kafka brokerId=%d, endpoints=%s, zkConnect=%s", Integer.valueOf(this.kafka.config().brokerId()), Utils.mkString((Map) listeners().stream().collect(Collectors.toMap(Function.identity(), this::brokerConnect)), "", "", ":", ","), zkConnect());
    }
}
