package org.springframework.cloud.stream.binder;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

/* loaded from: input_file:org/springframework/cloud/stream/binder/PartitionCapableBinderTests.class */
public abstract class PartitionCapableBinderTests extends BrokerBinderTests {
    @Test
    public void testBadProperties() throws Exception {
        Binder binder = getBinder();
        Properties properties = new Properties();
        properties.put("foo", "bar");
        properties.put("baz", "qux");
        DirectChannel directChannel = new DirectChannel();
        try {
            binder.bindProducer("badprops.0", directChannel, properties);
        } catch (IllegalArgumentException e) {
            Assert.assertThat(e.getMessage(), Matchers.allOf(Matchers.containsString(getClassUnderTestName() + " does not support producer "), Matchers.containsString("foo"), Matchers.containsString("baz"), Matchers.containsString(" for badprops.0.")));
        }
        properties.remove("baz");
        try {
            binder.bindConsumer("badprops.0", directChannel, properties);
        } catch (IllegalArgumentException e2) {
            Assert.assertThat(e2.getMessage(), Matchers.equalTo(getClassUnderTestName() + " does not support consumer property: foo for badprops.0."));
        }
    }

    @Test
    public void testPartitionedModuleSpEL() throws Exception {
        Binder binder = getBinder();
        Properties properties = new Properties();
        properties.put("partitionKeyExpression", "payload");
        properties.put("partitionSelectorExpression", "hashCode()");
        properties.put("nextModuleCount", "3");
        properties.put("nextModuleConcurrency", "2");
        DirectChannel directChannel = new DirectChannel();
        directChannel.setBeanName("test.output");
        binder.bindProducer("part.0", directChannel, properties);
        List list = (List) TestUtils.getPropertyValue(binder, "binder.bindings", List.class);
        Assert.assertEquals(1L, list.size());
        try {
            Assert.assertThat(getEndpointRouting(((Binding) list.get(0)).getEndpoint()), Matchers.containsString("part.0-' + headers['partition']"));
        } catch (UnsupportedOperationException e) {
        }
        properties.clear();
        properties.put("concurrency", "2");
        properties.put("partitionIndex", "0");
        properties.put("count", "3");
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.setBeanName("test.input0S");
        binder.bindConsumer("part.0", queueChannel, properties);
        properties.put("partitionIndex", "1");
        QueueChannel queueChannel2 = new QueueChannel();
        queueChannel2.setBeanName("test.input1S");
        binder.bindConsumer("part.0", queueChannel2, properties);
        properties.put("partitionIndex", "2");
        QueueChannel queueChannel3 = new QueueChannel();
        queueChannel3.setBeanName("test.input2S");
        binder.bindConsumer("part.0", queueChannel3, properties);
        directChannel.send(MessageBuilder.withPayload(2).setHeader("correlationId", "foo").setHeader("sequenceNumber", 42).setHeader("sequenceSize", 43).setHeader("binderReplyChannel", "bar").build());
        directChannel.send(new GenericMessage(1));
        directChannel.send(new GenericMessage(0));
        Message receive = queueChannel.receive(1000L);
        Assert.assertNotNull(receive);
        Message receive2 = queueChannel2.receive(1000L);
        Assert.assertNotNull(receive2);
        Message receive3 = queueChannel3.receive(1000L);
        Assert.assertNotNull(receive3);
        Matcher matcher = new CustomMatcher<Message<?>>("the message with 'foo' as its correlationId") { // from class: org.springframework.cloud.stream.binder.PartitionCapableBinderTests.1
            public boolean matches(Object obj) {
                IntegrationMessageHeaderAccessor integrationMessageHeaderAccessor = new IntegrationMessageHeaderAccessor((Message) obj);
                return "foo".equals(integrationMessageHeaderAccessor.getCorrelationId()) && 42 == integrationMessageHeaderAccessor.getSequenceNumber().intValue() && 43 == integrationMessageHeaderAccessor.getSequenceSize().intValue() && "bar".equals(integrationMessageHeaderAccessor.getHeader("binderReplyChannel"));
            }
        };
        if (usesExplicitRouting()) {
            Assert.assertEquals(0, receive.getPayload());
            Assert.assertEquals(1, receive2.getPayload());
            Assert.assertEquals(2, receive3.getPayload());
            Assert.assertThat(receive3, matcher);
        } else {
            Assert.assertThat(Arrays.asList((Integer) receive.getPayload(), (Integer) receive2.getPayload(), (Integer) receive3.getPayload()), Matchers.containsInAnyOrder(new Integer[]{0, 1, 2}));
            Assert.assertThat(Arrays.asList(receive, receive2, receive3), Matchers.containsInAnyOrder(new Matcher[]{matcher, Matchers.hasProperty("payload", Matchers.equalTo(0)), Matchers.hasProperty("payload", Matchers.equalTo(1))}));
        }
        binder.unbindConsumers("part.0");
        binder.unbindProducers("part.0");
    }

    @Test
    public void testPartitionedModuleJava() throws Exception {
        Binder binder = getBinder();
        Properties properties = new Properties();
        properties.put("partitionKeyExtractorClass", "org.springframework.cloud.stream.binder.PartitionTestSupport");
        properties.put("partitionSelectorClass", "org.springframework.cloud.stream.binder.PartitionTestSupport");
        properties.put("nextModuleCount", "3");
        properties.put("nextModuleConcurrency", "2");
        DirectChannel directChannel = new DirectChannel();
        directChannel.setBeanName("test.output");
        binder.bindProducer("partJ.0", directChannel, properties);
        List list = (List) TestUtils.getPropertyValue(binder, "binder.bindings", List.class);
        Assert.assertEquals(1L, list.size());
        if (usesExplicitRouting()) {
            Assert.assertThat(getEndpointRouting(((Binding) list.get(0)).getEndpoint()), Matchers.containsString("partJ.0-' + headers['partition']"));
        }
        properties.clear();
        properties.put("concurrency", "2");
        properties.put("count", "3");
        properties.put("partitionIndex", "0");
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.setBeanName("test.input0J");
        binder.bindConsumer("partJ.0", queueChannel, properties);
        properties.put("partitionIndex", "1");
        QueueChannel queueChannel2 = new QueueChannel();
        queueChannel2.setBeanName("test.input1J");
        binder.bindConsumer("partJ.0", queueChannel2, properties);
        properties.put("partitionIndex", "2");
        QueueChannel queueChannel3 = new QueueChannel();
        queueChannel3.setBeanName("test.input2J");
        binder.bindConsumer("partJ.0", queueChannel3, properties);
        directChannel.send(new GenericMessage(2));
        directChannel.send(new GenericMessage(1));
        directChannel.send(new GenericMessage(0));
        Message receive = queueChannel.receive(1000L);
        Assert.assertNotNull(receive);
        Message receive2 = queueChannel2.receive(1000L);
        Assert.assertNotNull(receive2);
        Message receive3 = queueChannel3.receive(1000L);
        Assert.assertNotNull(receive3);
        if (usesExplicitRouting()) {
            Assert.assertEquals(0, receive.getPayload());
            Assert.assertEquals(1, receive2.getPayload());
            Assert.assertEquals(2, receive3.getPayload());
        } else {
            Assert.assertThat(Arrays.asList((Integer) receive.getPayload(), (Integer) receive2.getPayload(), (Integer) receive3.getPayload()), Matchers.containsInAnyOrder(new Integer[]{0, 1, 2}));
        }
        binder.unbindConsumers("partJ.0");
        binder.unbindProducers("partJ.0");
    }

    protected abstract boolean usesExplicitRouting();

    protected String getEndpointRouting(AbstractEndpoint abstractEndpoint) {
        throw new UnsupportedOperationException();
    }

    protected String getPubSubEndpointRouting(AbstractEndpoint abstractEndpoint) {
        throw new UnsupportedOperationException();
    }

    protected abstract String getClassUnderTestName();
}
