package io.eventuate.messaging.kafka.basic.consumer;

import io.eventuate.messaging.kafka.common.EventuateKafkaConfigurationProperties;
import io.eventuate.messaging.kafka.common.EventuateKafkaMultiMessage;
import io.eventuate.messaging.kafka.common.EventuateKafkaMultiMessageConverter;
import io.eventuate.messaging.kafka.consumer.KafkaMessage;
import io.eventuate.messaging.kafka.consumer.KafkaMessageHandler;
import io.eventuate.messaging.kafka.consumer.KafkaSubscription;
import io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl;
import io.eventuate.messaging.kafka.producer.EventuateKafkaProducer;
import io.eventuate.util.test.async.Eventually;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Assert;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/eventuate/messaging/kafka/basic/consumer/AbstractEventuateKafkaBasicConsumerTest.class */
public abstract class AbstractEventuateKafkaBasicConsumerTest {
    private KafkaMessageHandler handler;

    protected abstract EventuateKafkaConfigurationProperties getKafkaProperties();

    protected abstract EventuateKafkaConsumerConfigurationProperties getConsumerProperties();

    protected abstract EventuateKafkaProducer getProducer();

    protected abstract MessageConsumerKafkaImpl getConsumer();

    protected abstract KafkaConsumerFactory getKafkaConsumerFactory();

    public void shouldStopWhenHandlerThrowsException() {
        String str = "subscriber-" + System.currentTimeMillis();
        String str2 = "topic-" + System.currentTimeMillis();
        EventuateKafkaConsumerMessageHandler makeExceptionThrowingHandler = makeExceptionThrowingHandler();
        EventuateKafkaConsumer makeConsumer = makeConsumer(str, str2, makeExceptionThrowingHandler);
        sendMessages(str2);
        assertConsumerStopped(makeConsumer);
        assertHandlerInvokedAtLeastOnce(makeExceptionThrowingHandler);
    }

    public void shouldConsumeMessages() {
        String str = "subscriber-" + System.currentTimeMillis();
        String str2 = "topic-" + System.currentTimeMillis();
        sendMessages(str2);
        this.handler = (KafkaMessageHandler) Mockito.mock(KafkaMessageHandler.class);
        KafkaSubscription subscribe = getConsumer().subscribe(str, Collections.singleton(str2), this.handler);
        Eventually.eventually(() -> {
            ((KafkaMessageHandler) Mockito.verify(this.handler, Mockito.atLeastOnce())).accept((KafkaMessage) Matchers.any());
        });
        subscribe.close();
    }

    public void shouldConsumeMessagesWithBackPressure() {
        String str = "subscriber-" + System.currentTimeMillis();
        String str2 = "topic-" + System.currentTimeMillis();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        for (int i = 0; i < 100; i++) {
            sendMessages(str2);
        }
        this.handler = kafkaMessage -> {
            try {
                TimeUnit.MILLISECONDS.sleep(20L);
                linkedBlockingQueue.add(kafkaMessage);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };
        KafkaSubscription subscribe = getConsumer().subscribe(str, Collections.singleton(str2), this.handler);
        Eventually.eventually(() -> {
            Assert.assertEquals(200L, linkedBlockingQueue.size());
        });
        subscribe.close();
    }

    public void shouldConsumeBatchOfMessage() {
        String str = "subscriber-" + System.currentTimeMillis();
        String str2 = "topic-" + System.currentTimeMillis();
        getProducer().send(str2, (String) null, new EventuateKafkaMultiMessageConverter().convertMessagesToBytes(Arrays.asList(new EventuateKafkaMultiMessage((String) null, "a"), new EventuateKafkaMultiMessage((String) null, "b"), new EventuateKafkaMultiMessage((String) null, "c"))));
        this.handler = (KafkaMessageHandler) Mockito.mock(KafkaMessageHandler.class);
        KafkaSubscription subscribe = getConsumer().subscribe(str, Collections.singleton(str2), this.handler);
        Eventually.eventually(() -> {
            ((KafkaMessageHandler) Mockito.verify(this.handler, Mockito.times(3))).accept((KafkaMessage) Matchers.any());
        });
        subscribe.close();
    }

    private EventuateKafkaConsumer makeConsumer(String str, String str2, EventuateKafkaConsumerMessageHandler eventuateKafkaConsumerMessageHandler) {
        EventuateKafkaConsumer eventuateKafkaConsumer = new EventuateKafkaConsumer(str, eventuateKafkaConsumerMessageHandler, Collections.singletonList(str2), getKafkaProperties().getBootstrapServers(), getConsumerProperties(), getKafkaConsumerFactory());
        eventuateKafkaConsumer.start();
        return eventuateKafkaConsumer;
    }

    private void sendMessages(String str) {
        getProducer().send(str, "1", "a");
        getProducer().send(str, "1", "b");
    }

    private void assertHandlerInvokedAtLeastOnce(EventuateKafkaConsumerMessageHandler eventuateKafkaConsumerMessageHandler) {
        ((EventuateKafkaConsumerMessageHandler) Mockito.verify(eventuateKafkaConsumerMessageHandler, Mockito.atLeast(1))).apply((ConsumerRecord) Matchers.any(), (BiConsumer) Matchers.any());
    }

    private EventuateKafkaConsumerMessageHandler makeExceptionThrowingHandler() {
        EventuateKafkaConsumerMessageHandler eventuateKafkaConsumerMessageHandler = (EventuateKafkaConsumerMessageHandler) Mockito.mock(EventuateKafkaConsumerMessageHandler.class);
        ((EventuateKafkaConsumerMessageHandler) Mockito.doAnswer(invocationOnMock -> {
            CompletableFuture.runAsync(() -> {
                ((BiConsumer) invocationOnMock.getArguments()[1]).accept(null, new RuntimeException("Test is simulating failure"));
            });
            return null;
        }).when(eventuateKafkaConsumerMessageHandler)).apply((ConsumerRecord) Matchers.any(), (BiConsumer) Matchers.any());
        return eventuateKafkaConsumerMessageHandler;
    }

    private void assertConsumerStopped(EventuateKafkaConsumer eventuateKafkaConsumer) {
        Eventually.eventually(() -> {
            Assert.assertEquals(EventuateKafkaConsumerState.MESSAGE_HANDLING_FAILED, eventuateKafkaConsumer.getState());
        });
    }
}
