package de.id.quarkus.kafka.testing;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

/* loaded from: input_file:de/id/quarkus/kafka/testing/ConfluentStackTestClusterClient.class */
/**
 * Test helper that talks to a Kafka cluster and a Confluent Schema Registry.
 *
 * <p>Provides convenience methods to manage topics, register Avro schemas and to
 * produce/consume records whose values are (de)serialized with the Confluent
 * specific-Avro serdes. Every client created internally by a helper method is
 * closed before that method returns; clients handed out by the {@code create*}
 * factory methods are owned (and must be closed) by the caller.
 */
public class ConfluentStackTestClusterClient {
    private final String kafkaBootstrapServers;
    private final String schemaRegistryUrl;

    /**
     * Creates a client for the given cluster endpoints.
     *
     * @param kafkaBootstrapServers value used for the Kafka {@code bootstrap.servers} setting
     * @param schemaRegistryUrl     value used for the {@code schema.registry.url} setting
     */
    public ConfluentStackTestClusterClient(String kafkaBootstrapServers, String schemaRegistryUrl) {
        this.kafkaBootstrapServers = kafkaBootstrapServers;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    /**
     * Creates a new Kafka admin client connected to the configured bootstrap servers.
     *
     * @return a new {@link AdminClient}; the caller is responsible for closing it
     */
    public AdminClient createAdminClient() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaBootstrapServers);
        return AdminClient.create(properties);
    }

    /**
     * Deletes every topic reported by {@code listTopics()} and blocks until the
     * deletion has completed.
     *
     * @throws RuntimeException if listing or deleting the topics fails
     */
    public void deleteAllTopics() {
        try (AdminClient adminClient = createAdminClient()) {
            // Wait on the deletion result itself, not only on the topic listing,
            // so the topics are really gone when this method returns.
            adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            throw new RuntimeException("Error while deleting topics", e);
        }
    }

    /**
     * Creates the given topics, each with one partition and a replication factor of one,
     * and blocks until creation has completed.
     *
     * @param topicNames names of the topics to create
     * @throws RuntimeException if topic creation fails
     */
    public void createTopics(String... topicNames) {
        List<NewTopic> newTopics = Arrays.stream(topicNames)
                .map(topicName -> new NewTopic(topicName, 1, (short) 1))
                .collect(Collectors.toList());
        try (AdminClient adminClient = createAdminClient()) {
            adminClient.createTopics(newTopics).all().get();
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            throw new RuntimeException("Error while creating topics", e);
        }
    }

    /**
     * Registers the given Avro schema in the schema registry, using the schema's
     * full name as the subject.
     *
     * @param schema the Avro schema to register
     * @throws RuntimeException if registration fails
     */
    public void registerSchemaRegistryTypes(Schema schema) {
        try {
            new CachedSchemaRegistryClient(getSchemaRegistryUrl(), 1000).register(schema.getFullName(), schema);
        } catch (Exception e) {
            throw new RuntimeException("Error while registering schemas", e);
        }
    }

    /**
     * Creates a producer that serializes keys with the given serializer class and
     * values with {@link SpecificAvroSerializer}.
     *
     * @param keySerializerClass serializer class used for record keys
     * @param <K>                key type
     * @param <V>                value type
     * @return a new {@link KafkaProducer}; the caller is responsible for closing it
     */
    public <K, V> KafkaProducer<K, V> createProducerWithAvroValue(Class<? extends Serializer<K>> keySerializerClass) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaBootstrapServers);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("key.serializer", keySerializerClass.getName());
        properties.put("value.serializer", SpecificAvroSerializer.class.getName());
        properties.put("schema.registry.url", this.schemaRegistryUrl);
        return new KafkaProducer<>(properties);
    }

    /**
     * Creates a consumer that deserializes keys with the given deserializer class and
     * values with {@link SpecificAvroDeserializer}.
     *
     * <p>The consumer group id is {@code groupIdPrefix} followed by a random UUID, and
     * {@code auto.offset.reset} is set to {@code earliest}.
     *
     * @param keyDeserializerClass deserializer class used for record keys
     * @param groupIdPrefix        prefix of the generated consumer group id
     * @param <K>                  key type
     * @param <V>                  value type
     * @return a new {@link KafkaConsumer}; the caller is responsible for closing it
     */
    public <K, V> KafkaConsumer<K, V> createConsumerWithAvroValue(Class<? extends Deserializer<K>> keyDeserializerClass, String groupIdPrefix) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaBootstrapServers);
        properties.put("group.id", groupIdPrefix + UUID.randomUUID());
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("allow.auto.create.topics", "false");
        properties.put("schema.registry.url", this.schemaRegistryUrl);
        properties.put("key.deserializer", keyDeserializerClass);
        properties.put("value.deserializer", SpecificAvroDeserializer.class);
        return new KafkaConsumer<>(properties);
    }

    /**
     * Sends all given values to a topic and flushes the producer before returning.
     *
     * @param topic              topic to send to
     * @param values             values to send, in order
     * @param keySerializerClass serializer class used for record keys
     * @param keyFunction        computes a record key from the value's list index and the value
     * @param <K>                key type
     * @param <V>                value type
     */
    public <K, V> void produce(String topic, List<V> values, Class<? extends Serializer<K>> keySerializerClass, BiFunction<Integer, V, K> keyFunction) {
        try (KafkaProducer<K, V> producer = createProducerWithAvroValue(keySerializerClass)) {
            int size = values.size();
            for (int index = 0; index < size; index++) {
                V value = values.get(index);
                producer.send(new ProducerRecord<>(topic, keyFunction.apply(index, value), value));
            }
            producer.flush();
        }
    }

    /**
     * Polls a topic until at least {@code expectedCount} values were received or
     * {@code timeoutMillis} milliseconds have elapsed, whichever happens first.
     *
     * <p>The timeout is checked after each poll (each poll waits up to one second), so
     * the call may run slightly longer than the timeout. The result may contain more
     * than {@code expectedCount} values if a single poll returns extra records.
     *
     * @param topic                topic to subscribe to
     * @param groupIdPrefix        prefix of the generated consumer group id
     * @param timeoutMillis        maximum time to keep polling, in milliseconds
     * @param expectedCount        number of values after which polling stops
     * @param keyDeserializerClass deserializer class used for record keys
     * @param <K>                  key type
     * @param <V>                  value type
     * @return the values received, in the order they were returned by the consumer
     */
    public <K, V> List<V> consume(String topic, String groupIdPrefix, int timeoutMillis, int expectedCount, Class<? extends Deserializer<K>> keyDeserializerClass) {
        Instant start = Instant.now();
        List<V> values = new ArrayList<>();
        try (KafkaConsumer<K, V> consumer = createConsumerWithAvroValue(keyDeserializerClass, groupIdPrefix)) {
            consumer.subscribe(Collections.singletonList(topic));
            do {
                consumer.poll(Duration.ofMillis(1000L))
                        .records(topic)
                        .forEach(record -> values.add(record.value()));
            } while (Instant.now().toEpochMilli() - start.toEpochMilli() < timeoutMillis
                    && values.size() < expectedCount);
        }
        return values;
    }

    /**
     * @return the Kafka bootstrap servers this client was created with
     */
    public String getKafkaBootstrapServers() {
        return this.kafkaBootstrapServers;
    }

    /**
     * @return the schema registry URL this client was created with
     */
    public String getSchemaRegistryUrl() {
        return this.schemaRegistryUrl;
    }

    /** Re-asserts the thread's interrupt flag when a blocking wait was interrupted. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
