package net.mguenther.kafka.junit.provider;

import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.ObserveKeyValues;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.RecordConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:net/mguenther/kafka/junit/provider/DefaultRecordConsumer.class */
public class DefaultRecordConsumer implements RecordConsumer {
    private final String bootstrapServers;

    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <V> List<V> readValues(ReadKeyValues<String, V> readKeyValues) {
        return Collections.unmodifiableList((List) read(readKeyValues).stream().map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList()));
    }

    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <K, V> List<KeyValue<K, V>> read(ReadKeyValues<K, V> readKeyValues) {
        ArrayList arrayList = new ArrayList();
        KafkaConsumer kafkaConsumer = new KafkaConsumer(effectiveConsumerProps(readKeyValues.getConsumerProps()));
        int limit = readKeyValues.getLimit();
        Predicate<K> filterOnKeys = readKeyValues.getFilterOnKeys();
        Predicate<V> filterOnValues = readKeyValues.getFilterOnValues();
        kafkaConsumer.subscribe(Collections.singletonList(readKeyValues.getTopic()));
        for (int i = 0; i < readKeyValues.getMaxTotalPollTimeMillis() && continueConsuming(arrayList.size(), limit); i += 100) {
            Iterator it = kafkaConsumer.poll(100L).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                if (filterOnKeys.test(consumerRecord.key()) && filterOnValues.test(consumerRecord.value())) {
                    arrayList.add(new KeyValue(consumerRecord.key(), consumerRecord.value(), consumerRecord.headers()));
                }
            }
        }
        kafkaConsumer.commitSync();
        kafkaConsumer.close();
        return Collections.unmodifiableList(arrayList);
    }

    private static boolean continueConsuming(int i, int i2) {
        return i2 <= 0 || i < i2;
    }

    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <V> List<V> observeValues(ObserveKeyValues<String, V> observeKeyValues) throws InterruptedException {
        ArrayList arrayList = new ArrayList(observeKeyValues.getExpected());
        long nanoTime = System.nanoTime();
        while (true) {
            arrayList.addAll(readValues(ReadKeyValues.from(observeKeyValues.getTopic(), observeKeyValues.getClazzOfV()).withAll(observeKeyValues.getConsumerProps()).withLimit(observeKeyValues.getExpected()).filterOnKeys(observeKeyValues.getFilterOnKeys()).filterOnValues(observeKeyValues.getFilterOnValues()).build()));
            if (arrayList.size() >= observeKeyValues.getExpected()) {
                return Collections.unmodifiableList(arrayList);
            }
            if (System.nanoTime() > nanoTime + TimeUnit.MILLISECONDS.toNanos(observeKeyValues.getObservationTimeMillis())) {
                throw new AssertionError(String.format("Expected %s records, but consumed only %s records before ran into timeout (%s ms).", Integer.valueOf(observeKeyValues.getExpected()), Integer.valueOf(arrayList.size()), Integer.valueOf(observeKeyValues.getObservationTimeMillis())));
            }
            Thread.sleep(Math.min(observeKeyValues.getObservationTimeMillis(), 100));
        }
    }

    @Override // net.mguenther.kafka.junit.RecordConsumer
    public <K, V> List<KeyValue<K, V>> observe(ObserveKeyValues<K, V> observeKeyValues) throws InterruptedException {
        ArrayList arrayList = new ArrayList(observeKeyValues.getExpected());
        long nanoTime = System.nanoTime();
        ReadKeyValues<K, V> build = ReadKeyValues.from(observeKeyValues.getTopic(), observeKeyValues.getClazzOfK(), observeKeyValues.getClazzOfV()).withAll(observeKeyValues.getConsumerProps()).withLimit(observeKeyValues.getExpected()).filterOnKeys(observeKeyValues.getFilterOnKeys()).filterOnValues(observeKeyValues.getFilterOnValues()).build();
        while (true) {
            arrayList.addAll(read(build));
            if (arrayList.size() >= observeKeyValues.getExpected()) {
                return Collections.unmodifiableList(arrayList);
            }
            if (System.nanoTime() > nanoTime + TimeUnit.MILLISECONDS.toNanos(observeKeyValues.getObservationTimeMillis())) {
                throw new AssertionError(String.format("Expected %s records, but consumed only %s records before ran into timeout (%s ms).", Integer.valueOf(observeKeyValues.getExpected()), Integer.valueOf(arrayList.size()), Integer.valueOf(observeKeyValues.getObservationTimeMillis())));
            }
            Thread.sleep(Math.min(observeKeyValues.getObservationTimeMillis(), 100));
        }
    }

    private Properties effectiveConsumerProps(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put("bootstrap.servers", this.bootstrapServers);
        return properties2;
    }

    @ConstructorProperties({"bootstrapServers"})
    public DefaultRecordConsumer(String str) {
        this.bootstrapServers = str;
    }
}
