package net.mguenther.kafka.junit;

import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/mguenther/kafka/junit/EmbeddedConnect.class */
/**
 * Runs a Kafka Connect worker in distributed mode inside the current JVM and deploys a
 * pre-configured list of connectors once the worker and its herder have been started.
 *
 * <p>The instance wires together a {@link Worker}, a {@link DistributedHerder} and the
 * Kafka-backed offset, status and config stores. {@link #stop()} only acts on its first
 * invocation; later invocations return without doing anything.
 */
public class EmbeddedConnect implements EmbeddedLifecycle {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedConnect.class);

    /** Maximum time, in milliseconds, to wait for a single connector deployment to complete. */
    private static final int REQUEST_TIMEOUT_MS = 120000;

    private static final String STOP_FAILURE_MESSAGE = "Unable to stop Embedded Kafka Connect.";

    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final List<Properties> connectorConfigs;
    private final DistributedConfig config;
    private final KafkaOffsetBackingStore offsetBackingStore;
    private final Worker worker;
    private final StatusBackingStore statusBackingStore;
    private final ConfigBackingStore configBackingStore;
    private final DistributedHerder herder;

    /**
     * Builds all Kafka Connect components without starting any of them.
     *
     * @param embeddedConnectConfig supplies the worker properties, the worker ID and the
     *                              connector configurations that {@link #start()} deploys
     * @param str                   value stored under the {@code bootstrap.servers} key of the
     *                              worker properties before the worker configuration is created
     */
    public EmbeddedConnect(EmbeddedConnectConfig embeddedConnectConfig, String str) {
        Properties connectProperties = embeddedConnectConfig.getConnectProperties();
        connectProperties.put("bootstrap.servers", str);
        this.connectorConfigs = embeddedConnectConfig.getConnectors();
        this.config = new DistributedConfig(Utils.propsToStringMap(connectProperties));
        this.offsetBackingStore = new KafkaOffsetBackingStore();
        this.worker = new Worker(embeddedConnectConfig.getWorkerId(), Time.SYSTEM, new Plugins(new HashMap<>()), this.config, this.offsetBackingStore);
        this.statusBackingStore = new KafkaStatusBackingStore(Time.SYSTEM, this.worker.getInternalValueConverter());
        this.configBackingStore = new KafkaConfigBackingStore(this.worker.getInternalValueConverter(), this.config);
        this.herder = new DistributedHerder(this.config, Time.SYSTEM, this.worker, this.statusBackingStore, this.configBackingStore, "");
    }

    /**
     * Configures the offset and status stores, starts the worker and then the herder, and
     * finally deploys every configured connector in list order.
     *
     * @throws RuntimeException if starting the worker or herder, or iterating the connector
     *                          configurations, fails; the original exception is the cause.
     *                          Exceptions raised while configuring the stores are not wrapped.
     */
    @Override // net.mguenther.kafka.junit.EmbeddedLifecycle
    public void start() {
        this.offsetBackingStore.configure(this.config);
        this.statusBackingStore.configure(this.config);
        try {
            log.info("Embedded Kafka Connect is starting.");
            this.worker.start();
            this.herder.start();
            log.info("Embedded Kafka Connect started.");
            log.info("Found {} connectors to deploy.", this.connectorConfigs.size());
            this.connectorConfigs.forEach(this::deployConnector);
        } catch (Exception e) {
            throw new RuntimeException("Unable to start Embedded Kafka Connect.", e);
        }
    }

    /**
     * Submits one connector configuration to the herder and waits up to
     * {@link #REQUEST_TIMEOUT_MS} milliseconds for the herder to invoke the callback.
     *
     * <p>Deployment is best-effort: a failure is logged and does not abort start-up.
     *
     * @param properties connector configuration; its {@code name} property is used as the
     *                   connector name
     */
    private void deployConnector(Properties properties) {
        // Raw type kept on purpose: the callback's result type is declared by the herder API
        // and is not imported in this file.
        FutureCallback futureCallback = new FutureCallback();
        String property = properties.getProperty("name");
        log.info("Deploying connector {}.", property);
        // NOTE(review): the boolean flag presumably allows replacing an existing connector
        // configuration of the same name -- confirm against the Herder API in use.
        this.herder.putConnectorConfig(property, Utils.propsToStringMap(properties), true, futureCallback);
        try {
            futureCallback.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so that callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.error("Failed to deploy connector {}.", property, e);
        } catch (ExecutionException | TimeoutException e) {
            log.error("Failed to deploy connector {}.", property, e);
        }
    }

    /**
     * Stops the herder and then the worker. Only the first invocation has any effect.
     *
     * <p>The worker is stopped even if stopping the herder fails, so that a herder failure
     * does not leave the worker running.
     *
     * @throws RuntimeException if stopping the herder or the worker fails; the first failure is
     *                          the cause and a subsequent worker failure is attached as a
     *                          suppressed exception
     */
    @Override // net.mguenther.kafka.junit.EmbeddedLifecycle
    public void stop() {
        if (this.shutdown.getAndSet(true)) {
            return;
        }
        log.info("Embedded Kafka Connect is stopping.");
        RuntimeException failure = null;
        try {
            this.herder.stop();
        } catch (Exception e) {
            failure = new RuntimeException(STOP_FAILURE_MESSAGE, e);
        }
        try {
            this.worker.stop();
        } catch (Exception e) {
            if (failure == null) {
                failure = new RuntimeException(STOP_FAILURE_MESSAGE, e);
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
        log.info("Embedded Kafka Connect stopped.");
    }
}
