package net.mguenther.kafka.junit;

import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode$Enforced$;
import kafka.common.TopicAlreadyMarkedForDeletionException;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/mguenther/kafka/junit/EmbeddedKafkaCluster.class */
public class EmbeddedKafkaCluster implements EmbeddedLifecycle, RecordProducer, RecordConsumer, TopicManager, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private final EmbeddedKafkaClusterConfig config;
    private EmbeddedZooKeeper zooKeeper;
    private EmbeddedKafka broker;
    private EmbeddedConnect connect;

    @Override // net.mguenther.kafka.junit.EmbeddedLifecycle
    public void start() {
        try {
            this.zooKeeper = new EmbeddedZooKeeper(this.config.getZooKeeperConfig());
            this.zooKeeper.start();
            this.broker = new EmbeddedKafka(this.config.getKafkaConfig(), this.zooKeeper.getConnectString());
            this.broker.start();
            if (this.config.usesConnect()) {
                this.connect = new EmbeddedConnect(this.config.getConnectConfig(), getBrokerList());
                this.connect.start();
            }
        } catch (Exception e) {
            throw new RuntimeException("Unable to start the embedded Kafka cluster.", e);
        }
    }

    @Override // net.mguenther.kafka.junit.EmbeddedLifecycle
    public void stop() {
        if (this.connect != null) {
            this.connect.stop();
        }
        this.broker.stop();
        this.zooKeeper.stop();
    }

    public String getBrokerList() {
        return this.broker.getBrokerList();
    }

    public static EmbeddedKafkaCluster provisionWith(EmbeddedKafkaClusterConfig embeddedKafkaClusterConfig) {
        return new EmbeddedKafkaCluster(embeddedKafkaClusterConfig);
    }

    private Properties effectiveProducerProps(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put("bootstrap.servers", getBrokerList());
        return properties2;
    }

    private Properties effectiveConsumerProps(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put("bootstrap.servers", getBrokerList());
        return properties2;
    }

    @Override // net.mguenther.kafka.junit.RecordProducer
    public <K, V> List<RecordMetadata> send(SendKeyValues<K, V> sendKeyValues) throws ExecutionException, InterruptedException {
        KafkaProducer kafkaProducer = new KafkaProducer(effectiveProducerProps(sendKeyValues.getProducerProps()));
        ArrayList arrayList = new ArrayList(sendKeyValues.getRecords().size());
        try {
            for (KeyValue<K, V> keyValue : sendKeyValues.getRecords()) {
                arrayList.add(kafkaProducer.send(new ProducerRecord(sendKeyValues.getTopic(), (Integer) null, keyValue.getKey(), keyValue.getValue(), keyValue.getHeaders())).get());
            }
            return Collections.unmodifiableList(arrayList);
        } finally {
            kafkaProducer.flush();
            kafkaProducer.close();
        }
    }

    @Override // net.mguenther.kafka.junit.RecordProducer
    public <K, V> List<RecordMetadata> send(SendKeyValuesTransactional<K, V> sendKeyValuesTransactional) throws ExecutionException, InterruptedException {
        KafkaProducer kafkaProducer = new KafkaProducer(effectiveProducerProps(sendKeyValuesTransactional.getProducerProps()));
        ArrayList arrayList = new ArrayList();
        try {
            try {
                try {
                    kafkaProducer.initTransactions();
                    kafkaProducer.beginTransaction();
                    for (String str : sendKeyValuesTransactional.getRecordsPerTopic().keySet()) {
                        for (KeyValue<K, V> keyValue : sendKeyValuesTransactional.getRecordsPerTopic().get(str)) {
                            arrayList.add(kafkaProducer.send(new ProducerRecord(str, (Integer) null, keyValue.getKey(), keyValue.getValue(), keyValue.getHeaders())).get());
                        }
                    }
                    kafkaProducer.commitTransaction();
                    kafkaProducer.flush();
                    kafkaProducer.close();
                    return Collections.unmodifiableList(arrayList);
                } catch (OutOfOrderSequenceException e) {
                    kafkaProducer.abortTransaction();
                    throw new RuntimeException("This producer has received out-of-band sequence numbers. This is a fatal condition and thus, the producer is no longer able to log transactionally and reliably. Hence, the ongoing transaction is aborted and the producer closed.", e);
                }
            } catch (ProducerFencedException e2) {
                kafkaProducer.abortTransaction();
                throw new RuntimeException(String.format("There happens to be another producer that shares the transactional ID '%s'with this producer, but that has a newer epoch assigned to it. This producer has been fenced off, it can no longer write to the log transactionally. Hence, the ongoing transaction is aborted and the producer closed.", sendKeyValuesTransactional.getProducerProps().get("transactional.id")), e2);
            }
        } catch (Throwable th) {
            kafkaProducer.flush();
            kafkaProducer.close();
            throw th;
        }
    }

    @Override // net.mguenther.kafka.junit.RecordProducer
    public <V> List<RecordMetadata> send(SendValues<V> sendValues) throws ExecutionException, InterruptedException {
        return send(SendKeyValues.to(sendValues.getTopic(), (Collection) sendValues.getValues().stream().map(obj -> {
            return new KeyValue((String) null, obj);
        }).collect(Collectors.toList())).withAll(sendValues.getProducerProps()).build());
    }

    @Override // net.mguenther.kafka.junit.RecordProducer
    public <V> List<RecordMetadata> send(SendValuesTransactional<V> sendValuesTransactional) throws ExecutionException, InterruptedException {
        HashMap hashMap = new HashMap();
        for (String str : sendValuesTransactional.getValuesPerTopic().keySet()) {
            hashMap.put(str, (Collection) sendValuesTransactional.getValuesPerTopic().get(str).stream().map(obj -> {
                return new KeyValue((String) null, obj);
            }).collect(Collectors.toList()));
        }
        return send(SendKeyValuesTransactional.inTransaction(hashMap).withAll(sendValuesTransactional.getProducerProps()).build());
    }

    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <V> List<V> readValues(ReadKeyValues<String, V> readKeyValues) {
        return Collections.unmodifiableList((List) read(readKeyValues).stream().map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList()));
    }

    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <K, V> List<KeyValue<K, V>> read(ReadKeyValues<K, V> readKeyValues) {
        ArrayList arrayList = new ArrayList();
        KafkaConsumer kafkaConsumer = new KafkaConsumer(effectiveConsumerProps(readKeyValues.getConsumerProps()));
        int limit = readKeyValues.getLimit();
        Predicate<K> filterOnKeys = readKeyValues.getFilterOnKeys();
        Predicate<V> filterOnValues = readKeyValues.getFilterOnValues();
        kafkaConsumer.subscribe(Collections.singletonList(readKeyValues.getTopic()));
        for (int i = 0; i < readKeyValues.getMaxTotalPollTimeMillis() && continueConsuming(arrayList.size(), limit); i += 100) {
            Iterator it = kafkaConsumer.poll(100L).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                if (filterOnKeys.test(consumerRecord.key()) && filterOnValues.test(consumerRecord.value())) {
                    arrayList.add(new KeyValue(consumerRecord.key(), consumerRecord.value(), consumerRecord.headers()));
                }
            }
        }
        kafkaConsumer.commitSync();
        kafkaConsumer.close();
        return Collections.unmodifiableList(arrayList);
    }

    private static boolean continueConsuming(int i, int i2) {
        return i2 <= 0 || i < i2;
    }

    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <V> List<V> observeValues(ObserveKeyValues<String, V> observeKeyValues) throws InterruptedException {
        ArrayList arrayList = new ArrayList(observeKeyValues.getExpected());
        long nanoTime = System.nanoTime();
        while (true) {
            arrayList.addAll(readValues(ReadKeyValues.from(observeKeyValues.getTopic(), observeKeyValues.getClazzOfV()).withAll(observeKeyValues.getConsumerProps()).withLimit(observeKeyValues.getExpected()).filterOnKeys(observeKeyValues.getFilterOnKeys()).filterOnValues(observeKeyValues.getFilterOnValues()).build()));
            if (arrayList.size() >= observeKeyValues.getExpected()) {
                return Collections.unmodifiableList(arrayList);
            }
            if (System.nanoTime() > nanoTime + TimeUnit.MILLISECONDS.toNanos(observeKeyValues.getObservationTimeMillis())) {
                throw new AssertionError(String.format("Expected %s records, but consumed only %s records before ran into timeout (%s ms).", Integer.valueOf(observeKeyValues.getExpected()), Integer.valueOf(arrayList.size()), Integer.valueOf(observeKeyValues.getObservationTimeMillis())));
            }
            Thread.sleep(Math.min(observeKeyValues.getObservationTimeMillis(), 100));
        }
    }

    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <K, V> List<KeyValue<K, V>> observe(ObserveKeyValues<K, V> observeKeyValues) throws InterruptedException {
        ArrayList arrayList = new ArrayList(observeKeyValues.getExpected());
        long nanoTime = System.nanoTime();
        ReadKeyValues<K, V> build = ReadKeyValues.from(observeKeyValues.getTopic(), observeKeyValues.getClazzOfK(), observeKeyValues.getClazzOfV()).withAll(observeKeyValues.getConsumerProps()).withLimit(observeKeyValues.getExpected()).filterOnKeys(observeKeyValues.getFilterOnKeys()).filterOnValues(observeKeyValues.getFilterOnValues()).build();
        while (true) {
            arrayList.addAll(read(build));
            if (arrayList.size() >= observeKeyValues.getExpected()) {
                return Collections.unmodifiableList(arrayList);
            }
            if (System.nanoTime() > nanoTime + TimeUnit.MILLISECONDS.toNanos(observeKeyValues.getObservationTimeMillis())) {
                throw new AssertionError(String.format("Expected %s records, but consumed only %s records before ran into timeout (%s ms).", Integer.valueOf(observeKeyValues.getExpected()), Integer.valueOf(arrayList.size()), Integer.valueOf(observeKeyValues.getObservationTimeMillis())));
            }
            Thread.sleep(Math.min(observeKeyValues.getObservationTimeMillis(), 100));
        }
    }

    @Override // net.mguenther.kafka.junit.TopicManager
    public void createTopic(TopicConfig topicConfig) {
        ZkUtils zkUtils = null;
        try {
            try {
                try {
                    zkUtils = get();
                    AdminUtils.createTopic(zkUtils, topicConfig.getTopic(), topicConfig.getNumberOfPartitions(), topicConfig.getNumberOfReplicas(), topicConfig.getProperties(), RackAwareMode$Enforced$.MODULE$);
                    log.info("Created topic '{}' with settings {}.", topicConfig.getTopic(), topicConfig);
                    if (zkUtils != null) {
                        zkUtils.close();
                    }
                } catch (IllegalArgumentException | InvalidTopicException e) {
                    throw new RuntimeException("Invalid topic settings.", e);
                }
            } catch (TopicExistsException e2) {
                throw new RuntimeException(String.format("The topic '%s' already exists.", topicConfig.getTopic()), e2);
            } catch (Exception e3) {
                throw new RuntimeException(String.format("Unable to create topic '%s'.", topicConfig.getTopic()), e3);
            }
        } catch (Throwable th) {
            if (zkUtils != null) {
                zkUtils.close();
            }
            throw th;
        }
    }

    @Override // net.mguenther.kafka.junit.TopicManager
    public void deleteTopic(String str) {
        ZkUtils zkUtils = null;
        try {
            try {
                try {
                    zkUtils = get();
                    AdminUtils.deleteTopic(zkUtils, str);
                    log.info("Marked topic '{}' for deletion.", str);
                    if (zkUtils != null) {
                        zkUtils.close();
                    }
                } catch (TopicAlreadyMarkedForDeletionException e) {
                    throw new RuntimeException(String.format("The topic '%s' has already been marked for deletion.", str), e);
                }
            } catch (Exception e2) {
                throw new RuntimeException(String.format("Unable to delete topic '%s'.", str), e2);
            }
        } catch (Throwable th) {
            if (zkUtils != null) {
                zkUtils.close();
            }
            throw th;
        }
    }

    @Override // net.mguenther.kafka.junit.TopicManager
    public boolean exists(String str) {
        ZkUtils zkUtils = null;
        try {
            try {
                zkUtils = get();
                boolean z = AdminUtils.topicExists(zkUtils, str);
                if (zkUtils != null) {
                    zkUtils.close();
                }
                return z;
            } catch (Exception e) {
                throw new RuntimeException(String.format("Unable to query the state of topic '%s'.", str), e);
            }
        } catch (Throwable th) {
            if (zkUtils != null) {
                zkUtils.close();
            }
            throw th;
        }
    }

    private ZkUtils get() {
        return new ZkUtils(new ZkClient(this.zooKeeper.getConnectString(), 10000, 8000, ZKStringSerializer$.MODULE$), new ZkConnection(this.zooKeeper.getConnectString()), false);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        stop();
    }

    @ConstructorProperties({"config"})
    public EmbeddedKafkaCluster(EmbeddedKafkaClusterConfig embeddedKafkaClusterConfig) {
        this.config = embeddedKafkaClusterConfig;
    }
}
