package org.springframework.cloud.sleuth.instrument.kafka;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.BDDAssertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.BDDMockito;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.exporter.FinishedSpan;
import org.springframework.cloud.sleuth.propagation.Propagator;
import org.springframework.cloud.sleuth.test.TestSpanHandler;
import org.springframework.cloud.sleuth.test.TestTracingAwareSupplier;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/springframework/cloud/sleuth/instrument/kafka/KafkaConsumerTest.class */
public abstract class KafkaConsumerTest implements TestTracingAwareSupplier {
    protected String testTopic;
    protected TracingKafkaConsumer<String, String> kafkaConsumer;

    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    BeanFactory beanFactory;

    @Container
    protected static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.1.1")).withExposedPorts(new Integer[]{9093}).waitingFor(Wait.forListeningPort());
    protected Tracer tracer = tracerTest().tracing().tracer();
    protected Propagator propagator = tracerTest().tracing().propagator();
    protected TestSpanHandler spans = tracerTest().handler();
    private final AtomicBoolean consumerRun = new AtomicBoolean();
    protected final AtomicInteger receivedCounter = new AtomicInteger(0);

    @BeforeAll
    static void setupAll() {
        kafkaContainer.start();
    }

    @AfterAll
    static void destroyAll() {
        kafkaContainer.stop();
    }

    @BeforeEach
    void setup() {
        BDDMockito.given(this.beanFactory.getBean(Propagator.class)).willReturn(this.propagator);
        BDDMockito.given(this.beanFactory.getBeanProvider(ResolvableType.forClassWithGenerics(Propagator.Getter.class, new ResolvableType[]{ResolvableType.forType(new ParameterizedTypeReference<ConsumerRecord<?, ?>>() { // from class: org.springframework.cloud.sleuth.instrument.kafka.KafkaConsumerTest.1
        })})).getIfAvailable()).willReturn(new TracingKafkaPropagatorGetter());
        this.testTopic = UUID.randomUUID().toString();
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        hashMap.put("group.id", "test-consumer-group");
        hashMap.put("key.deserializer", StringDeserializer.class);
        hashMap.put("value.deserializer", StringDeserializer.class);
        hashMap.put("auto.offset.reset", "earliest");
        this.kafkaConsumer = new TracingKafkaConsumer<>(new KafkaConsumer(hashMap), this.beanFactory);
        this.consumerRun.set(true);
        Executors.newSingleThreadExecutor().execute(() -> {
            doStartKafkaConsumer(this.receivedCounter);
        });
    }

    @AfterEach
    void destroy() {
        this.consumerRun.set(false);
    }

    @Test
    public void should_create_and_finish_consumer_span() {
        KafkaProducer<String, String> buildTestKafkaProducer = KafkaTestUtils.buildTestKafkaProducer(kafkaContainer.getBootstrapServers());
        buildTestKafkaProducer.send(new ProducerRecord(this.testTopic, "test", "test"));
        buildTestKafkaProducer.close();
        Awaitility.await().atMost(Duration.ofSeconds(15L)).until(() -> {
            return Boolean.valueOf(this.receivedCounter.intValue() == 1);
        });
        BDDAssertions.then(this.tracer.currentSpan()).isNull();
        BDDAssertions.then(this.spans).hasSize(1);
        FinishedSpan finishedSpan = this.spans.get(0);
        BDDAssertions.then(finishedSpan.getKind()).isEqualTo(Span.Kind.CONSUMER);
        BDDAssertions.then(finishedSpan.getTags()).isNotEmpty();
        BDDAssertions.then((String) finishedSpan.getTags().get("kafka.topic")).isEqualTo(this.testTopic);
        BDDAssertions.then((String) finishedSpan.getTags().get("kafka.offset")).isEqualTo("0");
        BDDAssertions.then((String) finishedSpan.getTags().get("kafka.partition")).isEqualTo("0");
    }

    private void doStartKafkaConsumer(AtomicInteger atomicInteger) {
        this.kafkaConsumer.subscribe(Pattern.compile(this.testTopic));
        while (this.consumerRun.get()) {
            Iterator it = this.kafkaConsumer.poll(Duration.ofSeconds(1L)).iterator();
            while (it.hasNext()) {
                atomicInteger.incrementAndGet();
            }
        }
        this.kafkaConsumer.close();
    }

    @Override // org.springframework.cloud.sleuth.test.TestTracingAwareSupplier
    public void cleanUpTracing() {
        this.spans.clear();
    }
}
