package dev.vality.testcontainers.annotations.kafka;

import dev.vality.testcontainers.annotations.exception.KafkaStartingException;
import dev.vality.testcontainers.annotations.util.GenericContainerUtil;
import dev.vality.testcontainers.annotations.util.SpringApplicationPropertiesLoader;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.platform.commons.support.AnnotationSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.context.ContextConfigurationAttributes;
import org.springframework.test.context.ContextCustomizer;
import org.springframework.test.context.ContextCustomizerFactory;
import org.testcontainers.containers.KafkaContainer;

/* loaded from: input_file:dev/vality/testcontainers/annotations/kafka/KafkaTestcontainerExtension.class */
public class KafkaTestcontainerExtension implements BeforeAllCallback, AfterAllCallback {
    private static final Logger log = LoggerFactory.getLogger(KafkaTestcontainerExtension.class);
    private static final ThreadLocal<KafkaContainer> THREAD_CONTAINER = new ThreadLocal<>();

    /* loaded from: input_file:dev/vality/testcontainers/annotations/kafka/KafkaTestcontainerExtension$KafkaTestcontainerContextCustomizerFactory.class */
    public static class KafkaTestcontainerContextCustomizerFactory implements ContextCustomizerFactory {
        public ContextCustomizer createContextCustomizer(Class<?> cls, List<ContextConfigurationAttributes> list) {
            return (configurableApplicationContext, mergedContextConfiguration) -> {
                if (KafkaTestcontainerExtension.findPrototypeAnnotation((Class<?>) cls).isPresent()) {
                    init(configurableApplicationContext, KafkaTestcontainerExtension.findPrototypeAnnotation((Class<?>) cls).get().properties());
                } else if (KafkaTestcontainerExtension.findSingletonAnnotation((Class<?>) cls).isPresent()) {
                    init(configurableApplicationContext, KafkaTestcontainerExtension.findSingletonAnnotation((Class<?>) cls).get().properties());
                }
            };
        }

        private void init(ConfigurableApplicationContext configurableApplicationContext, String[] strArr) {
            KafkaContainer kafkaContainer = KafkaTestcontainerExtension.THREAD_CONTAINER.get();
            TestPropertyValues.of(new String[]{"kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers(), "spring.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers(), "kafka.ssl.enabled=false"}).and(strArr).applyTo(configurableApplicationContext);
        }
    }

    public void beforeAll(ExtensionContext extensionContext) {
        if (findPrototypeAnnotation(extensionContext).isPresent()) {
            KafkaContainer container = KafkaTestcontainerFactory.container();
            GenericContainerUtil.startContainer(container);
            createTopics(container, loadTopics(findPrototypeAnnotation(extensionContext).get().topicsKeys()));
            THREAD_CONTAINER.set(container);
            return;
        }
        if (findSingletonAnnotation(extensionContext).isPresent()) {
            KafkaContainer singletonContainer = KafkaTestcontainerFactory.singletonContainer();
            List<String> loadTopics = loadTopics(findSingletonAnnotation(extensionContext).get().topicsKeys());
            if (singletonContainer.isRunning()) {
                deleteTopics(singletonContainer, loadTopics);
            } else {
                GenericContainerUtil.startContainer(singletonContainer);
            }
            createTopics(singletonContainer, loadTopics);
            THREAD_CONTAINER.set(singletonContainer);
        }
    }

    public void afterAll(ExtensionContext extensionContext) {
        if (!findPrototypeAnnotation(extensionContext).isPresent()) {
            if (findSingletonAnnotation(extensionContext).isPresent()) {
                THREAD_CONTAINER.remove();
            }
        } else {
            KafkaContainer kafkaContainer = THREAD_CONTAINER.get();
            if (kafkaContainer != null && kafkaContainer.isRunning()) {
                kafkaContainer.stop();
            }
            THREAD_CONTAINER.remove();
        }
    }

    private static Optional<KafkaTestcontainer> findPrototypeAnnotation(ExtensionContext extensionContext) {
        return AnnotationSupport.findAnnotation(extensionContext.getElement(), KafkaTestcontainer.class);
    }

    private static Optional<KafkaTestcontainer> findPrototypeAnnotation(Class<?> cls) {
        return AnnotationSupport.findAnnotation(cls, KafkaTestcontainer.class);
    }

    private static Optional<KafkaTestcontainerSingleton> findSingletonAnnotation(ExtensionContext extensionContext) {
        return AnnotationSupport.findAnnotation(extensionContext.getElement(), KafkaTestcontainerSingleton.class);
    }

    private static Optional<KafkaTestcontainerSingleton> findSingletonAnnotation(Class<?> cls) {
        return AnnotationSupport.findAnnotation(cls, KafkaTestcontainerSingleton.class);
    }

    private List<String> loadTopics(String[] strArr) {
        return (List) SpringApplicationPropertiesLoader.loadFromSpringApplicationPropertiesFile(Arrays.asList(strArr)).values().stream().map(String::valueOf).collect(Collectors.toList());
    }

    private void createTopics(KafkaContainer kafkaContainer, List<String> list) {
        try {
            AdminClient createAdminClient = createAdminClient(kafkaContainer);
            try {
                createAdminClient.createTopics((List) list.stream().map(str -> {
                    return new NewTopic(str, 1, (short) 1);
                }).peek(newTopic -> {
                    log.info(newTopic.toString());
                }).collect(Collectors.toList())).all().get(30L, TimeUnit.SECONDS);
                Set set = (Set) createAdminClient.listTopics().names().get(30L, TimeUnit.SECONDS);
                log.info("Topics list from 'AdminClient' after [TOPICS CREATED]: " + set);
                Assertions.assertThat(set.size()).isEqualTo(list.size());
                Assertions.assertThat(execInContainerKafkaTopicsListCommand(kafkaContainer)).contains(list);
                if (createAdminClient != null) {
                    createAdminClient.close();
                }
            } catch (Throwable th) {
                if (createAdminClient != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KafkaStartingException("Error when topic creating, ", e);
        } catch (ExecutionException | TimeoutException e2) {
            throw new KafkaStartingException("Error when topic creating, ", e2);
        }
    }

    private void deleteTopics(KafkaContainer kafkaContainer, List<String> list) {
        try {
            AdminClient createAdminClient = createAdminClient(kafkaContainer);
            try {
                createAdminClient.deleteTopics(list).all().get(30L, TimeUnit.SECONDS);
                Set set = (Set) createAdminClient.listTopics().names().get(30L, TimeUnit.SECONDS);
                log.info("Topics list from 'AdminClient' after [TOPICS DELETED]: " + set + " (should be empty)");
                Assertions.assertThat(set).isEmpty();
                execInContainerKafkaTopicsListCommand(kafkaContainer);
                if (createAdminClient != null) {
                    createAdminClient.close();
                }
            } catch (Throwable th) {
                if (createAdminClient != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KafkaStartingException("Error when topic deleting, ", e);
        } catch (ExecutionException | TimeoutException e2) {
            throw new KafkaStartingException("Error when topic deleting, ", e2);
        }
    }

    private AdminClient createAdminClient(KafkaContainer kafkaContainer) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        return AdminClient.create(properties);
    }

    private String execInContainerKafkaTopicsListCommand(KafkaContainer kafkaContainer) {
        try {
            String stdout = kafkaContainer.execInContainer(new String[]{"/bin/sh", "-c", "/usr/bin/kafka-topics --bootstrap-server localhost:9092 --list"}).getStdout();
            log.info("Topics list from '/usr/bin/kafka-topics': [" + stdout.replace("\n", ",") + "]");
            return stdout;
        } catch (IOException e) {
            throw new KafkaStartingException("Error when " + "/usr/bin/kafka-topics --bootstrap-server localhost:9092 --list" + ", ", e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new KafkaStartingException("Error when " + "/usr/bin/kafka-topics --bootstrap-server localhost:9092 --list" + ", ", e2);
        }
    }
}
