package net.mguenther.kafka.junit.provider;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.KeyValueMetadata;
import net.mguenther.kafka.junit.ObserveKeyValues;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.RecordConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/mguenther/kafka/junit/provider/DefaultRecordConsumer.class */
/**
 * {@link RecordConsumer} that creates a short-lived {@link KafkaConsumer} for every read
 * request, pointed at the bootstrap servers given at construction time.
 *
 * <p>The {@code read*} methods poll a topic until a limit or a total poll time budget is
 * reached. The {@code observe*} methods repeat such reads until an expected number of records
 * has been collected and fail with an {@link AssertionError} if the observation timeout elapses
 * first.
 */
public class DefaultRecordConsumer implements RecordConsumer {
    private static final Logger log = LoggerFactory.getLogger(DefaultRecordConsumer.class);

    /** Timeout handed to each individual {@code poll} call and the amount charged per poll. */
    private static final int POLL_INTERVAL_MILLIS = 100;

    /** Upper bound for the pause between two read attempts while observing a topic. */
    private static final int MAX_OBSERVE_BACKOFF_MILLIS = 100;

    private final String bootstrapServers;

    /**
     * @param str value used for {@code bootstrap.servers}; it overrides any such setting that
     *            is part of the consumer properties of an individual request
     */
    public DefaultRecordConsumer(String str) {
        this.bootstrapServers = str;
    }

    /**
     * Reads records as described by the request and returns only their values.
     *
     * @param readKeyValues describes topic, consumer configuration, filters and limits
     * @return unmodifiable list of the values of all records accepted by {@link #read}
     * @throws InterruptedException declared by the {@link RecordConsumer} contract
     */
    @Override
    public <V> List<V> readValues(ReadKeyValues<String, V> readKeyValues) throws InterruptedException {
        List<V> values = read(readKeyValues).stream()
                .map(KeyValue::getValue)
                .collect(Collectors.toList());
        return Collections.unmodifiableList(values);
    }

    /**
     * Polls the requested topic until either the total poll time budget of the request is used
     * up or the requested limit has been reached, keeping every record that passes the key,
     * value and header filters.
     *
     * <p>The limit is checked between polls only, so the result may contain more records than
     * the limit if a single poll returns a larger batch. A limit of zero or less means
     * "no limit". Offsets are committed synchronously once polling has finished; the consumer
     * is closed in every case, including when polling, filtering or committing fails.
     *
     * @param readKeyValues describes topic, consumer configuration, filters and limits
     * @return unmodifiable list of the accepted records
     * @throws InterruptedException declared by the {@link RecordConsumer} contract
     */
    @Override
    public <K, V> List<KeyValue<K, V>> read(ReadKeyValues<K, V> readKeyValues) throws InterruptedException {
        List<KeyValue<K, V>> consumedRecords = new ArrayList<>();
        int limit = readKeyValues.getLimit();
        Predicate<K> filterOnKeys = readKeyValues.getFilterOnKeys();
        Predicate<V> filterOnValues = readKeyValues.getFilterOnValues();
        Predicate<Headers> filterOnHeaders = readKeyValues.getFilterOnHeaders();

        // try-with-resources guarantees that the consumer (and its network resources) is
        // released even if poll, a filter predicate or the commit throws.
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(effectiveConsumerProps(readKeyValues.getConsumerProps()))) {
            consumer.subscribe(Collections.singletonList(readKeyValues.getTopic()));
            int totalPollTimeMillis = 0;
            boolean firstPollDone = false;
            while (totalPollTimeMillis < readKeyValues.getMaxTotalPollTimeMillis()
                    && continueConsuming(consumedRecords.size(), limit)) {
                ConsumerRecords<K, V> records = consumer.poll(POLL_INTERVAL_MILLIS);
                if (!firstPollDone) {
                    firstPollDone = true;
                    // Seeking happens after the first poll because a subscribed consumer only
                    // receives its partition assignment while polling. Whatever that first poll
                    // returned was fetched from the pre-seek position, so it is discarded and
                    // not charged against the poll time budget.
                    if (seekIfNecessary(readKeyValues, consumer)) {
                        continue;
                    }
                }
                for (ConsumerRecord<K, V> record : records) {
                    if (filterOnKeys.test(record.key())
                            && filterOnValues.test(record.value())
                            && filterOnHeaders.test(record.headers())) {
                        consumedRecords.add(toKeyValue(record, readKeyValues.isIncludeMetadata()));
                    }
                }
                totalPollTimeMillis += POLL_INTERVAL_MILLIS;
            }
            consumer.commitSync();
        }
        return Collections.unmodifiableList(consumedRecords);
    }

    /** Converts a consumed record into a {@link KeyValue}, attaching topic/partition/offset metadata on demand. */
    private static <K, V> KeyValue<K, V> toKeyValue(ConsumerRecord<K, V> record, boolean includeMetadata) {
        if (includeMetadata) {
            KeyValueMetadata metadata = new KeyValueMetadata(record.topic(), record.partition(), record.offset());
            return new KeyValue<>(record.key(), record.value(), record.headers(), metadata);
        }
        return new KeyValue<>(record.key(), record.value(), record.headers());
    }

    /**
     * Moves the consumer to the offsets configured in the request, one seek per configured
     * partition of the requested topic.
     *
     * @return {@code true} if the request carried at least one seek instruction
     */
    private <K, V> boolean seekIfNecessary(ReadKeyValues<K, V> readKeyValues, KafkaConsumer<K, V> kafkaConsumer) {
        boolean seekRequested = !readKeyValues.getSeekTo().isEmpty();
        if (seekRequested) {
            readKeyValues.getSeekTo().forEach((partition, offset) -> {
                TopicPartition topicPartition = new TopicPartition(readKeyValues.getTopic(), partition.intValue());
                log.info("Seeking to offset {} of topic-partition {}.", offset, topicPartition);
                kafkaConsumer.seek(topicPartition, offset.longValue());
            });
        }
        return seekRequested;
    }

    /**
     * @param i  number of records consumed so far
     * @param i2 requested limit; zero or a negative value disables the limit
     * @return {@code true} while more records may be consumed
     */
    private static boolean continueConsuming(int i, int i2) {
        return i2 <= 0 || i < i2;
    }

    /**
     * Repeatedly reads values from the observed topic until at least the expected number of
     * values has been collected.
     *
     * <p>Only the first read applies the seek instructions of the request; subsequent reads
     * are issued without them.
     *
     * @param observeKeyValues describes topic, expectation, filters and observation timeout
     * @return unmodifiable list of all values collected across the read attempts
     * @throws AssertionError       if the observation timeout elapsed before enough values were seen
     * @throws InterruptedException if the thread is interrupted while pausing between attempts
     */
    @Override
    public <V> List<V> observeValues(ObserveKeyValues<String, V> observeKeyValues) throws InterruptedException {
        List<V> observed = new ArrayList<>(observeKeyValues.getExpected());
        long startedAt = System.nanoTime();
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(observeKeyValues.getObservationTimeMillis());
        ReadKeyValues<String, V> nextRequest = withPartitionSeekForReadValues(observeKeyValues);
        ReadKeyValues<String, V> followUpRequest = withoutPartitionSeekForReadValues(observeKeyValues);
        while (true) {
            observed.addAll(readValues(nextRequest));
            nextRequest = followUpRequest;
            if (observed.size() >= observeKeyValues.getExpected()) {
                return Collections.unmodifiableList(observed);
            }
            // Elapsed time is computed by subtraction: System.nanoTime() values are only
            // meaningful as differences and may wrap around.
            if (System.nanoTime() - startedAt > timeoutNanos) {
                throw observationTimedOut(observeKeyValues.getExpected(), observed.size(), observeKeyValues.getObservationTimeMillis());
            }
            Thread.sleep(Math.min(observeKeyValues.getObservationTimeMillis(), MAX_OBSERVE_BACKOFF_MILLIS));
        }
    }

    /** Builds the values-only read request for the first attempt, including the seek instructions. */
    private <V> ReadKeyValues<String, V> withPartitionSeekForReadValues(ObserveKeyValues<String, V> observeKeyValues) {
        return toReadValuesRequest(observeKeyValues).seekTo(observeKeyValues.getSeekTo()).build();
    }

    /** Builds the values-only read request for follow-up attempts, without seek instructions. */
    private <V> ReadKeyValues<String, V> withoutPartitionSeekForReadValues(ObserveKeyValues<String, V> observeKeyValues) {
        return toReadValuesRequest(observeKeyValues).build();
    }

    /**
     * Translates an observe request into a builder for a values-only read request: same topic,
     * consumer properties, filters and {@code group.id}, the expected count as limit, and
     * metadata disabled.
     */
    private <V> ReadKeyValues.ReadKeyValuesBuilder<String, V> toReadValuesRequest(ObserveKeyValues<String, V> observeKeyValues) {
        return ReadKeyValues.from(observeKeyValues.getTopic(), observeKeyValues.getClazzOfV())
                .withAll(observeKeyValues.getConsumerProps())
                .withLimit(observeKeyValues.getExpected())
                .withMetadata(false)
                .filterOnKeys(observeKeyValues.getFilterOnKeys())
                .filterOnValues(observeKeyValues.getFilterOnValues())
                .filterOnHeaders(observeKeyValues.getFilterOnHeaders())
                .with("group.id", observeKeyValues.getConsumerProps().getProperty("group.id"));
    }

    /**
     * Repeatedly reads key-value pairs from the observed topic until at least the expected
     * number of records has been collected.
     *
     * <p>Only the first read applies the seek instructions of the request; subsequent reads
     * are issued without them.
     *
     * @param observeKeyValues describes topic, expectation, filters and observation timeout
     * @return unmodifiable list of all records collected across the read attempts
     * @throws AssertionError       if the observation timeout elapsed before enough records were seen
     * @throws InterruptedException if the thread is interrupted while pausing between attempts
     */
    @Override
    public <K, V> List<KeyValue<K, V>> observe(ObserveKeyValues<K, V> observeKeyValues) throws InterruptedException {
        List<KeyValue<K, V>> observed = new ArrayList<>(observeKeyValues.getExpected());
        long startedAt = System.nanoTime();
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(observeKeyValues.getObservationTimeMillis());
        ReadKeyValues<K, V> nextRequest = withPartitionSeek(observeKeyValues);
        ReadKeyValues<K, V> followUpRequest = withoutPartitionSeek(observeKeyValues);
        while (true) {
            observed.addAll(read(nextRequest));
            nextRequest = followUpRequest;
            if (observed.size() >= observeKeyValues.getExpected()) {
                return Collections.unmodifiableList(observed);
            }
            // Elapsed time is computed by subtraction: System.nanoTime() values are only
            // meaningful as differences and may wrap around.
            if (System.nanoTime() - startedAt > timeoutNanos) {
                throw observationTimedOut(observeKeyValues.getExpected(), observed.size(), observeKeyValues.getObservationTimeMillis());
            }
            Thread.sleep(Math.min(observeKeyValues.getObservationTimeMillis(), MAX_OBSERVE_BACKOFF_MILLIS));
        }
    }

    /** Builds the read request for the first attempt, including the seek instructions. */
    private <K, V> ReadKeyValues<K, V> withPartitionSeek(ObserveKeyValues<K, V> observeKeyValues) {
        return toReadKeyValuesRequest(observeKeyValues).seekTo(observeKeyValues.getSeekTo()).build();
    }

    /** Builds the read request for follow-up attempts, without seek instructions. */
    private <K, V> ReadKeyValues<K, V> withoutPartitionSeek(ObserveKeyValues<K, V> observeKeyValues) {
        return toReadKeyValuesRequest(observeKeyValues).build();
    }

    /**
     * Translates an observe request into a builder for a read request: same topic, consumer
     * properties, filters, metadata flag and {@code group.id}, with the expected count as limit.
     */
    private <K, V> ReadKeyValues.ReadKeyValuesBuilder<K, V> toReadKeyValuesRequest(ObserveKeyValues<K, V> observeKeyValues) {
        return ReadKeyValues.from(observeKeyValues.getTopic(), observeKeyValues.getClazzOfK(), observeKeyValues.getClazzOfV())
                .withAll(observeKeyValues.getConsumerProps())
                .withLimit(observeKeyValues.getExpected())
                .withMetadata(observeKeyValues.isIncludeMetadata())
                .filterOnKeys(observeKeyValues.getFilterOnKeys())
                .filterOnValues(observeKeyValues.getFilterOnValues())
                .filterOnHeaders(observeKeyValues.getFilterOnHeaders())
                .with("group.id", observeKeyValues.getConsumerProps().getProperty("group.id"));
    }

    /** Creates the error reported when an observation did not see enough records in time. */
    private static AssertionError observationTimedOut(int expected, int consumed, int observationTimeMillis) {
        return new AssertionError(String.format("Expected %s records, but consumed only %s records before ran into timeout (%s ms).", Integer.valueOf(expected), Integer.valueOf(consumed), Integer.valueOf(observationTimeMillis)));
    }

    /**
     * Copies the given consumer properties and forces {@code bootstrap.servers} to the servers
     * this instance was created with. The passed-in properties are left untouched.
     */
    private Properties effectiveConsumerProps(Properties properties) {
        Properties effectiveProps = new Properties();
        effectiveProps.putAll(properties);
        effectiveProps.put("bootstrap.servers", this.bootstrapServers);
        return effectiveProps;
    }
}
