package org.springframework.cloud.stream.binder.kafka;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.cloud.stream.binder.AbstractTestBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
import org.springframework.cloud.stream.binder.PartitionTestSupport;
import org.springframework.cloud.stream.binder.TestUtils;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.KafkaSendFailureException;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.class */
public abstract class KafkaBinderTests extends PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {

    // JUnit rule that starts out expecting no exception (ExpectedException.none()).
    // NOTE(review): not referenced in the visible part of this file; presumably tests further
    // down arm it for provisioning failures - confirm.
    @Rule
    public ExpectedException expectedProvisioningException = ExpectedException.none();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaBinderTests$FailingInvocationCountingMessageHandler.class */
    /**
     * Test handler that records every delivery and then always fails.
     *
     * <p>Each invocation increments the invocation counter. The message is stored under the value
     * of its Kafka offset header the first time that offset is seen, and the latch is counted
     * down once per distinct offset. Every invocation ends by throwing a
     * {@link RuntimeException} with the message {@code "fail"}.
     */
    public final class FailingInvocationCountingMessageHandler implements MessageHandler {
        private int invocationCount;
        private final LinkedHashMap<Long, Message<?>> receivedMessages = new LinkedHashMap<>();
        private final CountDownLatch latch;

        /**
         * @param latchSize initial count of the latch, i.e. how many distinct offsets must be
         *                  recorded before the latch returned by {@link #getLatch()} opens
         */
        private FailingInvocationCountingMessageHandler(int latchSize) {
            this.latch = new CountDownLatch(latchSize);
        }

        /**
         * Creates a handler whose latch opens after the first distinct offset.
         *
         * <p>Takes no parameters so that it matches the
         * {@code new FailingInvocationCountingMessageHandler()} call sites in the enclosing class;
         * the enclosing instance is supplied implicitly because this is an inner class.
         */
        private FailingInvocationCountingMessageHandler() {
            this(1);
        }

        /**
         * Records the message and then fails unconditionally.
         *
         * @param message the delivered message; its offset header is read as a {@code Long}
         * @throws RuntimeException always, after the message has been recorded
         */
        public void handleMessage(Message<?> message) throws MessagingException {
            this.invocationCount++;
            Long offset = message.getHeaders().get(KafkaBinderTests.this.getKafkaOffsetHeaderKey(), Long.class);
            // Redeliveries of the same offset are counted as invocations but recorded only once.
            if (!this.receivedMessages.containsKey(offset)) {
                this.receivedMessages.put(offset, message);
                this.latch.countDown();
            }
            throw new RuntimeException("fail");
        }

        /** @return the recorded messages keyed by offset header value, in first-arrival order */
        public LinkedHashMap<Long, Message<?>> getReceivedMessages() {
            return this.receivedMessages;
        }

        /** @return how many times {@link #handleMessage(Message)} has been called */
        public int getInvocationCount() {
            return this.invocationCount;
        }

        /** @return the latch that is counted down once per distinct offset */
        public CountDownLatch getLatch() {
            return this.latch;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: createConsumerProperties, reason: merged with bridge method [inline-methods] */
    /**
     * Builds the consumer properties used as the starting point by the tests in this class:
     * a fresh {@link KafkaConsumerProperties} extension with instance count 1 and instance index 0.
     *
     * @return newly created consumer properties
     */
    public ExtendedConsumerProperties<KafkaConsumerProperties> m1createConsumerProperties() {
        KafkaConsumerProperties kafkaExtension = new KafkaConsumerProperties();
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties =
                new ExtendedConsumerProperties<>(kafkaExtension);
        consumerProperties.setInstanceCount(1);
        consumerProperties.setInstanceIndex(0);
        return consumerProperties;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: createProducerProperties, reason: merged with bridge method [inline-methods] */
    /**
     * Builds the producer properties used as the starting point by the tests in this class:
     * a fresh {@link KafkaProducerProperties} extension with the sync flag set to {@code true}.
     *
     * @return newly created producer properties
     */
    public ExtendedProducerProperties<KafkaProducerProperties> m0createProducerProperties() {
        ExtendedProducerProperties<KafkaProducerProperties> producerProperties =
                new ExtendedProducerProperties<>(new KafkaProducerProperties());
        producerProperties.getExtension().setSync(true);
        return producerProperties;
    }

    /**
     * Returns the name of the message header that carries the Kafka offset.
     * {@link FailingInvocationCountingMessageHandler} reads that header as a {@code Long}
     * and uses it as the key for the messages it records.
     */
    public abstract String getKafkaOffsetHeaderKey();

    /**
     * Returns a binder for the given configuration. Tests call this after customising the
     * properties obtained from {@link #createConfigurationProperties()}.
     */
    protected abstract Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);

    /**
     * Supplies the binder configuration properties that individual tests adjust
     * (e.g. auto-create-topics, minimum partition count) before requesting a binder.
     */
    protected abstract KafkaBinderConfigurationProperties createConfigurationProperties();

    /**
     * Returns the partition count of the topic with the given name; the partition-count tests
     * compare the result against the number of partitions they expect.
     */
    protected abstract int partitionSize(String str);

    // NOTE(review): the three hooks below are not called in the visible part of this file.
    // Presumably they give version-specific access to ZooKeeper utilities, topic creation
    // (topic name, two int arguments, topic config) and partition lookup - confirm against the
    // concrete subclasses.
    protected abstract ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);

    protected abstract void invokeCreateTopic(ZkUtils zkUtils, String str, int i, int i2, Properties properties);

    protected abstract int invokePartitionSize(String str, ZkUtils zkUtils);

    /** Runs the shared DLQ scenario with the retry flag set, i.e. two delivery attempts. */
    @Test
    public void testDlqAndRetry() throws Exception {
        final boolean withRetry = true;
        testDlqGuts(withRetry);
    }

    /** Runs the shared DLQ scenario with the retry flag cleared, i.e. a single delivery attempt. */
    @Test
    public void testDlq() throws Exception {
        final boolean withRetry = false;
        testDlqGuts(withRetry);
    }

    /**
     * Shared body of the dead-letter-queue tests.
     *
     * <p>Binds a producer and an always-failing consumer (DLQ enabled, auto-rebalance disabled)
     * to a fresh {@code dlqTest.<millis>.0} destination, plus a second consumer on
     * {@code error.<destination>.testGroup}. After sending one message it asserts that the
     * message arrives on that second destination, that the failing handler was invoked exactly
     * {@code maxAttempts} times, and that both the binding-specific error channel and the global
     * {@code errorChannel} received a message.
     *
     * <p>All bindings are released in a {@code finally} block so a failed assertion does not
     * leave consumers or producers bound for later tests.
     *
     * @param z {@code true} to run with two delivery attempts, {@code false} for one; it is also
     *          the expected value of the flag recording whether the binding-specific error
     *          channel was reached via {@code ErrorMessageSendingRecoverer}
     * @throws Exception if binding, sending or receiving fails
     */
    private void testDlqGuts(boolean z) throws Exception {
        final boolean withRetry = z;
        AbstractKafkaTestBinder binder = (AbstractKafkaTestBinder) getBinder();
        DirectChannel moduleOutputChannel = new DirectChannel();
        DirectChannel moduleInputChannel = new DirectChannel();
        QueueChannel dlqChannel = new QueueChannel();
        FailingInvocationCountingMessageHandler failingHandler = new FailingInvocationCountingMessageHandler();
        moduleInputChannel.subscribe(failingHandler);

        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = m0createProducerProperties();
        producerProperties.setPartitionCount(2);

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        consumerProperties.setMaxAttempts(withRetry ? 2 : 1);
        consumerProperties.setBackOffInitialInterval(100);
        consumerProperties.setBackOffMaxInterval(150);
        consumerProperties.getExtension().setEnableDlq(true);
        consumerProperties.getExtension().setAutoRebalanceEnabled(false);

        long uniqueBindingId = System.currentTimeMillis();
        String destination = "dlqTest." + uniqueBindingId + ".0";

        Binding<?> producerBinding = null;
        Binding<?> consumerBinding = null;
        Binding<?> dlqConsumerBinding = null;
        try {
            producerBinding = binder.bindProducer(destination, moduleOutputChannel, producerProperties);
            consumerBinding = binder.bindConsumer(destination, "testGroup", moduleInputChannel, consumerProperties);

            ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = m1createConsumerProperties();
            dlqConsumerProperties.setMaxAttempts(1);

            ApplicationContext context = (ApplicationContext) TestUtils.getPropertyValue(
                    binder.getBinder(), "applicationContext", ApplicationContext.class);
            SubscribableChannel boundErrorChannel =
                    context.getBean(destination + ".testGroup.errors-0", SubscribableChannel.class);
            SubscribableChannel globalErrorChannel = context.getBean("errorChannel", SubscribableChannel.class);

            final AtomicReference<Message<?>> boundErrorMessage = new AtomicReference<>();
            final AtomicReference<Message<?>> globalErrorMessage = new AtomicReference<>();
            // Starts as the opposite of the expected value, so the final assertion can only pass
            // if the handler below actually ran.
            final AtomicBoolean sentByRecoverer = new AtomicBoolean(!withRetry);

            boundErrorChannel.subscribe(new MessageHandler() {
                public void handleMessage(Message<?> message) throws MessagingException {
                    boundErrorMessage.set(message);
                    // Inspect the current call stack to see who delivered the error message.
                    String callStack = Arrays.toString(new RuntimeException().getStackTrace());
                    sentByRecoverer.set(callStack.contains("ErrorMessageSendingRecoverer"));
                }
            });
            globalErrorChannel.subscribe(new MessageHandler() {
                public void handleMessage(Message<?> message) throws MessagingException {
                    globalErrorMessage.set(message);
                }
            });

            dlqConsumerBinding = binder.bindConsumer(
                    "error.dlqTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties);
            binderBindUnbindLatency();

            String payload = "test." + UUID.randomUUID().toString();
            moduleOutputChannel.send(MessageBuilder.withPayload(payload).build());

            Message<?> deadLetter = receive(dlqChannel, 3);
            Assertions.assertThat(deadLetter).isNotNull();
            Assertions.assertThat(deadLetter.getPayload()).isEqualTo(payload);
            Assertions.assertThat(failingHandler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
            binderBindUnbindLatency();

            Assertions.assertThat(boundErrorMessage.get()).isNotNull();
            Assertions.assertThat(globalErrorMessage.get()).isNotNull();
            Assertions.assertThat(sentByRecoverer.get()).isEqualTo(withRetry);
        }
        finally {
            // Same release order as before: DLQ consumer, main consumer, producer.
            if (dlqConsumerBinding != null) {
                dlqConsumerBinding.unbind();
            }
            if (consumerBinding != null) {
                consumerBinding.unbind();
            }
            if (producerBinding != null) {
                producerBinding.unbind();
            }
        }
    }

    /**
     * Checks what happens to a failed message when no DLQ is configured.
     *
     * <p>A single-attempt consumer with an always-failing handler receives one message. The test
     * asserts that the handler saw exactly that message once, then rebinds the same group to a
     * queue channel, sends a second message, and asserts that the queue yields the first
     * (failed) message followed by the second one.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testDefaultAutoCommitOnErrorWithoutDlq() throws Exception {
        AbstractTestBinder binder = getBinder();
        DirectChannel moduleOutputChannel = new DirectChannel();
        DirectChannel moduleInputChannel = new DirectChannel();
        FailingInvocationCountingMessageHandler failingHandler = new FailingInvocationCountingMessageHandler();
        moduleInputChannel.subscribe(failingHandler);

        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = m0createProducerProperties();
        producerProperties.setPartitionCount(10);

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        consumerProperties.setMaxAttempts(1);
        consumerProperties.setBackOffInitialInterval(100);
        consumerProperties.setBackOffMaxInterval(150);
        consumerProperties.getExtension().setAutoRebalanceEnabled(false);

        long uniqueBindingId = System.currentTimeMillis();
        String destination = "retryTest." + uniqueBindingId + ".0";
        Binding<?> producerBinding = binder.bindProducer(destination, moduleOutputChannel, producerProperties);
        Binding<?> failingConsumerBinding =
                binder.bindConsumer(destination, "testGroup", moduleInputChannel, consumerProperties);

        String firstPayload = "test." + UUID.randomUUID().toString();
        moduleOutputChannel.send(MessageBuilder.withPayload(firstPayload).build());

        // The await result must actually be asserted; a bare assertThat(...) checks nothing.
        boolean delivered = failingHandler.getLatch()
                .await((int) (this.timeoutMultiplier * 1000.0d), TimeUnit.MILLISECONDS);
        Assertions.assertThat(delivered).isTrue();

        Assertions.assertThat(failingHandler.getReceivedMessages().entrySet()).hasSize(1);
        Message<?> handled = failingHandler.getReceivedMessages().entrySet().iterator().next().getValue();
        Assertions.assertThat(handled).isNotNull();
        Assertions.assertThat(handled.getPayload()).isEqualTo(firstPayload);
        Assertions.assertThat(failingHandler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
        failingConsumerBinding.unbind();

        // Rebind the same group to a channel that does not fail.
        QueueChannel successfulInputChannel = new QueueChannel();
        Binding<?> successfulConsumerBinding =
                binder.bindConsumer(destination, "testGroup", successfulInputChannel, consumerProperties);
        binderBindUnbindLatency();

        String secondPayload = "test." + UUID.randomUUID().toString();
        moduleOutputChannel.send(MessageBuilder.withPayload(secondPayload).build());

        // The previously failed message is expected first, then the new one.
        Assertions.assertThat(receive(successfulInputChannel).getPayload()).isEqualTo(firstPayload);
        Assertions.assertThat(receive(successfulInputChannel).getPayload()).isEqualTo(secondPayload);
        successfulConsumerBinding.unbind();
        producerBinding.unbind();
    }

    /**
     * Checks what happens to a failed message when the DLQ is enabled.
     *
     * <p>A three-attempt consumer with an always-failing handler receives one message. The test
     * asserts that the message arrives on {@code error.<destination>.testGroup}, that the handler
     * recorded exactly that message and was invoked {@code maxAttempts} times, and that after
     * rebinding the same group to a queue channel the first message received is the newly sent
     * second payload.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testDefaultAutoCommitOnErrorWithDlq() throws Exception {
        AbstractTestBinder binder = getBinder();
        DirectChannel outboundChannel = new DirectChannel();
        DirectChannel failingInboundChannel = new DirectChannel();
        FailingInvocationCountingMessageHandler failingHandler = new FailingInvocationCountingMessageHandler();
        failingInboundChannel.subscribe(failingHandler);

        ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
        producerProps.setPartitionCount(10);

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();
        consumerProps.setMaxAttempts(3);
        consumerProps.setBackOffInitialInterval(100);
        consumerProps.setBackOffMaxInterval(150);
        consumerProps.getExtension().setEnableDlq(true);
        consumerProps.getExtension().setAutoRebalanceEnabled(false);

        long stamp = System.currentTimeMillis();
        String destination = "retryTest." + stamp + ".0";
        Binding<?> producerBinding = binder.bindProducer(destination, outboundChannel, producerProps);
        Binding<?> failingConsumerBinding =
                binder.bindConsumer(destination, "testGroup", failingInboundChannel, consumerProps);

        // Separate single-attempt consumer that listens on the dead-letter destination.
        ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProps = m1createConsumerProperties();
        dlqConsumerProps.setMaxAttempts(1);
        QueueChannel dlqChannel = new QueueChannel();
        Binding<?> dlqConsumerBinding = binder.bindConsumer(
                "error.retryTest." + stamp + ".0.testGroup", (String) null, dlqChannel, dlqConsumerProps);

        String firstPayload = "test." + UUID.randomUUID().toString();
        outboundChannel.send(MessageBuilder.withPayload(firstPayload).build());

        Message<?> deadLetter = receive(dlqChannel, 3);
        Assertions.assertThat(deadLetter).isNotNull();
        Assertions.assertThat(deadLetter.getPayload()).isEqualTo(firstPayload);

        Assertions.assertThat(failingHandler.getReceivedMessages().entrySet()).hasSize(1);
        Message<?> handled = failingHandler.getReceivedMessages().entrySet().iterator().next().getValue();
        Assertions.assertThat(handled).isNotNull();
        Assertions.assertThat(handled.getPayload()).isEqualTo(firstPayload);
        Assertions.assertThat(failingHandler.getInvocationCount()).isEqualTo(consumerProps.getMaxAttempts());
        binderBindUnbindLatency();
        dlqConsumerBinding.unbind();
        failingConsumerBinding.unbind();

        // Rebind the same group to a channel that does not fail and send a fresh message.
        QueueChannel successfulInboundChannel = new QueueChannel();
        Binding<?> successfulConsumerBinding =
                binder.bindConsumer(destination, "testGroup", successfulInboundChannel, consumerProps);
        String secondPayload = "test." + UUID.randomUUID().toString();
        outboundChannel.send(MessageBuilder.withPayload(secondPayload).build());
        Assertions.assertThat(receive(successfulInboundChannel).getPayload()).isEqualTo(secondPayload);
        binderBindUnbindLatency();
        successfulConsumerBinding.unbind();
        producerBinding.unbind();
    }

    /**
     * Checks that a custom DLQ name is honoured.
     *
     * <p>Same scenario as the DLQ-enabled commit test, except that the failing consumer sets the
     * DLQ name to {@code "dlqTest"} and the dead-letter consumer binds to that destination. The
     * test asserts that the failed message arrives there, that the handler recorded it once and
     * was invoked {@code maxAttempts} times, and that after rebinding the same group the first
     * message received is the newly sent second payload.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testConfigurableDlqName() throws Exception {
        AbstractTestBinder binder = getBinder();
        DirectChannel outboundChannel = new DirectChannel();
        DirectChannel failingInboundChannel = new DirectChannel();
        FailingInvocationCountingMessageHandler failingHandler = new FailingInvocationCountingMessageHandler();
        failingInboundChannel.subscribe(failingHandler);

        ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
        producerProps.setPartitionCount(10);

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();
        consumerProps.setMaxAttempts(3);
        consumerProps.setBackOffInitialInterval(100);
        consumerProps.setBackOffMaxInterval(150);
        consumerProps.getExtension().setEnableDlq(true);
        consumerProps.getExtension().setAutoRebalanceEnabled(false);
        consumerProps.getExtension().setDlqName("dlqTest");

        long stamp = System.currentTimeMillis();
        String destination = "retryTest." + stamp + ".0";
        Binding<?> producerBinding = binder.bindProducer(destination, outboundChannel, producerProps);
        Binding<?> failingConsumerBinding =
                binder.bindConsumer(destination, "testGroup", failingInboundChannel, consumerProps);

        // Separate single-attempt consumer that listens on the custom dead-letter destination.
        ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProps = m1createConsumerProperties();
        dlqConsumerProps.setMaxAttempts(1);
        QueueChannel dlqChannel = new QueueChannel();
        Binding<?> dlqConsumerBinding = binder.bindConsumer("dlqTest", (String) null, dlqChannel, dlqConsumerProps);

        String firstPayload = "test." + UUID.randomUUID().toString();
        outboundChannel.send(MessageBuilder.withPayload(firstPayload).build());

        Message<?> deadLetter = receive(dlqChannel, 3);
        Assertions.assertThat(deadLetter).isNotNull();
        Assertions.assertThat(deadLetter.getPayload()).isEqualTo(firstPayload);

        Assertions.assertThat(failingHandler.getReceivedMessages().entrySet()).hasSize(1);
        Message<?> handled = failingHandler.getReceivedMessages().entrySet().iterator().next().getValue();
        Assertions.assertThat(handled).isNotNull();
        Assertions.assertThat(handled.getPayload()).isEqualTo(firstPayload);
        Assertions.assertThat(failingHandler.getInvocationCount()).isEqualTo(consumerProps.getMaxAttempts());
        binderBindUnbindLatency();
        dlqConsumerBinding.unbind();
        failingConsumerBinding.unbind();

        // Rebind the same group to a channel that does not fail and send a fresh message.
        QueueChannel successfulInboundChannel = new QueueChannel();
        Binding<?> successfulConsumerBinding =
                binder.bindConsumer(destination, "testGroup", successfulInboundChannel, consumerProps);
        String secondPayload = "test." + UUID.randomUUID().toString();
        outboundChannel.send(MessageBuilder.withPayload(secondPayload).build());
        Assertions.assertThat(receive(successfulInboundChannel).getPayload()).isEqualTo(secondPayload);
        binderBindUnbindLatency();
        successfulConsumerBinding.unbind();
        producerBinding.unbind();
    }

    /**
     * Binds a consumer to a destination name that is unique per run
     * ({@code "nonexisting" + current millis}) with auto-create-topics switched on, then unbinds
     * it again. The test passes when neither call throws.
     *
     * @throws Exception if binding or unbinding fails
     */
    @Test
    public void testAutoCreateTopicsEnabledSucceeds() throws Exception {
        KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
        configurationProperties.setAutoCreateTopics(true);
        Binder binder = getBinder(configurationProperties);

        // The RetryTemplate that used to be configured here was never handed to anything,
        // so it has been dropped as dead code.
        String destination = "nonexisting" + System.currentTimeMillis();
        binder.bindConsumer(destination, "test", new DirectChannel(), m1createConsumerProperties()).unbind();
    }

    /**
     * Expects {@code KafkaTopicUtils.validateTopicName} to reject the name {@code "foo:bar"}
     * with an {@link IllegalArgumentException}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testValidateKafkaTopicName() {
        final String invalidTopicName = "foo:bar";
        KafkaTopicUtils.validateTopicName(invalidTopicName);
    }

    /**
     * Round-trips a 2048-byte payload through the {@code testCompression} destination once for
     * each of the compression types {@code none}, {@code gzip} and {@code snappy}, asserting that
     * the received payload equals the one sent.
     *
     * <p>The bindings of each iteration are released in a {@code finally} block, so a failed
     * assertion does not leave them bound on the shared destination.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testCompression() throws Exception {
        KafkaProducerProperties.CompressionType[] codecs = {
                KafkaProducerProperties.CompressionType.none,
                KafkaProducerProperties.CompressionType.gzip,
                KafkaProducerProperties.CompressionType.snappy
        };
        byte[] payload = new byte[2048];
        Arrays.fill(payload, (byte) 65);
        AbstractTestBinder binder = getBinder();

        for (KafkaProducerProperties.CompressionType codec : codecs) {
            DirectChannel moduleOutputChannel = new DirectChannel();
            QueueChannel moduleInputChannel = new QueueChannel();
            ExtendedProducerProperties<KafkaProducerProperties> producerProperties = m0createProducerProperties();
            // The enum constant is passed directly; converting it to a String and back is a no-op.
            producerProperties.getExtension().setCompressionType(codec);

            Binding<?> producerBinding = null;
            Binding<?> consumerBinding = null;
            try {
                producerBinding = binder.bindProducer("testCompression", moduleOutputChannel, producerProperties);
                consumerBinding = binder.bindConsumer(
                        "testCompression", "test", moduleInputChannel, m1createConsumerProperties());
                Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload(payload).build();
                binderBindUnbindLatency();
                moduleOutputChannel.send(message);

                Message<?> inbound = receive(moduleInputChannel);
                Assertions.assertThat(inbound).isNotNull();
                Assertions.assertThat((byte[]) inbound.getPayload()).containsExactly(payload);
            }
            finally {
                if (producerBinding != null) {
                    producerBinding.unbind();
                }
                if (consumerBinding != null) {
                    consumerBinding.unbind();
                }
            }
        }
    }

    /**
     * Uses a binder whose minimum partition count is 10 and a producer whose partition count is
     * 10, round-trips a 2048-byte payload, and asserts that the destination ends up with
     * 10 partitions.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testCustomPartitionCountOverridesDefaultIfLarger() throws Exception {
        byte[] payload = new byte[2048];
        Arrays.fill(payload, (byte) 65);

        KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
        binderConfiguration.setMinPartitionCount(10);
        Binder binder = getBinder(binderConfiguration);

        QueueChannel inboundChannel = new QueueChannel();
        ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
        producerProps.setPartitionCount(10);
        DirectChannel outboundChannel = createBindableChannel("output", createProducerBindingProperties(producerProps));
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();

        long stamp = System.currentTimeMillis();
        String destination = "foo" + stamp + ".0";
        Binding<?> producerBinding = binder.bindProducer(destination, outboundChannel, producerProps);
        Binding<?> consumerBinding = binder.bindConsumer(destination, (String) null, inboundChannel, consumerProps);

        Message<?> outbound = org.springframework.integration.support.MessageBuilder.withPayload(payload).build();
        binderBindUnbindLatency();
        outboundChannel.send(outbound);

        Message<?> inbound = receive(inboundChannel);
        Assertions.assertThat(inbound).isNotNull();
        Assertions.assertThat((byte[]) inbound.getPayload()).containsExactly(payload);
        Assertions.assertThat(partitionSize(destination)).isEqualTo(10);

        producerBinding.unbind();
        consumerBinding.unbind();
    }

    /**
     * Uses a binder whose minimum partition count is 6 and a producer that asks for 5 partitions
     * (keyed on the payload), round-trips a 2048-byte payload, and asserts that the destination
     * ends up with 6 partitions.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testCustomPartitionCountDoesNotOverridePartitioningIfSmaller() throws Exception {
        byte[] payload = new byte[2048];
        Arrays.fill(payload, (byte) 65);

        KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
        binderConfiguration.setMinPartitionCount(6);
        Binder binder = getBinder(binderConfiguration);

        QueueChannel inboundChannel = new QueueChannel();
        ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
        producerProps.setPartitionCount(5);
        producerProps.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload"));
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();

        long stamp = System.currentTimeMillis();
        String destination = "foo" + stamp + ".0";
        DirectChannel outboundChannel = createBindableChannel("output", createProducerBindingProperties(producerProps));
        Binding<?> producerBinding = binder.bindProducer(destination, outboundChannel, producerProps);
        Binding<?> consumerBinding = binder.bindConsumer(destination, (String) null, inboundChannel, consumerProps);

        // Fixed one-second pause before sending, in addition to the bind/unbind latency hook.
        Thread.sleep(1000L);
        Message<?> outbound = org.springframework.integration.support.MessageBuilder.withPayload(payload).build();
        binderBindUnbindLatency();
        outboundChannel.send(outbound);

        Message<?> inbound = receive(inboundChannel);
        Assertions.assertThat(inbound).isNotNull();
        Assertions.assertThat((byte[]) inbound.getPayload()).containsExactly(payload);
        Assertions.assertThat(partitionSize(destination)).isEqualTo(6);

        producerBinding.unbind();
        consumerBinding.unbind();
    }

    /**
     * Configures the producer with a {@code StringSerializer} key serializer and the message key
     * expression {@code headers.key}, sends a message whose {@code key} header is
     * {@code "myDynamicKey"}, and asserts that the received message carries that value in its
     * {@code kafka_receivedMessageKey} header.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testDynamicKeyExpression() throws Exception {
        Binder binder = getBinder(createConfigurationProperties());
        QueueChannel inboundChannel = new QueueChannel();

        ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
        producerProps.getExtension().getConfiguration().put("key.serializer", StringSerializer.class.getName());
        producerProps.getExtension().setMessageKeyExpression(spelExpressionParser.parseExpression("headers.key"));
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();

        String uniqueSuffix = UUID.randomUUID().toString();
        String destination = "foo" + uniqueSuffix + ".0";
        DirectChannel outboundChannel = createBindableChannel("output", createProducerBindingProperties(producerProps));
        Binding<?> producerBinding = binder.bindProducer(destination, outboundChannel, producerProps);
        Binding<?> consumerBinding = binder.bindConsumer(destination, (String) null, inboundChannel, consumerProps);

        // Fixed one-second pause before sending, in addition to the bind/unbind latency hook.
        Thread.sleep(1000L);
        Message<?> outbound = MessageBuilder.withPayload("somePayload").setHeader("key", "myDynamicKey").build();
        binderBindUnbindLatency();
        outboundChannel.send(outbound);

        Message<?> inbound = receive(inboundChannel);
        Assertions.assertThat(inbound).isNotNull();
        byte[] receivedKey = (byte[]) inbound.getHeaders().get("kafka_receivedMessageKey", byte[].class);
        Assertions.assertThat(new String(receivedKey)).isEqualTo("myDynamicKey");

        producerBinding.unbind();
        consumerBinding.unbind();
    }

    /**
     * Uses a binder whose minimum partition count is 4 and a producer that asks for 5 partitions
     * (keyed on the payload), round-trips a 2048-byte payload, and asserts that the destination
     * ends up with 5 partitions.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testCustomPartitionCountOverridesPartitioningIfLarger() throws Exception {
        byte[] payload = new byte[2048];
        Arrays.fill(payload, (byte) 65);

        KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
        binderConfiguration.setMinPartitionCount(4);
        Binder binder = getBinder(binderConfiguration);

        QueueChannel inboundChannel = new QueueChannel();
        ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
        producerProps.setPartitionCount(5);
        producerProps.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload"));
        DirectChannel outboundChannel = createBindableChannel("output", createProducerBindingProperties(producerProps));
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();

        long stamp = System.currentTimeMillis();
        String destination = "foo" + stamp + ".0";
        Binding<?> producerBinding = binder.bindProducer(destination, outboundChannel, producerProps);
        Binding<?> consumerBinding = binder.bindConsumer(destination, (String) null, inboundChannel, consumerProps);

        Message<?> outbound = org.springframework.integration.support.MessageBuilder.withPayload(payload).build();
        binderBindUnbindLatency();
        outboundChannel.send(outbound);

        Message<?> inbound = receive(inboundChannel);
        Assertions.assertThat(inbound).isNotNull();
        Assertions.assertThat((byte[]) inbound.getPayload()).containsExactly(payload);
        Assertions.assertThat(partitionSize(destination)).isEqualTo(5);

        producerBinding.unbind();
        consumerBinding.unbind();
    }

    /**
     * Sends one message before any consumer is bound, then binds a consumer with default start
     * offset settings (auto-rebalance disabled) and asserts that it receives that earlier message
     * as well as a second message sent after binding.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testDefaultConsumerStartsAtEarliest() throws Exception {
        Binder binder = getBinder(createConfigurationProperties());
        // The GenericApplicationContext that used to be created and refreshed here was never
        // used or closed, so it has been removed.
        DirectChannel moduleOutputChannel = new DirectChannel();
        QueueChannel moduleInputChannel = new QueueChannel();

        String destination = UUID.randomUUID().toString();
        Binding<?> producerBinding = binder.bindProducer(destination, moduleOutputChannel, m0createProducerProperties());

        // Published before the consumer exists.
        String firstPayload = "foo-" + UUID.randomUUID().toString();
        moduleOutputChannel.send(new GenericMessage<>(firstPayload.getBytes()));

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        consumerProperties.getExtension().setAutoRebalanceEnabled(false);
        Binding<?> consumerBinding =
                binder.bindConsumer(destination, "startOffsets", moduleInputChannel, consumerProperties);

        Message<?> firstInbound = receive(moduleInputChannel);
        Assertions.assertThat(firstInbound).isNotNull();
        Assertions.assertThat(new String((byte[]) firstInbound.getPayload())).isEqualTo(firstPayload);

        // Published after the consumer is bound.
        String secondPayload = "foo-" + UUID.randomUUID().toString();
        moduleOutputChannel.send(new GenericMessage<>(secondPayload.getBytes()));

        Message<?> secondInbound = receive(moduleInputChannel);
        Assertions.assertThat(secondInbound).isNotNull();
        Assertions.assertThat(new String((byte[]) secondInbound.getPayload())).isEqualTo(secondPayload);

        producerBinding.unbind();
        consumerBinding.unbind();
    }

    /**
     * Checks that a consumer configured with {@code StartOffset.earliest} receives a message
     * published before it was bound, and then a message published afterwards.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testEarliest() throws Exception {
        Binding producerBinding = null;
        Binding consumerBinding = null;
        try {
            AbstractTestBinder binder = getBinder();
            DirectChannel output = new DirectChannel();
            QueueChannel input = new QueueChannel();
            String topic = UUID.randomUUID().toString();
            producerBinding = binder.bindProducer(topic, output, m0createProducerProperties());

            // Published while no consumer is bound to the topic yet.
            output.send(new GenericMessage(("foo-" + UUID.randomUUID().toString()).getBytes()));

            ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
            ((KafkaConsumerProperties) consumerProperties.getExtension()).setAutoRebalanceEnabled(false);
            ((KafkaConsumerProperties) consumerProperties.getExtension()).setStartOffset(KafkaConsumerProperties.StartOffset.earliest);
            consumerBinding = binder.bindConsumer(topic, "startOffsets", input, consumerProperties);
            Assertions.assertThat(receive(input)).isNotNull();

            String latePayload = "foo-" + UUID.randomUUID().toString();
            output.send(new GenericMessage(latePayload.getBytes()));
            Message late = receive(input);
            Assertions.assertThat(late).isNotNull();
            Assertions.assertThat(new String((byte[]) late.getPayload())).isEqualTo(latePayload);

            // NOTE(review): pause kept from the original test; its purpose is not visible here.
            Thread.sleep(2000L);
        } finally {
            // Single cleanup path: each binding is unbound exactly once, on success or failure.
            if (consumerBinding != null) {
                consumerBinding.unbind();
            }
            if (producerBinding != null) {
                producerBinding.unbind();
            }
        }
    }

    /**
     * Checks that a consumer re-bound to the same topic and group ({@code startOffsets}) receives
     * the message that was published while no consumer was bound.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testResume() throws Exception {
        Binding producerBinding = null;
        Binding firstConsumerBinding = null;
        Binding secondConsumerBinding = null;
        try {
            Binder binder = getBinder(createConfigurationProperties());
            DirectChannel output = new DirectChannel();
            QueueChannel input = new QueueChannel();
            String topic = UUID.randomUUID().toString();
            producerBinding = binder.bindProducer(topic, output, m0createProducerProperties());

            output.send(new GenericMessage(("foo1-" + UUID.randomUUID().toString()).getBytes()));
            firstConsumerBinding = binder.bindConsumer(topic, "startOffsets", input, m1createConsumerProperties());
            Assertions.assertThat(receive(input)).isNotNull();

            output.send(new GenericMessage(("foo2-" + UUID.randomUUID().toString()).getBytes()));
            Message second = receive(input);
            Assertions.assertThat(second).isNotNull();
            Assertions.assertThat(new String((byte[]) second.getPayload())).isNotNull();

            firstConsumerBinding.unbind();
            // Already released; stops the finally block from unbinding it a second time.
            firstConsumerBinding = null;
            // NOTE(review): pause kept from the original test; its purpose is not visible here.
            Thread.sleep(2000L);

            // Published while no consumer is bound; the re-bound consumer must pick it up.
            String resumedPayload = "foo3-" + UUID.randomUUID().toString();
            output.send(new GenericMessage(resumedPayload.getBytes()));
            secondConsumerBinding = binder.bindConsumer(topic, "startOffsets", input, m1createConsumerProperties());
            Message resumed = receive(input);
            Assertions.assertThat(resumed).isNotNull();
            Assertions.assertThat(new String((byte[]) resumed.getPayload())).isEqualTo(resumedPayload);
        } finally {
            if (secondConsumerBinding != null) {
                secondConsumerBinding.unbind();
            }
            // Only still set when the test failed before the first consumer was unbound.
            if (firstConsumerBinding != null) {
                firstConsumerBinding.unbind();
            }
            if (producerBinding != null) {
                producerBinding.unbind();
            }
        }
    }

    /**
     * Binds producers to the topics {@code foo.x} and {@code foo.y}, binds one shared input
     * channel to both topics in group {@code test}, and checks that the message sent to each
     * topic arrives on that channel, in any order.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testSendAndReceiveMultipleTopics() throws Exception {
        AbstractTestBinder binder = getBinder();
        DirectChannel outputX = createBindableChannel("output1", createProducerBindingProperties(m0createProducerProperties()));
        DirectChannel outputY = createBindableChannel("output2", createProducerBindingProperties(m0createProducerProperties()));
        QueueChannel sharedInput = new QueueChannel();
        Binding producerBindingX = binder.bindProducer("foo.x", outputX, m0createProducerProperties());
        Binding producerBindingY = binder.bindProducer("foo.y", outputY, m0createProducerProperties());

        // Both consumer bindings use the same properties and feed the same channel.
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        ((KafkaConsumerProperties) consumerProperties.getExtension()).setAutoRebalanceEnabled(false);
        Binding consumerBindingX = binder.bindConsumer("foo.x", "test", sharedInput, consumerProperties);
        Binding consumerBindingY = binder.bindConsumer("foo.y", "test", sharedInput, consumerProperties);

        String payloadX = "foo" + UUID.randomUUID().toString();
        Message messageX = org.springframework.integration.support.MessageBuilder.withPayload(payloadX.getBytes()).build();
        String payloadY = "foo" + UUID.randomUUID().toString();
        Message messageY = org.springframework.integration.support.MessageBuilder.withPayload(payloadY.getBytes()).build();

        binderBindUnbindLatency();
        outputX.send(messageX);
        outputY.send(messageY);

        Message firstReceived = receive(sharedInput);
        Message secondReceived = receive(sharedInput);
        Message[] received = {firstReceived, secondReceived};
        Assertions.assertThat(firstReceived).isNotNull();
        Assertions.assertThat(secondReceived).isNotNull();
        Assertions.assertThat(received).extracting("payload").containsExactlyInAnyOrder(new Object[]{payloadX.getBytes(), payloadY.getBytes()});

        producerBindingX.unbind();
        producerBindingY.unbind();
        consumerBindingX.unbind();
        consumerBindingY.unbind();
    }

    /**
     * Checks that, with {@code autoCommitOffset} set to {@code false} on the consumer, a received
     * message carries a {@code kafka_acknowledgment} header and that acknowledging through it
     * does not throw.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testManualAckSucceedsWhenAutoCommitOffsetIsTurnedOff() throws Exception {
        AbstractTestBinder binder = getBinder();
        DirectChannel output = createBindableChannel("output", createProducerBindingProperties(m0createProducerProperties()));
        QueueChannel input = new QueueChannel();
        Binding producerBinding = null;
        Binding consumerBinding = null;
        try {
            producerBinding = binder.bindProducer("testManualAckSucceedsWhenAutoCommitOffsetIsTurnedOff", output, m0createProducerProperties());
            ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
            ((KafkaConsumerProperties) consumerProperties.getExtension()).setAutoCommitOffset(false);
            consumerBinding = binder.bindConsumer("testManualAckSucceedsWhenAutoCommitOffsetIsTurnedOff", "test", input, consumerProperties);

            Message outbound = org.springframework.integration.support.MessageBuilder.withPayload(("foo" + UUID.randomUUID().toString()).getBytes()).build();
            binderBindUnbindLatency();
            output.send(outbound);

            Message received = receive(input);
            Assertions.assertThat(received).isNotNull();
            Assertions.assertThat(received.getHeaders().get("kafka_acknowledgment")).isNotNull();
            try {
                ((Acknowledgment) received.getHeaders().get("kafka_acknowledgment", Acknowledgment.class)).acknowledge();
            } catch (Exception e) {
                // Keep the original exception as the cause so the failure report shows why the ack failed.
                Assertions.fail("Acknowledge must not throw an exception", e);
            }
        } finally {
            // Single cleanup path, also taken when one of the assertions above fails.
            if (producerBinding != null) {
                producerBinding.unbind();
            }
            if (consumerBinding != null) {
                consumerBinding.unbind();
            }
        }
    }

    /**
     * Checks that a message received through a consumer bound with unmodified consumer
     * properties carries no {@code kafka_acknowledgment} header.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder() throws Exception {
        final String topic = "testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder";
        AbstractTestBinder binder = getBinder();
        DirectChannel output = createBindableChannel("output", createProducerBindingProperties(m0createProducerProperties()));
        QueueChannel input = new QueueChannel();
        Binding producerBinding = binder.bindProducer(topic, output, m0createProducerProperties());
        Binding consumerBinding = binder.bindConsumer(topic, "test", input, m1createConsumerProperties());

        String payload = "foo" + UUID.randomUUID().toString();
        Message outbound = org.springframework.integration.support.MessageBuilder.withPayload(payload.getBytes()).build();
        binderBindUnbindLatency();
        output.send(outbound);

        Message received = receive(input);
        Assertions.assertThat(received).isNotNull();
        Object acknowledgmentHeader = received.getHeaders().get("kafka_acknowledgment");
        Assertions.assertThat(acknowledgmentHeader).isNull();

        producerBinding.unbind();
        consumerBinding.unbind();
    }

    /**
     * Binds a producer whose required groups are {@code test1} and {@code test2}, sends one
     * message, then binds a consumer in each group and checks that both receive that message.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testTwoRequiredGroups() throws Exception {
        AbstractTestBinder binder = getBinder();
        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = m0createProducerProperties();
        DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
        String destination = "testDestination" + UUID.randomUUID().toString().replace("-", "");
        producerProperties.setRequiredGroups(new String[]{"test1", "test2"});
        Binding producerBinding = binder.bindProducer(destination, output, producerProperties);

        // The message is sent before either consumer is bound.
        String payload = "foo-" + UUID.randomUUID().toString();
        output.send(new GenericMessage(payload.getBytes()));

        QueueChannel inputGroup1 = new QueueChannel();
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        ((KafkaConsumerProperties) consumerProperties.getExtension()).setAutoRebalanceEnabled(false);
        Binding consumerBindingGroup1 = binder.bindConsumer(destination, "test1", inputGroup1, consumerProperties);
        QueueChannel inputGroup2 = new QueueChannel();
        Binding consumerBindingGroup2 = binder.bindConsumer(destination, "test2", inputGroup2, consumerProperties);

        Message receivedGroup1 = receive(inputGroup1);
        Assertions.assertThat(receivedGroup1).isNotNull();
        String textGroup1 = new String((byte[]) receivedGroup1.getPayload());
        Assertions.assertThat(textGroup1).isEqualTo(payload);

        Message receivedGroup2 = receive(inputGroup2);
        Assertions.assertThat(receivedGroup2).isNotNull();
        String textGroup2 = new String((byte[]) receivedGroup2.getPayload());
        Assertions.assertThat(textGroup2).isEqualTo(payload);

        consumerBindingGroup1.unbind();
        consumerBindingGroup2.unbind();
        producerBinding.unbind();
    }

    /**
     * Binds three partitioned consumers (instance indexes 0..2) and a producer whose partition
     * key and selector are SpEL expressions, sends the payloads 2, 1 and 0, and checks that each
     * consumer receives one of them. The message with payload 2 must also carry the
     * correlation/sequence headers it was sent with.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testPartitionedModuleSpEL() throws Exception {
        AbstractTestBinder binder = getBinder();

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        consumerProperties.setConcurrency(2);
        consumerProperties.setInstanceIndex(0);
        consumerProperties.setInstanceCount(3);
        consumerProperties.setPartitioned(true);
        ((KafkaConsumerProperties) consumerProperties.getExtension()).setAutoRebalanceEnabled(false);

        // One input channel per instance index; the same properties object is reused for each bind.
        QueueChannel input0 = new QueueChannel();
        input0.setBeanName("test.input0S");
        Binding consumerBinding0 = binder.bindConsumer("part.0", "test", input0, consumerProperties);

        consumerProperties.setInstanceIndex(1);
        QueueChannel input1 = new QueueChannel();
        input1.setBeanName("test.input1S");
        Binding consumerBinding1 = binder.bindConsumer("part.0", "test", input1, consumerProperties);

        consumerProperties.setInstanceIndex(2);
        QueueChannel input2 = new QueueChannel();
        input2.setBeanName("test.input2S");
        Binding consumerBinding2 = binder.bindConsumer("part.0", "test", input2, consumerProperties);

        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = m0createProducerProperties();
        producerProperties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload"));
        producerProperties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
        producerProperties.setPartitionCount(3);
        DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
        output.setBeanName("test.output");
        Binding producerBinding = binder.bindProducer("part.0", output, producerProperties);

        try {
            String expectedRouting = getExpectedRoutingBaseDestination("part.0", "test") + "-' + headers['partition']";
            Assertions.assertThat(getEndpointRouting(extractEndpoint(producerBinding))).contains(new CharSequence[]{expectedRouting});
        } catch (UnsupportedOperationException ignored) {
            // Deliberately ignored: the routing check is skipped when this exception is raised.
        }

        output.send(org.springframework.integration.support.MessageBuilder.withPayload(2).setHeader("correlationId", "foo").setHeader("sequenceNumber", 42).setHeader("sequenceSize", 43).build());
        output.send(new GenericMessage(1));
        output.send(new GenericMessage(0));

        Message received0 = receive(input0);
        Assertions.assertThat(received0).isNotNull();
        Message received1 = receive(input1);
        Assertions.assertThat(received1).isNotNull();
        Message received2 = receive(input2);
        Assertions.assertThat(received2).isNotNull();

        // Matches a message that still carries the headers set on the payload-2 message above.
        Condition<Message<?>> carriesSequenceHeaders = new Condition<Message<?>>() {
            @Override
            public boolean matches(Message<?> candidate) {
                IntegrationMessageHeaderAccessor headers = new IntegrationMessageHeaderAccessor(candidate);
                return "foo".equals(headers.getCorrelationId()) && 42 == headers.getSequenceNumber().intValue() && 43 == headers.getSequenceSize().intValue();
            }
        };

        if (usesExplicitRouting()) {
            Assertions.assertThat(received0.getPayload()).isEqualTo(0);
            Assertions.assertThat(received1.getPayload()).isEqualTo(1);
            Assertions.assertThat(received2.getPayload()).isEqualTo(2);
            Assertions.assertThat(received2).has(carriesSequenceHeaders);
        } else {
            // Without explicit routing only the set of payloads is checked, not which consumer got which.
            List allReceived = Arrays.asList(received0, received1, received2);
            Assertions.assertThat(allReceived).extracting("payload").containsExactlyInAnyOrder(new Object[]{0, 1, 2});
            Assertions.assertThat(allReceived).filteredOn(new Condition<Message<?>>() {
                @Override
                public boolean matches(Message<?> candidate) {
                    return candidate.getPayload().equals(2);
                }
            }).areExactly(1, carriesSequenceHeaders);
        }

        consumerBinding0.unbind();
        consumerBinding1.unbind();
        consumerBinding2.unbind();
        producerBinding.unbind();
    }

    /**
     * Creates the topic {@code partJ.0} with 8 partitions, binds four partitioned consumers
     * (instance indexes 0..3) and a producer that uses {@code PartitionTestSupport} as key
     * extractor and partition selector, sends the payloads 2, 1, 0 and 3, and checks that the
     * consumer with instance index {@code i} receives payload {@code i}.
     *
     * @throws Exception if topic creation, binding, sending or receiving fails
     */
    @Test
    public void testPartitionedModuleJava() throws Exception {
        final int instanceCount = 4;
        AbstractTestBinder binder = getBinder();
        KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
        ZkClient zkClient = new ZkClient(binderConfiguration.getZkConnectionString(), binderConfiguration.getZkSessionTimeout(), binderConfiguration.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$);
        ZkUtils zkUtils = new ZkUtils(zkClient, (ZkConnection) null, false);
        invokeCreateTopic(zkUtils, "partJ.0", 8, 1, new Properties());

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        consumerProperties.setConcurrency(2);
        consumerProperties.setInstanceCount(instanceCount);
        consumerProperties.setPartitioned(true);
        ((KafkaConsumerProperties) consumerProperties.getExtension()).setAutoRebalanceEnabled(false);

        // One input channel and binding per instance index; the properties object is reused.
        QueueChannel[] inputs = new QueueChannel[instanceCount];
        Binding[] consumerBindings = new Binding[instanceCount];
        for (int index = 0; index < instanceCount; index++) {
            consumerProperties.setInstanceIndex(index);
            inputs[index] = new QueueChannel();
            inputs[index].setBeanName("test.input" + index + "J");
            consumerBindings[index] = binder.bindConsumer("partJ.0", "test", inputs[index], consumerProperties);
        }

        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = m0createProducerProperties();
        producerProperties.setPartitionKeyExtractorClass(PartitionTestSupport.class);
        producerProperties.setPartitionSelectorClass(PartitionTestSupport.class);
        producerProperties.setPartitionCount(3);
        DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
        output.setBeanName("test.output");
        Binding producerBinding = binder.bindProducer("partJ.0", output, producerProperties);

        if (usesExplicitRouting()) {
            String expectedRouting = getExpectedRoutingBaseDestination("partJ.0", "test") + "-' + headers['partition']";
            Assertions.assertThat(getEndpointRouting(extractEndpoint(producerBinding))).contains(new CharSequence[]{expectedRouting});
        }

        output.send(new GenericMessage(2));
        output.send(new GenericMessage(1));
        output.send(new GenericMessage(0));
        output.send(new GenericMessage(3));

        // Take one message from every input first, then compare the payloads.
        Message[] received = new Message[instanceCount];
        for (int index = 0; index < instanceCount; index++) {
            received[index] = receive(inputs[index]);
            Assertions.assertThat(received[index]).isNotNull();
        }
        for (int index = 0; index < instanceCount; index++) {
            Assertions.assertThat(received[index].getPayload()).isEqualTo(index);
        }

        for (int index = 0; index < instanceCount; index++) {
            consumerBindings[index].unbind();
        }
        producerBinding.unbind();
    }

    /**
     * Binds two consumers with a {@code null} group to the same destination and checks that both
     * receive the same message. Then unbinds the second consumer, sends a message, re-binds it and
     * sends another: the first consumer must see both messages, the re-bound one only the later.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testAnonymousGroup() throws Exception {
        AbstractTestBinder binder = getBinder();
        BindingProperties producerBindingProperties = createProducerBindingProperties(m0createProducerProperties());
        DirectChannel output = createBindableChannel("output", producerBindingProperties);
        Binding producerBinding = binder.bindProducer("defaultGroup.0", output, producerBindingProperties.getProducer());

        QueueChannel inputA = new QueueChannel();
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        Binding consumerBindingA = binder.bindConsumer("defaultGroup.0", (String) null, inputA, consumerProperties);
        QueueChannel inputB = new QueueChannel();
        Binding consumerBindingB = binder.bindConsumer("defaultGroup.0", (String) null, inputB, consumerProperties);
        Thread.sleep(1000L);

        // Phase 1: both consumers are bound and must each get the same message.
        String sharedPayload = "foo-" + UUID.randomUUID().toString();
        output.send(new GenericMessage(sharedPayload.getBytes()));
        Message sharedOnA = receive(inputA);
        Assertions.assertThat(sharedOnA).isNotNull();
        Assertions.assertThat(new String((byte[]) sharedOnA.getPayload())).isEqualTo(sharedPayload);
        Message sharedOnB = receive(inputB);
        Assertions.assertThat(sharedOnB).isNotNull();
        Assertions.assertThat(new String((byte[]) sharedOnB.getPayload())).isEqualTo(sharedPayload);

        // Phase 2: one message is sent while B is unbound, another after B has been re-bound.
        consumerBindingB.unbind();
        String whileUnboundPayload = "foo-" + UUID.randomUUID().toString();
        output.send(new GenericMessage(whileUnboundPayload.getBytes()));
        Binding reboundConsumerBindingB = binder.bindConsumer("defaultGroup.0", (String) null, inputB, consumerProperties);
        Thread.sleep(1000L);
        String afterRebindPayload = "foo-" + UUID.randomUUID().toString();
        output.send(new GenericMessage(afterRebindPayload.getBytes()));

        Message nextOnA = receive(inputA);
        Assertions.assertThat(nextOnA).isNotNull();
        Assertions.assertThat(new String((byte[]) nextOnA.getPayload())).isEqualTo(whileUnboundPayload);
        Message lastOnA = receive(inputA);
        Assertions.assertThat(lastOnA).isNotNull();
        Assertions.assertThat(new String((byte[]) lastOnA.getPayload())).isNotNull();
        Message nextOnB = receive(inputB);
        Assertions.assertThat(nextOnB).isNotNull();
        Assertions.assertThat(new String((byte[]) nextOnB.getPayload())).isEqualTo(afterRebindPayload);

        producerBinding.unbind();
        consumerBindingA.unbind();
        reboundConsumerBindingB.unbind();
    }

    /**
     * Checks that setting {@code sync} to {@code true} on the producer properties results in a
     * bound {@code KafkaProducerMessageHandler} whose {@code sync} field is {@code true}.
     *
     * @throws Exception if the producer cannot be bound
     */
    @Test
    public void testSyncProducerMetadata() throws Exception {
        Binder binder = getBinder(createConfigurationProperties());
        DirectChannel output = new DirectChannel();
        String topic = UUID.randomUUID().toString();
        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = m0createProducerProperties();
        ((KafkaProducerProperties) producerProperties.getExtension()).setSync(true);
        Binding producerBinding = binder.bindProducer(topic, output, producerProperties);
        try {
            KafkaProducerMessageHandler handler = (KafkaProducerMessageHandler) new DirectFieldAccessor(extractEndpoint(producerBinding)).getWrappedInstance();
            Object sync = new DirectFieldAccessor(handler).getPropertyValue("sync");
            // withFailMessage only configures the message; the trailing isEqualTo is what
            // actually performs the check.
            Assertions.assertThat(sync).withFailMessage("Kafka Sync Producer should have been enabled.").isEqualTo(Boolean.TRUE);
        } finally {
            producerBinding.unbind();
        }
    }

    /**
     * Checks that, with {@code autoCreateTopics} disabled on the binder configuration, a producer
     * and a consumer can still be bound to a topic the test did not create beforehand, and that a
     * message round-trips. As the test name indicates, this relies on the broker creating the topic.
     *
     * @throws Exception if binding, sending or receiving fails
     */
    @Test
    public void testAutoCreateTopicsDisabledOnBinderStillWorksAsLongAsBrokerCreatesTopic() throws Exception {
        KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
        configurationProperties.setAutoCreateTopics(false);
        Binder binder = getBinder(configurationProperties);
        DirectChannel output = new DirectChannel();
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        String topic = "createdByBroker-" + System.currentTimeMillis();
        QueueChannel input = new QueueChannel();
        Binding producerBinding = null;
        Binding consumerBinding = null;
        try {
            producerBinding = binder.bindProducer(topic, output, m0createProducerProperties());
            String payload = "foo1-" + UUID.randomUUID().toString();
            output.send(new GenericMessage(payload.getBytes()));
            consumerBinding = binder.bindConsumer(topic, "test", input, consumerProperties);

            Message received = receive(input);
            Assertions.assertThat(received).isNotNull();
            Assertions.assertThat(new String((byte[]) received.getPayload())).isEqualTo(payload);
        } finally {
            // Release the bindings even when an assertion above fails.
            if (producerBinding != null) {
                producerBinding.unbind();
            }
            if (consumerBinding != null) {
                consumerBinding.unbind();
            }
        }
    }

    /**
     * Creates a topic with 5 partitions up front, disables {@code autoCreateTopics} on the binder
     * configuration, and checks that a consumer can be bound to that topic and unbound again.
     *
     * @throws Exception if topic creation or binding fails
     */
    @Test
    public void testAutoConfigureTopicsDisabledSucceedsIfTopicExisting() throws Exception {
        KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
        ZkUtils zkUtils = getZkUtils(binderConfiguration);
        String existingTopic = "existing" + System.currentTimeMillis();
        invokeCreateTopic(zkUtils, existingTopic, 5, 1, new Properties());
        binderConfiguration.setAutoCreateTopics(false);

        // The test passes when binding and unbinding complete without an exception.
        Binder binder = getBinder(binderConfiguration);
        DirectChannel input = new DirectChannel();
        Binding consumerBinding = binder.bindConsumer(existingTopic, "test", input, m1createConsumerProperties());
        consumerBinding.unbind();
    }

    /**
     * Creates a topic with a single partition, configures the binder with
     * {@code minPartitionCount = 6} and {@code autoAddPartitions = true}, binds a consumer, and
     * checks that the topic has 6 partitions afterwards.
     *
     * @throws Exception if topic creation or binding fails
     */
    @Test
    public void testPartitionCountIncreasedIfAutoAddPartitionsSet() throws Exception {
        KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
        ZkUtils zkUtils = getZkUtils(configurationProperties);
        String topic = "existing" + System.currentTimeMillis();
        // Start below the configured minimum; otherwise the final assertion would hold even if
        // the binder never added a partition.
        invokeCreateTopic(zkUtils, topic, 1, 1, new Properties());
        configurationProperties.setMinPartitionCount(6);
        configurationProperties.setAutoAddPartitions(true);
        Binder binder = getBinder(configurationProperties);
        binder.bindConsumer(topic, "test", new DirectChannel(), m1createConsumerProperties()).unbind();
        Assertions.assertThat(invokePartitionSize(topic, zkUtils)).isEqualTo(6);
    }

    /**
     * Creates a topic with one partition, disables {@code autoAddPartitions}, and binds a consumer
     * configured as instance 2 of 3 without touching the auto-rebalance setting. Binding must
     * succeed and the topic must still have one partition afterwards.
     *
     * @throws Exception if topic creation or binding fails
     */
    @Test
    public void testAutoAddPartitionsDisabledSucceedsIfTopicUnderPartitionedAndAutoRebalanceEnabled() throws Exception {
        KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
        ZkUtils zkUtils = new ZkUtils(new ZkClient(configurationProperties.getZkConnectionString(), configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$), (ZkConnection) null, false);
        String topic = "existing" + System.currentTimeMillis();
        invokeCreateTopic(zkUtils, topic, 1, 1, new Properties());
        configurationProperties.setAutoAddPartitions(false);
        Binder binder = getBinder(configurationProperties);
        DirectChannel input = new DirectChannel();
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        consumerProperties.setInstanceCount(3);
        consumerProperties.setInstanceIndex(2);
        binder.bindConsumer(topic, "test", input, consumerProperties).unbind();
        Assertions.assertThat(invokePartitionSize(topic, zkUtils)).isEqualTo(1);
    }

    /**
     * Creates a topic with one partition, disables {@code autoAddPartitions}, and binds a consumer
     * configured as instance 2 of 3 with auto-rebalance disabled. Binding is expected to fail with
     * a {@code ProvisioningException} reporting the partition-count mismatch.
     *
     * @throws Exception if topic creation fails, or the expected provisioning failure occurs
     */
    @Test
    public void testAutoAddPartitionsDisabledFailsIfTopicUnderPartitionedAndAutoRebalanceDisabled() throws Exception {
        KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
        ZkUtils zkUtils = new ZkUtils(new ZkClient(configurationProperties.getZkConnectionString(), configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$), (ZkConnection) null, false);
        String topic = "existing" + System.currentTimeMillis();
        invokeCreateTopic(zkUtils, topic, 1, 1, new Properties());
        configurationProperties.setAutoAddPartitions(false);
        Binder binder = getBinder(configurationProperties);
        DirectChannel input = new DirectChannel();
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
        consumerProperties.setInstanceCount(3);
        consumerProperties.setInstanceIndex(2);
        ((KafkaConsumerProperties) consumerProperties.getExtension()).setAutoRebalanceEnabled(false);
        // The expectation must be registered before the call that is supposed to throw.
        this.expectedProvisioningException.expect(ProvisioningException.class);
        this.expectedProvisioningException.expectMessage("The number of expected partitions was: 3, but 1 has been found instead");
        Binding consumerBinding = binder.bindConsumer(topic, "test", input, consumerProperties);
        // Only reached when binding unexpectedly succeeds; release the binding in that case.
        if (consumerBinding != null) {
            consumerBinding.unbind();
        }
    }

    /**
     * Creates a topic with 6 partitions, disables {@code autoAddPartitions}, and binds a consumer
     * configured as instance 2 of 3 with auto-rebalance disabled. The listener container must be
     * assigned exactly partitions 2 and 5, and the topic must still have 6 partitions.
     *
     * @throws Exception if topic creation or binding fails
     */
    @Test
    public void testAutoAddPartitionsDisabledSucceedsIfTopicPartitionedCorrectly() throws Exception {
        Binding consumerBinding = null;
        try {
            KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
            ZkUtils zkUtils = new ZkUtils(new ZkClient(configurationProperties.getZkConnectionString(), configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$), (ZkConnection) null, false);
            String topic = "existing" + System.currentTimeMillis();
            invokeCreateTopic(zkUtils, topic, 6, 1, new Properties());
            configurationProperties.setAutoAddPartitions(false);
            Binder binder = getBinder(configurationProperties);
            DirectChannel input = new DirectChannel();
            ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = m1createConsumerProperties();
            consumerProperties.setInstanceCount(3);
            consumerProperties.setInstanceIndex(2);
            ((KafkaConsumerProperties) consumerProperties.getExtension()).setAutoRebalanceEnabled(false);
            consumerBinding = binder.bindConsumer(topic, "test-x", input, consumerProperties);

            // Read the assigned partitions out of the binding's listener container.
            TopicPartitionInitialOffset[] assignedPartitions = (TopicPartitionInitialOffset[]) TestUtils.getPropertyValue(consumerBinding, "lifecycle.messageListenerContainer.containerProperties.topicPartitions", TopicPartitionInitialOffset[].class);
            Assertions.assertThat(assignedPartitions).hasSize(2);
            Assertions.assertThat(assignedPartitions).contains(new TopicPartitionInitialOffset[]{new TopicPartitionInitialOffset(topic, 2), new TopicPartitionInitialOffset(topic, 5)});
            Assertions.assertThat(invokePartitionSize(topic, zkUtils)).isEqualTo(6);
        } finally {
            // Single cleanup path for both the success and the failure case.
            if (consumerBinding != null) {
                consumerBinding.unbind();
            }
        }
    }

    /**
     * Binds and immediately unbinds a consumer on a pre-created 6-partition topic with
     * {@code autoAddPartitions} enabled, then checks that the topic still reports 6 partitions.
     */
    @Test
    public void testPartitionCountNotReduced() throws Exception {
        // Timestamp suffix keeps the topic name unique across runs.
        String topicName = "existing" + System.currentTimeMillis();
        KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
        ZkUtils zkUtils = new ZkUtils(new ZkClient(binderConfig.getZkConnectionString(), binderConfig.getZkSessionTimeout(), binderConfig.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$), (ZkConnection) null, false);
        invokeCreateTopic(zkUtils, topicName, 6, 1, new Properties());
        binderConfig.setAutoAddPartitions(true);
        Binder binder = getBinder(binderConfig);
        // NOTE(review): this context is refreshed but never referenced again or closed here —
        // confirm whether the statement is still needed.
        new GenericApplicationContext().refresh();
        Binding consumerBinding = binder.bindConsumer(topicName, "test", new DirectChannel(), m1createConsumerProperties());
        consumerBinding.unbind();
        Assertions.assertThat(partitionSize(topicName)).isEqualTo(6);
    }

    /**
     * Checks that a consumer created from the binding's consumer factory uses
     * {@link ByteArrayDeserializer} for both key and value when no deserializer is configured.
     */
    @Test
    public void testConsumerDefaultDeserializer() throws Exception {
        Binding binding = null;
        KafkaConsumer consumer = null;
        try {
            KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
            ZkUtils zkUtils = getZkUtils(binderConfig);
            // Timestamp suffix keeps the topic name unique across runs.
            String topicName = "existing" + System.currentTimeMillis();
            invokeCreateTopic(zkUtils, topicName, 5, 1, new Properties());
            binderConfig.setAutoCreateTopics(false);
            binding = getBinder(binderConfig).bindConsumer(topicName, "test", new DirectChannel(), m1createConsumerProperties());
            // getKafkaConsumer asks the factory for a consumer, so this test owns it and must close it.
            consumer = getKafkaConsumer(binding);
            DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(consumer);
            Assert.assertTrue(consumerAccessor.getPropertyValue("keyDeserializer") instanceof ByteArrayDeserializer);
            Assert.assertTrue(consumerAccessor.getPropertyValue("valueDeserializer") instanceof ByteArrayDeserializer);
        } finally {
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                // Unbind even if closing the consumer failed.
                if (binding != null) {
                    binding.unbind();
                }
            }
        }
    }

    /**
     * Checks that {@code key.deserializer} / {@code value.deserializer} entries placed in the
     * binder configuration are honoured: a consumer created from the binding's consumer
     * factory must use {@link StringDeserializer} for keys and {@link LongDeserializer} for values.
     */
    @Test
    public void testConsumerCustomDeserializer() throws Exception {
        Binding binding = null;
        KafkaConsumer consumer = null;
        try {
            KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
            Map overrides = binderConfig.getConfiguration();
            overrides.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            overrides.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
            binderConfig.setConfiguration(overrides);
            ZkUtils zkUtils = getZkUtils(binderConfig);
            // Timestamp suffix keeps the topic name unique across runs.
            String topicName = "existing" + System.currentTimeMillis();
            invokeCreateTopic(zkUtils, topicName, 5, 1, new Properties());
            binderConfig.setAutoCreateTopics(false);
            binding = getBinder(binderConfig).bindConsumer(topicName, "test", new DirectChannel(), m1createConsumerProperties());
            // getKafkaConsumer asks the factory for a consumer, so this test owns it and must close it.
            consumer = getKafkaConsumer(binding);
            DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(consumer);
            Assert.assertTrue("Expected StringDeserializer as a custom key deserializer", consumerAccessor.getPropertyValue("keyDeserializer") instanceof StringDeserializer);
            Assert.assertTrue("Expected LongDeserializer as a custom value deserializer", consumerAccessor.getPropertyValue("valueDeserializer") instanceof LongDeserializer);
        } finally {
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                // Unbind even if closing the consumer failed.
                if (binding != null) {
                    binding.unbind();
                }
            }
        }
    }

    /**
     * Reaches through the binding's private fields
     * ({@code lifecycle} -> {@code messageListenerContainer} -> {@code consumerFactory})
     * and asks that factory to create a consumer.
     *
     * @param binding the consumer binding to inspect
     * @return the consumer returned by the factory's {@code createConsumer()}
     */
    private KafkaConsumer getKafkaConsumer(Binding binding) {
        DirectFieldAccessor bindingFields = new DirectFieldAccessor(binding);
        KafkaMessageDrivenChannelAdapter channelAdapter = (KafkaMessageDrivenChannelAdapter) bindingFields.getPropertyValue("lifecycle");

        DirectFieldAccessor adapterFields = new DirectFieldAccessor(channelAdapter);
        ConcurrentMessageListenerContainer listenerContainer = (ConcurrentMessageListenerContainer) adapterFields.getPropertyValue("messageListenerContainer");

        DirectFieldAccessor containerFields = new DirectFieldAccessor(listenerContainer);
        DefaultKafkaConsumerFactory factory = (DefaultKafkaConsumerFactory) containerFields.getPropertyValue("consumerFactory");

        return factory.createConsumer();
    }

    /**
     * Sends an {@code Integer} payload through a producer using native encoding with Kafka's
     * {@code IntegerSerializer} and a consumer configured with {@code IntegerDeserializer},
     * and checks that the payload arrives as 10 with no {@code contentType} header.
     */
    @Test
    public void testNativeSerializationWithCustomSerializerDeserializer() throws Exception {
        Binding producerBinding = null;
        Binding consumerBinding = null;
        try {
            Message outbound = MessageBuilder.withPayload(Integer.valueOf(10)).build();
            DirectChannel outputChannel = new DirectChannel();
            // Timestamp suffix keeps the topic name unique across runs.
            String topicName = "existing" + System.currentTimeMillis();
            KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
            ZkUtils zkUtils = new ZkUtils(new ZkClient(binderConfig.getZkConnectionString(), binderConfig.getZkSessionTimeout(), binderConfig.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$), (ZkConnection) null, false);
            invokeCreateTopic(zkUtils, topicName, 6, 1, new Properties());
            binderConfig.setAutoAddPartitions(true);
            Binder binder = getBinder(binderConfig);
            QueueChannel inputChannel = new QueueChannel();

            ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
            producerProps.setUseNativeEncoding(true);
            ((KafkaProducerProperties) producerProps.getExtension()).getConfiguration().put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            producerBinding = binder.bindProducer(topicName, outputChannel, producerProps);

            ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();
            ((KafkaConsumerProperties) consumerProps.getExtension()).setAutoRebalanceEnabled(false);
            ((KafkaConsumerProperties) consumerProps.getExtension()).getConfiguration().put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            consumerBinding = binder.bindConsumer(topicName, "test", inputChannel, consumerProps);

            // Pause after binding and before sending.
            binderBindUnbindLatency();
            outputChannel.send(outbound);
            Message inbound = receive(inputChannel, 500);
            Assertions.assertThat(inbound).isNotNull();
            Assertions.assertThat(inbound.getPayload()).isEqualTo(10);
            Assertions.assertThat(inbound.getHeaders()).doesNotContainKey("contentType");
        } finally {
            try {
                if (producerBinding != null) {
                    producerBinding.unbind();
                }
            } finally {
                // Release the consumer even if unbinding the producer failed.
                if (consumerBinding != null) {
                    consumerBinding.unbind();
                }
            }
        }
    }

    /**
     * Sends a {@code String} payload through default producer/consumer bindings and checks
     * that it arrives as {@code "test"} with a {@code contentType} header of {@code text/plain}.
     */
    @Test
    public void testBuiltinSerialization() throws Exception {
        Binding producerBinding = null;
        Binding consumerBinding = null;
        try {
            Message outbound = MessageBuilder.withPayload("test").build();
            DirectChannel outputChannel = new DirectChannel();
            // Timestamp suffix keeps the topic name unique across runs.
            String topicName = "existing" + System.currentTimeMillis();
            KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
            ZkUtils zkUtils = new ZkUtils(new ZkClient(binderConfig.getZkConnectionString(), binderConfig.getZkSessionTimeout(), binderConfig.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$), (ZkConnection) null, false);
            invokeCreateTopic(zkUtils, topicName, 6, 1, new Properties());
            binderConfig.setAutoAddPartitions(true);
            Binder binder = getBinder(binderConfig);
            QueueChannel inputChannel = new QueueChannel();

            producerBinding = binder.bindProducer(topicName, outputChannel, m0createProducerProperties());

            ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();
            ((KafkaConsumerProperties) consumerProps.getExtension()).setAutoRebalanceEnabled(false);
            consumerBinding = binder.bindConsumer(topicName, "test", inputChannel, consumerProps);

            // Pause after binding and before sending.
            binderBindUnbindLatency();
            outputChannel.send(outbound);
            Message inbound = receive(inputChannel, 5);
            Assertions.assertThat(inbound).isNotNull();
            Assertions.assertThat(inbound.getPayload()).isEqualTo("test");
            Assertions.assertThat(inbound.getHeaders()).containsEntry("contentType", "text/plain");
        } finally {
            try {
                if (producerBinding != null) {
                    producerBinding.unbind();
                }
            } finally {
                // Release the consumer even if unbinding the producer failed.
                if (consumerBinding != null) {
                    consumerBinding.unbind();
                }
            }
        }
    }

    /**
     * Partitioned send/receive in raw header mode, using {@code RawKafkaPartitionTestSupport}
     * as both key extractor and partition selector. Three consumer instances (indexes 0..2 of 3)
     * are bound to the same destination; three single-byte payloads (0, 1, 2) are sent and
     * each consumer channel must receive one of them.
     */
    @Test
    public void testPartitionedModuleJavaWithRawMode() throws Exception {
        AbstractTestBinder binder = getBinder();
        Binding producerBinding = null;
        Binding consumerBinding0 = null;
        Binding consumerBinding1 = null;
        Binding consumerBinding2 = null;
        try {
            ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
            producerProps.setHeaderMode(HeaderMode.raw);
            producerProps.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
            producerProps.setPartitionSelectorClass(RawKafkaPartitionTestSupport.class);
            producerProps.setPartitionCount(6);
            DirectChannel outputChannel = createBindableChannel("output", createProducerBindingProperties(producerProps));
            outputChannel.setBeanName("test.output");
            producerBinding = binder.bindProducer("partJ.raw.0", outputChannel, producerProps);

            // The same properties object is reused for all three consumers; only the
            // instance index is changed between bind calls.
            ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();
            consumerProps.setConcurrency(2);
            consumerProps.setInstanceCount(3);
            consumerProps.setInstanceIndex(0);
            consumerProps.setPartitioned(true);
            consumerProps.setHeaderMode(HeaderMode.raw);
            ((KafkaConsumerProperties) consumerProps.getExtension()).setAutoRebalanceEnabled(false);

            QueueChannel inputChannel0 = new QueueChannel();
            inputChannel0.setBeanName("test.input0J");
            consumerBinding0 = binder.bindConsumer("partJ.raw.0", "test", inputChannel0, consumerProps);

            consumerProps.setInstanceIndex(1);
            QueueChannel inputChannel1 = new QueueChannel();
            inputChannel1.setBeanName("test.input1J");
            consumerBinding1 = binder.bindConsumer("partJ.raw.0", "test", inputChannel1, consumerProps);

            consumerProps.setInstanceIndex(2);
            QueueChannel inputChannel2 = new QueueChannel();
            inputChannel2.setBeanName("test.input2J");
            consumerBinding2 = binder.bindConsumer("partJ.raw.0", "test", inputChannel2, consumerProps);

            outputChannel.send(new GenericMessage(new byte[]{0}));
            outputChannel.send(new GenericMessage(new byte[]{1}));
            outputChannel.send(new GenericMessage(new byte[]{2}));

            Message received0 = receive(inputChannel0);
            Assertions.assertThat(received0).isNotNull();
            Message received1 = receive(inputChannel1);
            Assertions.assertThat(received1).isNotNull();
            Message received2 = receive(inputChannel2);
            Assertions.assertThat(received2).isNotNull();

            // Which consumer gets which payload is not asserted, only that all three arrived.
            Assertions.assertThat(Arrays.asList(Byte.valueOf(((byte[]) received0.getPayload())[0]), Byte.valueOf(((byte[]) received1.getPayload())[0]), Byte.valueOf(((byte[]) received2.getPayload())[0]))).containsExactlyInAnyOrder(new Byte[]{(byte) 0, (byte) 1, (byte) 2});
        } finally {
            // Release whatever was bound (consumers first, then the producer), even when an assertion failed.
            for (Binding bound : new Binding[]{consumerBinding0, consumerBinding1, consumerBinding2, producerBinding}) {
                if (bound != null) {
                    bound.unbind();
                }
            }
        }
    }

    /**
     * Partitioned send/receive in raw header mode, with the partition key taken from the SpEL
     * expression {@code payload[0]} and the partition selected by {@code hashCode()}. Three
     * consumer instances (indexes 0..2 of 3) are bound to the same destination; three
     * single-byte payloads (2, 1, 0) are sent and each consumer channel must receive one of them.
     */
    @Test
    public void testPartitionedModuleSpELWithRawMode() throws Exception {
        AbstractTestBinder binder = getBinder();
        Binding producerBinding = null;
        Binding consumerBinding0 = null;
        Binding consumerBinding1 = null;
        Binding consumerBinding2 = null;
        try {
            ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
            producerProps.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload[0]"));
            producerProps.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
            producerProps.setPartitionCount(6);
            producerProps.setHeaderMode(HeaderMode.raw);
            DirectChannel outputChannel = createBindableChannel("output", createProducerBindingProperties(producerProps));
            outputChannel.setBeanName("test.output");
            producerBinding = binder.bindProducer("part.raw.0", outputChannel, producerProps);
            try {
                Assertions.assertThat(getEndpointRouting(extractEndpoint(producerBinding))).contains(new CharSequence[]{getExpectedRoutingBaseDestination("part.raw.0", "test") + "-' + headers['partition']"});
            } catch (UnsupportedOperationException ignored) {
                // Best-effort check: an UnsupportedOperationException here is deliberately not
                // treated as a failure (presumably the binder cannot expose routing — confirm).
            }

            // The same properties object is reused for all three consumers; only the
            // instance index is changed between bind calls.
            ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();
            consumerProps.setConcurrency(2);
            consumerProps.setInstanceIndex(0);
            consumerProps.setInstanceCount(3);
            consumerProps.setPartitioned(true);
            consumerProps.setHeaderMode(HeaderMode.raw);
            ((KafkaConsumerProperties) consumerProps.getExtension()).setAutoRebalanceEnabled(false);

            QueueChannel inputChannel0 = new QueueChannel();
            inputChannel0.setBeanName("test.input0S");
            consumerBinding0 = binder.bindConsumer("part.raw.0", "test", inputChannel0, consumerProps);

            consumerProps.setInstanceIndex(1);
            QueueChannel inputChannel1 = new QueueChannel();
            inputChannel1.setBeanName("test.input1S");
            consumerBinding1 = binder.bindConsumer("part.raw.0", "test", inputChannel1, consumerProps);

            consumerProps.setInstanceIndex(2);
            QueueChannel inputChannel2 = new QueueChannel();
            inputChannel2.setBeanName("test.input2S");
            consumerBinding2 = binder.bindConsumer("part.raw.0", "test", inputChannel2, consumerProps);

            outputChannel.send(org.springframework.integration.support.MessageBuilder.withPayload(new byte[]{2}).setHeader("correlationId", "kafkaBinderTestCommonsDelegate").setHeader("sequenceNumber", 42).setHeader("sequenceSize", 43).build());
            outputChannel.send(new GenericMessage(new byte[]{1}));
            outputChannel.send(new GenericMessage(new byte[]{0}));

            Message received0 = receive(inputChannel0);
            Assertions.assertThat(received0).isNotNull();
            Message received1 = receive(inputChannel1);
            Assertions.assertThat(received1).isNotNull();
            Message received2 = receive(inputChannel2);
            Assertions.assertThat(received2).isNotNull();

            // Which consumer gets which payload is not asserted, only that all three arrived.
            Assertions.assertThat(Arrays.asList(Byte.valueOf(((byte[]) received0.getPayload())[0]), Byte.valueOf(((byte[]) received1.getPayload())[0]), Byte.valueOf(((byte[]) received2.getPayload())[0]))).containsExactlyInAnyOrder(new Byte[]{(byte) 0, (byte) 1, (byte) 2});
        } finally {
            // Release whatever was bound (consumers first, then the producer), even when an assertion failed.
            for (Binding bound : new Binding[]{consumerBinding0, consumerBinding1, consumerBinding2, producerBinding}) {
                if (bound != null) {
                    bound.unbind();
                }
            }
        }
    }

    /**
     * Sends a byte-array payload through producer and consumer bindings that both use raw
     * header mode and checks that the same text is received.
     */
    @Test
    public void testSendAndReceiveWithRawMode() throws Exception {
        AbstractTestBinder binder = getBinder();
        DirectChannel outputChannel = new DirectChannel();
        QueueChannel inputChannel = new QueueChannel();
        Binding producerBinding = null;
        Binding consumerBinding = null;
        try {
            ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
            producerProps.setHeaderMode(HeaderMode.raw);
            producerBinding = binder.bindProducer("raw.0", outputChannel, producerProps);

            ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();
            consumerProps.setHeaderMode(HeaderMode.raw);
            consumerBinding = binder.bindConsumer("raw.0", "test", inputChannel, consumerProps);

            Message outbound = org.springframework.integration.support.MessageBuilder.withPayload("testSendAndReceiveWithRawMode".getBytes()).build();
            // Pause after binding and before sending.
            binderBindUnbindLatency();
            outputChannel.send(outbound);

            Message inbound = receive(inputChannel);
            Assertions.assertThat(inbound).isNotNull();
            Assertions.assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("testSendAndReceiveWithRawMode");
        } finally {
            try {
                if (producerBinding != null) {
                    producerBinding.unbind();
                }
            } finally {
                // Release the consumer even if unbinding the producer failed.
                if (consumerBinding != null) {
                    consumerBinding.unbind();
                }
            }
        }
    }

    /**
     * Sends a {@code String} payload through producer and consumer bindings that both use raw
     * header mode and checks that the consumer receives it as a byte array holding the same text.
     */
    @Test
    public void testSendAndReceiveWithRawModeAndStringPayload() throws Exception {
        AbstractTestBinder binder = getBinder();
        DirectChannel outputChannel = new DirectChannel();
        QueueChannel inputChannel = new QueueChannel();
        Binding producerBinding = null;
        Binding consumerBinding = null;
        try {
            ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
            producerProps.setHeaderMode(HeaderMode.raw);
            producerBinding = binder.bindProducer("raw.string.0", outputChannel, producerProps);

            ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();
            consumerProps.setHeaderMode(HeaderMode.raw);
            consumerBinding = binder.bindConsumer("raw.string.0", "test", inputChannel, consumerProps);

            Message outbound = org.springframework.integration.support.MessageBuilder.withPayload("testSendAndReceiveWithRawModeAndStringPayload").build();
            // Pause after binding and before sending.
            binderBindUnbindLatency();
            outputChannel.send(outbound);

            Message inbound = receive(inputChannel);
            Assertions.assertThat(inbound).isNotNull();
            Assertions.assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("testSendAndReceiveWithRawModeAndStringPayload");
        } finally {
            try {
                if (producerBinding != null) {
                    producerBinding.unbind();
                }
            } finally {
                // Release the consumer even if unbinding the producer failed.
                if (consumerBinding != null) {
                    consumerBinding.unbind();
                }
            }
        }
    }

    /**
     * Raw-mode send/receive with three consumer groups ({@code test}, {@code tap1},
     * {@code tap2}) on one destination.
     * <ol>
     *   <li>Sends a message and expects all three groups to receive it; if either tap misses
     *       it, the send is retried once before the test fails.</li>
     *   <li>Unbinds {@code tap2}, sends again, and expects {@code tap1} to receive while the
     *       {@code tap2} channel stays empty.</li>
     *   <li>Re-binds {@code tap2} and expects its channel to receive a message.</li>
     *   <li>Unbinds everything and checks that no endpoint is still running.</li>
     * </ol>
     */
    @Test
    public void testSendAndReceiveWithExplicitConsumerGroupWithRawMode() throws Exception {
        AbstractTestBinder binder = getBinder();
        DirectChannel outputChannel = new DirectChannel();
        QueueChannel mainInput = new QueueChannel();
        QueueChannel tap1Input = new QueueChannel();
        QueueChannel tap2Input = new QueueChannel();

        ExtendedProducerProperties<KafkaProducerProperties> producerProps = m0createProducerProperties();
        producerProps.setHeaderMode(HeaderMode.raw);
        Binding producerBinding = binder.bindProducer("baz.raw.0", outputChannel, producerProps);

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = m1createConsumerProperties();
        consumerProps.setHeaderMode(HeaderMode.raw);
        ((KafkaConsumerProperties) consumerProps.getExtension()).setAutoRebalanceEnabled(false);
        Binding mainBinding = binder.bindConsumer("baz.raw.0", "test", mainInput, consumerProps);
        Binding tap1Binding = binder.bindConsumer("baz.raw.0", "tap1", tap1Input, consumerProps);
        Binding tap2Binding = binder.bindConsumer("baz.raw.0", "tap2", tap2Input, consumerProps);

        Message outbound = org.springframework.integration.support.MessageBuilder.withPayload("testSendAndReceiveWithExplicitConsumerGroupWithRawMode".getBytes()).build();
        boolean tapsReceived = false;
        boolean retried = false;
        while (!tapsReceived) {
            outputChannel.send(outbound);
            Message mainInbound = receive(mainInput);
            Assertions.assertThat(mainInbound).isNotNull();
            Assertions.assertThat(new String((byte[]) mainInbound.getPayload())).isEqualTo("testSendAndReceiveWithExplicitConsumerGroupWithRawMode");
            Message tap1Inbound = receive(tap1Input);
            Message tap2Inbound = receive(tap2Input);
            if (tap1Inbound == null || tap2Inbound == null) {
                // One retry is allowed. withFailMessage must be set BEFORE the assertion is
                // evaluated, otherwise the custom message is never used.
                Assertions.assertThat(retried).withFailMessage("Failed to receive tap after retry").isFalse();
                retried = true;
            } else {
                tapsReceived = true;
                Assertions.assertThat(new String((byte[]) tap1Inbound.getPayload())).isEqualTo("testSendAndReceiveWithExplicitConsumerGroupWithRawMode");
                Assertions.assertThat(new String((byte[]) tap2Inbound.getPayload())).isEqualTo("testSendAndReceiveWithExplicitConsumerGroupWithRawMode");
            }
        }

        // With tap2 unbound, only tap1 should see the next message.
        tap2Binding.unbind();
        outputChannel.send(org.springframework.integration.support.MessageBuilder.withPayload("bar".getBytes()).build());
        Assertions.assertThat(receive(tap1Input)).isNotNull();
        Assertions.assertThat(receive(tap2Input)).isNull();

        // Re-bind tap2 (with fresh default consumer properties) and expect a message on its channel.
        Binding tap2Rebound = binder.bindConsumer("baz.raw.0", "tap2", tap2Input, m1createConsumerProperties());
        Assertions.assertThat(receive(tap2Input)).isNotNull();

        mainBinding.unbind();
        tap1Binding.unbind();
        tap2Rebound.unbind();
        producerBinding.unbind();
        Assertions.assertThat(extractEndpoint(mainBinding).isRunning()).isFalse();
        Assertions.assertThat(extractEndpoint(tap1Binding).isRunning()).isFalse();
        Assertions.assertThat(extractEndpoint(tap2Rebound).isRunning()).isFalse();
        Assertions.assertThat(extractEndpoint(producerBinding).isRunning()).isFalse();
    }

    /**
     * Verifies producer-side error reporting. The producer handler's {@code kafkaTemplate} is
     * replaced with a stub whose {@code send} returns a future already failed with a known
     * exception. After sending a message, both the destination-specific error channel
     * ({@code ec.0.errors}) and the global {@code errorChannel} must be notified, and the
     * error message must carry a {@code KafkaSendFailureException} wrapping that exception,
     * the failed message, and the record value that was handed to the template.
     */
    @Test
    public void testProducerErrorChannel() throws Exception {
        AbstractKafkaTestBinder kafkaBinder = (AbstractKafkaTestBinder) getBinder();
        DirectChannel outputChannel = createBindableChannel("output", new BindingProperties());
        ExtendedProducerProperties producerProps = new ExtendedProducerProperties(new KafkaProducerProperties());
        producerProps.setHeaderMode(HeaderMode.raw);
        producerProps.setErrorChannelEnabled(true);
        Binding producerBinding = kafkaBinder.bindProducer("ec.0", outputChannel, producerProps);
        try {
            Message outbound = MessageBuilder.withPayload("bad").setHeader("contentType", "foo/bar").build();
            SubscribableChannel bindingErrorChannel = (SubscribableChannel) kafkaBinder.getApplicationContext().getBean("ec.0.errors", SubscribableChannel.class);
            final AtomicReference errorMessageRef = new AtomicReference();
            // Two notifications are expected: one per subscribed error channel.
            final CountDownLatch errorsSeen = new CountDownLatch(2);
            bindingErrorChannel.subscribe(new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    errorMessageRef.set(message);
                    errorsSeen.countDown();
                }
            });
            SubscribableChannel globalErrorChannel = (SubscribableChannel) kafkaBinder.getApplicationContext().getBean("errorChannel", SubscribableChannel.class);
            globalErrorChannel.subscribe(new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    errorsSeen.countDown();
                }
            });

            KafkaProducerMessageHandler producerHandler = (KafkaProducerMessageHandler) TestUtils.getPropertyValue(producerBinding, "lifecycle", KafkaProducerMessageHandler.class);
            final RuntimeException sendFailure = new RuntimeException("foo");
            final AtomicReference sentPayloadRef = new AtomicReference();
            // Stub template: records the payload it was asked to send and fails the send immediately.
            new DirectFieldAccessor(producerHandler).setPropertyValue("kafkaTemplate", new KafkaTemplate((ProducerFactory) Mockito.mock(ProducerFactory.class)) {
                @Override
                public ListenableFuture<SendResult> send(String topic, Object payload) {
                    sentPayloadRef.set(payload);
                    SettableListenableFuture failedFuture = new SettableListenableFuture();
                    failedFuture.setException(sendFailure);
                    return failedFuture;
                }
            });

            outputChannel.send(outbound);
            Assertions.assertThat(errorsSeen.await(10L, TimeUnit.SECONDS)).isTrue();
            Assertions.assertThat(errorMessageRef.get()).isInstanceOf(ErrorMessage.class);
            Assertions.assertThat(((Message) errorMessageRef.get()).getPayload()).isInstanceOf(KafkaSendFailureException.class);
            KafkaSendFailureException failure = (KafkaSendFailureException) ((Message) errorMessageRef.get()).getPayload();
            Assertions.assertThat(failure.getCause()).isSameAs(sendFailure);
            Assertions.assertThat(new String((byte[]) failure.getFailedMessage().getPayload())).isEqualTo(outbound.getPayload());
            Assertions.assertThat(failure.getRecord().value()).isSameAs(sentPayloadRef.get());
        } finally {
            // Release the producer binding even when an assertion above failed.
            producerBinding.unbind();
        }
    }

    /**
     * Pauses the calling thread for half a second. Tests in this class call it between
     * binding and sending.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected void binderBindUnbindLatency() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500L);
    }
}
