package de.id.quarkus.kafka.testing;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:de/id/quarkus/kafka/testing/ConfluentStack.class */
/**
 * Test resource that runs a Kafka broker and a Confluent Schema Registry in containers on a
 * shared network, creates a source and a target topic, and returns the configuration
 * properties that point an application at them.
 *
 * <p>Recognised init arguments (all optional):
 * <ul>
 *   <li>{@code confluentVersion} - image tag for both containers; defaults to
 *       {@link #getConfluentVersionDefault()}</li>
 *   <li>{@code incoming} - name of the incoming channel to configure</li>
 *   <li>{@code outgoing} - name of the outgoing channel to configure</li>
 *   <li>{@code sourceTopic} - defaults to {@link #DEFAULT_SOURCE_TOPIC}</li>
 *   <li>{@code targetTopic} - defaults to {@link #DEFAULT_TARGET_TOPIC}</li>
 * </ul>
 */
public class ConfluentStack implements QuarkusTestResourceLifecycleManager {
    public static final String CONFLUENT_VERSION_ARG = "confluentVersion";
    public static final String DEFAULT_SOURCE_TOPIC = "reactivemessaging.source-topic";
    public static final String DEFAULT_TARGET_TOPIC = "reactivemessaging.target-topic";

    /** Connector name written into every configured channel. */
    private static final String SMALLRYE_KAFKA_CONNECTOR = "smallrye-kafka";

    /**
     * Port the schema registry uses to reach the broker via its network alias.
     * NOTE(review): presumably the broker's in-network listener port - confirm against KafkaContainer.
     */
    private static final int KAFKA_NETWORK_PORT = 9092;

    private DockerImageName kafkaImage;
    private DockerImageName registryImage;
    String kafkaNetworkAlias = "kafka";
    String incoming;
    String outgoing;
    String sourceTopic;
    String targetTopic;
    Network network;
    KafkaContainer kafka;
    ConfluentSchemaRegistryContainer schemaRegistry;
    ConfluentStackClient testClusterClient;

    /**
     * Reads the init arguments and builds (but does not start) the network and both containers.
     *
     * @param map init arguments; see the class documentation for the recognised keys
     */
    public void init(Map<String, String> map) {
        String confluentVersion = map.getOrDefault(CONFLUENT_VERSION_ARG, getConfluentVersionDefault());
        this.kafkaImage = DockerImageName.parse(String.format("confluentinc/cp-kafka:%s", confluentVersion));
        this.registryImage = DockerImageName.parse(String.format("confluentinc/cp-schema-registry:%s", confluentVersion));

        this.incoming = map.get("incoming");
        this.outgoing = map.get("outgoing");
        this.sourceTopic = map.getOrDefault("sourceTopic", DEFAULT_SOURCE_TOPIC);
        this.targetTopic = map.getOrDefault("targetTopic", DEFAULT_TARGET_TOPIC);

        this.network = Network.newNetwork();
        this.kafka = new KafkaContainer(this.kafkaImage)
                .withEnv("KAFKA_DELETE_TOPIC_ENABLE", "true")
                .withNetwork(this.network)
                .withNetworkAliases(this.kafkaNetworkAlias);

        // The registry talks to the broker inside the shared network, so it is given alias:port.
        String brokerInNetwork = String.format("%s:%d", this.kafkaNetworkAlias, KAFKA_NETWORK_PORT);
        this.schemaRegistry = (ConfluentSchemaRegistryContainer)
                new ConfluentSchemaRegistryContainer(this.registryImage, brokerInNetwork).withNetwork(this.network);
    }

    /**
     * Chooses the default image tag from the {@code os.arch} system property.
     *
     * @return {@code "7.2.1.arm64"} when {@code os.arch} contains {@code "aarch64"},
     *         otherwise {@code "7.2.1"}
     */
    String getConfluentVersionDefault() {
        String arch = System.getProperty("os.arch");
        if (arch != null && arch.contains("aarch64")) {
            return "7.2.1.arm64";
        }
        return "7.2.1";
    }

    /**
     * Starts both containers, creates the source and target topics and builds the
     * configuration overrides for the application under test.
     *
     * <p>If anything fails after the first container was started, everything is shut down
     * again via {@link #stop()} before the exception is rethrown, so a failed start does not
     * leave containers running.
     *
     * @return the configuration overrides, or an empty map when the stack is already started
     *         or {@link #init(Map)} has not been called
     */
    public Map<String, String> start() {
        if (this.testClusterClient != null || this.kafka == null) {
            return Collections.emptyMap();
        }
        try {
            this.kafka.start();
            this.schemaRegistry.start();

            String bootstrapServers = this.kafka.getBootstrapServers();
            String registryUrl = this.schemaRegistry.getUrl();

            ConfluentStackClient client =
                    new ConfluentStackClient(bootstrapServers, registryUrl, this.sourceTopic, this.targetTopic);
            client.createTopics(this.sourceTopic, this.targetTopic);
            // Publish the client only once the stack is fully usable; it doubles as the "started" flag.
            this.testClusterClient = client;

            Map<String, String> config = new HashMap<>();
            config.put("kafka.bootstrap.servers", bootstrapServers);
            if (this.incoming != null) {
                putChannelConfig(config, "incoming", this.incoming, this.sourceTopic, bootstrapServers, registryUrl);
            }
            if (this.outgoing != null) {
                putChannelConfig(config, "outgoing", this.outgoing, this.targetTopic, bootstrapServers, registryUrl);
            }
            config.put("mp.messaging.connector.smallrye-kafka.schema.registry.url", registryUrl);
            config.put("quarkus.kafka-streams.bootstrap-servers", bootstrapServers);
            config.put("quarkus.kafka-streams.schema-registry-url", registryUrl);
            return config;
        } catch (RuntimeException startFailure) {
            try {
                stop();
            } catch (RuntimeException cleanupFailure) {
                // Keep the original cause primary; the cleanup problem is attached for diagnosis.
                startFailure.addSuppressed(cleanupFailure);
            }
            throw startFailure;
        }
    }

    /**
     * Adds the {@code mp.messaging.<direction>.<channel>.*} properties for one channel.
     */
    private static void putChannelConfig(Map<String, String> config, String direction, String channel,
            String topic, String bootstrapServers, String registryUrl) {
        String prefix = String.format("mp.messaging.%s.%s.", direction, channel);
        config.put(prefix + "connector", SMALLRYE_KAFKA_CONNECTOR);
        config.put(prefix + "allow.auto.create.topics", "false");
        config.put(prefix + "topic", topic);
        config.put(prefix + "bootstrap.servers", bootstrapServers);
        config.put(prefix + "schema.registry.url", registryUrl);
    }

    /**
     * Injects the {@link ConfluentStackClient} into the given test instance.
     *
     * @param obj the test instance to inject into
     */
    public void inject(Object obj) {
        injectClientInTestInstance(obj);
    }

    /**
     * Sets every field of type {@link ConfluentStackClient} declared on the instance's class
     * or on any of its superclasses to the current client.
     *
     * @throws RuntimeException if a matching field cannot be written
     */
    private void injectClientInTestInstance(Object obj) {
        // Walk the whole superclass chain so fields declared on a shared base class are found too.
        for (Class<?> type = obj.getClass(); type != null; type = type.getSuperclass()) {
            Arrays.stream(type.getDeclaredFields())
                    .filter(field -> field.getType().equals(ConfluentStackClient.class))
                    .forEach(field -> {
                        field.setAccessible(true);
                        try {
                            field.set(obj, this.testClusterClient);
                        } catch (IllegalAccessException e) {
                            throw new RuntimeException(String.format("Error while injecting %s to instance %s", ConfluentStackClient.class.getName(), obj), e);
                        }
                    });
        }
    }

    /**
     * Shuts the stack down: schema registry first (it depends on the broker), then the broker,
     * then the shared network. Each step runs even if an earlier one throws. The client
     * reference is cleared so that a later {@link #start()} is not mistaken for "already started".
     */
    public void stop() {
        this.testClusterClient = null;
        try {
            if (this.schemaRegistry != null) {
                this.schemaRegistry.close();
            }
        } finally {
            try {
                if (this.kafka != null) {
                    this.kafka.close();
                }
            } finally {
                if (this.network != null) {
                    this.network.close();
                }
            }
        }
    }
}
