package org.springframework.cloud.stream.binder;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;

/* loaded from: input_file:org/springframework/cloud/stream/binder/BrokerBinderTests.class */
public abstract class BrokerBinderTests extends AbstractBinderTests {
    @Test
    public void testDirectBinding() throws Exception {
        Binder<MessageChannel> binder = getBinder();
        Properties properties = new Properties();
        properties.setProperty("directBindingAllowed", "true");
        DirectChannel directChannel = new DirectChannel();
        directChannel.setBeanName("direct.input");
        DirectChannel directChannel2 = new DirectChannel();
        directChannel2.setBeanName("direct.output");
        binder.bindConsumer("direct.0", directChannel, (Properties) null);
        binder.bindProducer("direct.0", directChannel2, properties);
        final AtomicReference atomicReference = new AtomicReference();
        final AtomicInteger atomicInteger = new AtomicInteger();
        directChannel.subscribe(new MessageHandler() { // from class: org.springframework.cloud.stream.binder.BrokerBinderTests.1
            public void handleMessage(Message<?> message) throws MessagingException {
                atomicReference.set(Thread.currentThread());
                atomicInteger.incrementAndGet();
            }
        });
        directChannel2.send(new GenericMessage("foo"));
        directChannel2.send(new GenericMessage("foo"));
        Assert.assertNotNull(atomicReference.get());
        Assert.assertSame(Thread.currentThread(), atomicReference.get());
        Assert.assertEquals(2L, atomicInteger.get());
        Assert.assertNull(spyOn("direct.0").receive(true));
        binder.unbindConsumers("direct.0");
        binderBindUnbindLatency();
        Spy spyOn = spyOn("direct.0");
        atomicInteger.set(0);
        directChannel2.send(new GenericMessage("bar"));
        directChannel2.send(new GenericMessage("baz"));
        Assert.assertEquals("bar", spyOn.receive(false));
        Assert.assertEquals("baz", spyOn.receive(false));
        Assert.assertEquals(0L, atomicInteger.get());
        atomicReference.set(null);
        binder.bindConsumer("direct.0", directChannel, (Properties) null);
        directChannel2.send(new GenericMessage("foo"));
        directChannel2.send(new GenericMessage("foo"));
        Assert.assertNotNull(atomicReference.get());
        Assert.assertSame(Thread.currentThread(), atomicReference.get());
        Assert.assertEquals(2L, atomicInteger.get());
        Assert.assertNull(spyOn.receive(true));
        binder.unbindProducers("direct.0");
        binder.unbindConsumers("direct.0");
    }

    public abstract Spy spyOn(String str);
}
