package org.springframework.cloud.stream.binder.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/RawModeKafka09BinderTests.class */
/**
 * Kafka 0.9 binder tests in which the producer and the consumers are configured with
 * {@link HeaderMode#raw}, so every payload exchanged by these tests is a plain {@code byte[]}.
 *
 * <p>The two partitioned tests override their counterparts in the parent test class; the two
 * send/receive tests are declared here.
 */
public class RawModeKafka09BinderTests extends Kafka09BinderTests {

    /**
     * Binds a producer partitioned through {@link RawKafkaPartitionTestSupport} (used both as key
     * extractor and as partition selector) and three consumer instances on {@code partJ.0}, sends
     * the single-byte payloads 0, 1 and 2, and verifies that each consumer instance receives
     * exactly one of them.
     *
     * @throws Exception declared for compatibility with the overridden test
     */
    @Override // org.springframework.cloud.stream.binder.kafka.KafkaBinderTests
    @Test
    public void testPartitionedModuleJava() throws Exception {
        Kafka09TestBinder binder = m3getBinder();

        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = mo1createProducerProperties();
        producerProperties.setHeaderMode(HeaderMode.raw);
        producerProperties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
        producerProperties.setPartitionSelectorClass(RawKafkaPartitionTestSupport.class);
        producerProperties.setPartitionCount(6);
        DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
        output.setBeanName("test.output");
        Binding producerBinding = binder.bindProducer("partJ.0", output, producerProperties);

        // The same properties object is reused for all three consumers; only the instance index
        // is changed between the bindConsumer calls.
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = mo2createConsumerProperties();
        consumerProperties.setConcurrency(2);
        consumerProperties.setInstanceCount(3);
        consumerProperties.setInstanceIndex(0);
        consumerProperties.setPartitioned(true);
        consumerProperties.setHeaderMode(HeaderMode.raw);
        consumerProperties.getExtension().setAutoRebalanceEnabled(false);
        QueueChannel input0 = new QueueChannel();
        input0.setBeanName("test.input0J");
        Binding input0Binding = binder.bindConsumer("partJ.0", "test", input0, consumerProperties);

        consumerProperties.setInstanceIndex(1);
        QueueChannel input1 = new QueueChannel();
        input1.setBeanName("test.input1J");
        Binding input1Binding = binder.bindConsumer("partJ.0", "test", input1, consumerProperties);

        consumerProperties.setInstanceIndex(2);
        QueueChannel input2 = new QueueChannel();
        input2.setBeanName("test.input2J");
        Binding input2Binding = binder.bindConsumer("partJ.0", "test", input2, consumerProperties);

        output.send(new GenericMessage<>(new byte[]{0}));
        output.send(new GenericMessage<>(new byte[]{1}));
        output.send(new GenericMessage<>(new byte[]{2}));

        assertEachInputReceivedOneOfZeroOneTwo(input0, input1, input2);

        input0Binding.unbind();
        input1Binding.unbind();
        input2Binding.unbind();
        producerBinding.unbind();
    }

    /**
     * Same scenario as {@link #testPartitionedModuleJava()} on destination {@code part.0}, but the
     * partition key and the partition selector are given as SpEL expressions
     * ({@code payload[0]} and {@code hashCode()}).
     *
     * @throws Exception declared for compatibility with the overridden test
     */
    @Override // org.springframework.cloud.stream.binder.kafka.KafkaBinderTests
    @Test
    public void testPartitionedModuleSpEL() throws Exception {
        Kafka09TestBinder binder = m3getBinder();

        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = mo1createProducerProperties();
        producerProperties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload[0]"));
        producerProperties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
        producerProperties.setPartitionCount(6);
        producerProperties.setHeaderMode(HeaderMode.raw);
        DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
        output.setBeanName("test.output");
        Binding producerBinding = binder.bindProducer("part.0", output, producerProperties);
        try {
            Assertions.assertThat(getEndpointRouting(extractEndpoint(producerBinding)))
                    .contains(getExpectedRoutingBaseDestination("part.0", "test") + "-' + headers['partition']");
        } catch (UnsupportedOperationException ignored) {
            // Deliberately best-effort: presumably the routing introspection helpers are not
            // supported by every binder under test, in which case this check is skipped.
            // NOTE(review): confirm against the parent test class.
        }

        // The same properties object is reused for all three consumers; only the instance index
        // is changed between the bindConsumer calls.
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = mo2createConsumerProperties();
        consumerProperties.setConcurrency(2);
        consumerProperties.setInstanceIndex(0);
        consumerProperties.setInstanceCount(3);
        consumerProperties.setPartitioned(true);
        consumerProperties.setHeaderMode(HeaderMode.raw);
        consumerProperties.getExtension().setAutoRebalanceEnabled(false);
        QueueChannel input0 = new QueueChannel();
        input0.setBeanName("test.input0S");
        Binding input0Binding = binder.bindConsumer("part.0", "test", input0, consumerProperties);

        consumerProperties.setInstanceIndex(1);
        QueueChannel input1 = new QueueChannel();
        input1.setBeanName("test.input1S");
        Binding input1Binding = binder.bindConsumer("part.0", "test", input1, consumerProperties);

        consumerProperties.setInstanceIndex(2);
        QueueChannel input2 = new QueueChannel();
        input2.setBeanName("test.input2S");
        Binding input2Binding = binder.bindConsumer("part.0", "test", input2, consumerProperties);

        Message<byte[]> messageWithHeaders = MessageBuilder.withPayload(new byte[]{2})
                .setHeader("correlationId", "foo")
                .setHeader("sequenceNumber", 42)
                .setHeader("sequenceSize", 43)
                .build();
        output.send(messageWithHeaders);
        output.send(new GenericMessage<>(new byte[]{1}));
        output.send(new GenericMessage<>(new byte[]{0}));

        assertEachInputReceivedOneOfZeroOneTwo(input0, input1, input2);

        input0Binding.unbind();
        input1Binding.unbind();
        input2Binding.unbind();
        producerBinding.unbind();
    }

    /**
     * Sends the bytes of {@code "foo"} through a raw-mode producer bound to {@code foo.0} and
     * verifies that a raw-mode consumer in group {@code test} receives the same bytes.
     *
     * @throws Exception if the bind/unbind latency helper fails
     */
    @Test
    public void testSendAndReceive() throws Exception {
        Kafka09TestBinder binder = m3getBinder();
        DirectChannel output = new DirectChannel();
        QueueChannel input = new QueueChannel();

        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = mo1createProducerProperties();
        producerProperties.setHeaderMode(HeaderMode.raw);
        Binding producerBinding = binder.bindProducer("foo.0", output, producerProperties);

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = mo2createConsumerProperties();
        consumerProperties.setHeaderMode(HeaderMode.raw);
        Binding consumerBinding = binder.bindConsumer("foo.0", "test", input, consumerProperties);

        Message<?> message = MessageBuilder.withPayload("foo".getBytes(StandardCharsets.UTF_8)).build();
        binderBindUnbindLatency();
        output.send(message);

        Message<?> inbound = receive(input);
        Assertions.assertThat(inbound).isNotNull();
        Assertions.assertThat(new String((byte[]) inbound.getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");

        producerBinding.unbind();
        consumerBinding.unbind();
    }

    /**
     * Binds three consumers on {@code baz.0} in the groups {@code test}, {@code tap1} and
     * {@code tap2} and verifies that each of them receives a message sent by the producer. It then
     * unbinds {@code tap2}, checks that only {@code tap1} still receives, re-binds {@code tap2} and
     * checks that it receives again. Finally it verifies that all endpoints are stopped after
     * unbinding.
     */
    @Test
    public void testSendAndReceiveWithExplicitConsumerGroup() {
        Kafka09TestBinder binder = m3getBinder();
        DirectChannel output = new DirectChannel();
        QueueChannel mainInput = new QueueChannel();
        QueueChannel tap1Input = new QueueChannel();
        QueueChannel tap2Input = new QueueChannel();

        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = mo1createProducerProperties();
        producerProperties.setHeaderMode(HeaderMode.raw);
        Binding producerBinding = binder.bindProducer("baz.0", output, producerProperties);

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = mo2createConsumerProperties();
        consumerProperties.setHeaderMode(HeaderMode.raw);
        consumerProperties.getExtension().setAutoRebalanceEnabled(false);
        Binding mainBinding = binder.bindConsumer("baz.0", "test", mainInput, consumerProperties);
        Binding tap1Binding = binder.bindConsumer("baz.0", "tap1", tap1Input, consumerProperties);
        Binding tap2Binding = binder.bindConsumer("baz.0", "tap2", tap2Input, consumerProperties);

        Message<?> message = MessageBuilder.withPayload("foo".getBytes(StandardCharsets.UTF_8)).build();
        boolean success = false;
        boolean retried = false;
        while (!success) {
            output.send(message);
            Message<?> inbound = receive(mainInput);
            Assertions.assertThat(inbound).isNotNull();
            Assertions.assertThat(new String((byte[]) inbound.getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");

            Message<?> tapped1 = receive(tap1Input);
            Message<?> tapped2 = receive(tap2Input);
            if (tapped1 == null || tapped2 == null) {
                // Exactly one retry is allowed (presumably because a tap consumer may not have
                // started yet). The failure message must be set BEFORE the assertion is invoked,
                // otherwise AssertJ never uses it.
                Assertions.assertThat(retried).withFailMessage("Failed to receive tap after retry").isFalse();
                retried = true;
                continue;
            }
            success = true;
            Assertions.assertThat(new String((byte[]) tapped1.getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
            Assertions.assertThat(new String((byte[]) tapped2.getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
        }

        // Remove the second tap: the first tap keeps receiving, the removed one does not.
        tap2Binding.unbind();
        output.send(MessageBuilder.withPayload("bar".getBytes(StandardCharsets.UTF_8)).build());
        Assertions.assertThat(receive(tap1Input)).isNotNull();
        Assertions.assertThat(receive(tap2Input)).isNull();

        // Re-subscribing the tap under the same group makes it receive again.
        // NOTE(review): this re-bind uses freshly created default consumer properties rather than
        // the raw-mode properties used for the other bindings - confirm this is intentional.
        Binding reboundTap2Binding = binder.bindConsumer("baz.0", "tap2", tap2Input, mo2createConsumerProperties());
        Assertions.assertThat(receive(tap2Input)).isNotNull();

        mainBinding.unbind();
        tap1Binding.unbind();
        reboundTap2Binding.unbind();
        producerBinding.unbind();
        Assertions.assertThat(extractEndpoint(mainBinding).isRunning()).isFalse();
        Assertions.assertThat(extractEndpoint(tap1Binding).isRunning()).isFalse();
        Assertions.assertThat(extractEndpoint(reboundTap2Binding).isRunning()).isFalse();
        Assertions.assertThat(extractEndpoint(producerBinding).isRunning()).isFalse();
    }

    /**
     * Receives one message from each of the three inputs (in the given order), requires each to be
     * non-null, and verifies that the first payload bytes of the three messages are exactly
     * 0, 1 and 2 in any order.
     */
    private void assertEachInputReceivedOneOfZeroOneTwo(QueueChannel input0, QueueChannel input1, QueueChannel input2) {
        Message<?> received0 = receive(input0);
        Assertions.assertThat(received0).isNotNull();
        Message<?> received1 = receive(input1);
        Assertions.assertThat(received1).isNotNull();
        Message<?> received2 = receive(input2);
        Assertions.assertThat(received2).isNotNull();

        Assertions.assertThat(Arrays.asList(
                firstPayloadByte(received0),
                firstPayloadByte(received1),
                firstPayloadByte(received2)))
                .containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);
    }

    /** Returns the first byte of the message's {@code byte[]} payload. */
    private static byte firstPayloadByte(Message<?> message) {
        return ((byte[]) message.getPayload())[0];
    }
}
