package net.mguenther.kafka.junit;

import java.beans.ConstructorProperties;
import java.util.List;
import net.mguenther.kafka.junit.provider.DefaultRecordConsumer;
import net.mguenther.kafka.junit.provider.DefaultRecordProducer;
import net.mguenther.kafka.junit.provider.DefaultTopicManager;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/mguenther/kafka/junit/EmbeddedKafkaCluster.class */
/**
 * Embedded Kafka cluster made up of an {@link EmbeddedZooKeeper}, an {@link EmbeddedKafka}
 * broker and, if the configuration asks for it, an {@link EmbeddedConnect} instance.
 *
 * <p>The cluster can be driven in three ways: as a JUnit rule ({@link #before()} /
 * {@link #after()}), explicitly through {@link #start()} / {@link #stop()}, or through
 * {@link #close()} (e.g. in a try-with-resources statement).
 *
 * <p>All {@link RecordProducer}, {@link RecordConsumer} and {@link TopicManager} operations
 * are forwarded to delegates that are only created by {@link #start()}; calling them on a
 * cluster that has not been started fails with a {@link NullPointerException}.
 */
public class EmbeddedKafkaCluster extends ExternalResource implements EmbeddedLifecycle, RecordProducer, RecordConsumer, TopicManager, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);

    /** Configuration that all embedded components are built from. */
    private final EmbeddedKafkaClusterConfig config;

    // The components and delegates below are assigned in start() and stay null until then.
    private EmbeddedZooKeeper zooKeeper;
    private EmbeddedKafka broker;
    /** Only assigned when {@code config.usesConnect()} returns {@code true}. */
    private EmbeddedConnect connect;
    private RecordProducer producerDelegate;
    private RecordConsumer consumerDelegate;
    private TopicManager topicManagerDelegate;

    /**
     * Creates a cluster for the given configuration. Nothing is started until
     * {@link #start()} is called.
     *
     * @param embeddedKafkaClusterConfig configuration of the cluster components
     */
    @ConstructorProperties({"config"})
    public EmbeddedKafkaCluster(EmbeddedKafkaClusterConfig embeddedKafkaClusterConfig) {
        this.config = embeddedKafkaClusterConfig;
    }

    /**
     * Static factory equivalent to calling the constructor.
     *
     * @param embeddedKafkaClusterConfig configuration of the cluster components
     * @return a new cluster that has not been started yet
     */
    public static EmbeddedKafkaCluster provisionWith(EmbeddedKafkaClusterConfig embeddedKafkaClusterConfig) {
        return new EmbeddedKafkaCluster(embeddedKafkaClusterConfig);
    }

    /** JUnit rule hook: starts the cluster. */
    @Override // org.junit.rules.ExternalResource
    protected void before() throws Throwable {
        start();
    }

    /** JUnit rule hook: stops the cluster. */
    @Override // org.junit.rules.ExternalResource
    protected void after() {
        stop();
    }

    /**
     * Starts ZooKeeper, then the broker, then (if configured) Kafka Connect, and finally
     * creates the producer, consumer and topic manager delegates.
     *
     * <p>If any step fails, the components that were already created are stopped again
     * before the failure is reported, so a failed start does not leave them running.
     *
     * @throws RuntimeException if any component fails to start; the original exception is
     *         the cause and any failure during cleanup is attached as a suppressed exception
     */
    @Override // net.mguenther.kafka.junit.EmbeddedLifecycle
    public void start() {
        try {
            this.zooKeeper = new EmbeddedZooKeeper(this.config.getZooKeeperConfig());
            this.zooKeeper.start();
            this.broker = new EmbeddedKafka(this.config.getKafkaConfig(), this.zooKeeper.getConnectString());
            this.broker.start();
            if (this.config.usesConnect()) {
                this.connect = new EmbeddedConnect(this.config.getConnectConfig(), getBrokerList());
                this.connect.start();
            }
            this.producerDelegate = new DefaultRecordProducer(getBrokerList());
            this.consumerDelegate = new DefaultRecordConsumer(getBrokerList());
            this.topicManagerDelegate = new DefaultTopicManager(this.zooKeeper.getConnectString());
        } catch (Exception e) {
            RuntimeException failure = new RuntimeException("Unable to start the embedded Kafka cluster.", e);
            // When used as a JUnit rule, after() is not invoked if before() throws, so the
            // partially started components have to be released here.
            try {
                stop();
            } catch (RuntimeException cleanupFailure) {
                // Never let a cleanup problem hide the reason the start failed.
                failure.addSuppressed(cleanupFailure);
            }
            throw failure;
        }
    }

    /**
     * Stops the components in reverse start order: Kafka Connect, broker, ZooKeeper.
     *
     * <p>Components that were never created are skipped, so this is safe to call on a
     * cluster that was not (or only partially) started. A failure while stopping one
     * component does not prevent the remaining components from being stopped.
     */
    @Override // net.mguenther.kafka.junit.EmbeddedLifecycle
    public void stop() {
        try {
            if (this.connect != null) {
                this.connect.stop();
            }
        } finally {
            try {
                if (this.broker != null) {
                    this.broker.stop();
                }
            } finally {
                if (this.zooKeeper != null) {
                    this.zooKeeper.stop();
                }
            }
        }
    }

    /**
     * Stops the cluster; equivalent to {@link #stop()}.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        stop();
    }

    /**
     * Returns the broker list as reported by the embedded broker.
     *
     * @return the value of {@code EmbeddedKafka#getBrokerList()}
     * @throws NullPointerException if the broker has not been created by {@link #start()} yet
     */
    public String getBrokerList() {
        return this.broker.getBrokerList();
    }

    /** Forwards to the producer delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.RecordProducer
    public <K, V> List<RecordMetadata> send(SendKeyValues<K, V> sendKeyValues) throws InterruptedException {
        return this.producerDelegate.send(sendKeyValues);
    }

    /** Forwards to the producer delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.RecordProducer
    public <K, V> List<RecordMetadata> send(SendKeyValuesTransactional<K, V> sendKeyValuesTransactional) throws InterruptedException {
        return this.producerDelegate.send(sendKeyValuesTransactional);
    }

    /** Forwards to the producer delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.RecordProducer
    public <V> List<RecordMetadata> send(SendValues<V> sendValues) throws InterruptedException {
        return this.producerDelegate.send(sendValues);
    }

    /** Forwards to the producer delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.RecordProducer
    public <V> List<RecordMetadata> send(SendValuesTransactional<V> sendValuesTransactional) throws InterruptedException {
        return this.producerDelegate.send(sendValuesTransactional);
    }

    /** Forwards to the consumer delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <V> List<V> readValues(ReadKeyValues<String, V> readKeyValues) {
        return this.consumerDelegate.readValues(readKeyValues);
    }

    /** Forwards to the consumer delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <K, V> List<KeyValue<K, V>> read(ReadKeyValues<K, V> readKeyValues) {
        return this.consumerDelegate.read(readKeyValues);
    }

    /** Forwards to the consumer delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <V> List<V> observeValues(ObserveKeyValues<String, V> observeKeyValues) throws InterruptedException {
        return this.consumerDelegate.observeValues(observeKeyValues);
    }

    /** Forwards to the consumer delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <K, V> List<KeyValue<K, V>> observe(ObserveKeyValues<K, V> observeKeyValues) throws InterruptedException {
        return this.consumerDelegate.observe(observeKeyValues);
    }

    /** Forwards to the topic manager delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.TopicManager
    public void createTopic(TopicConfig topicConfig) {
        this.topicManagerDelegate.createTopic(topicConfig);
    }

    /** Forwards to the topic manager delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.TopicManager
    public void deleteTopic(String str) {
        this.topicManagerDelegate.deleteTopic(str);
    }

    /** Forwards to the topic manager delegate created in {@link #start()}. */
    @Override // net.mguenther.kafka.junit.TopicManager
    public boolean exists(String str) {
        return this.topicManagerDelegate.exists(str);
    }
}
