package org.springframework.xd.dirt.integration.bus;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.http.MediaType;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.context.NamedComponent;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.integration.bus.MessageBusSupport;

/* loaded from: input_file:org/springframework/xd/dirt/integration/bus/LocalMessageBus.class */
/**
 * A {@link MessageBusSupport} implementation that wires module channels to shared, in-process
 * channels by means of {@link BridgeHandler} endpoints.
 *
 * <p>Point-to-point names starting with {@code queue:} or {@code job:} are backed by the inherited
 * {@code queueChannelProvider}; every other point-to-point name uses the inherited
 * {@code directChannelProvider}. Publish/subscribe bindings use the inherited
 * {@code pubsubChannelProvider}. Request/reply bindings are backed by {@link ExecutorChannel}s
 * that dispatch on this bus's own executor.
 *
 * <p>Access to the request/reply channel registry is guarded by this instance's monitor.
 */
public class LocalMessageBus extends MessageBusSupport {

    /** Prefix of the bean names given to bridges that deliver messages into a module. */
    private static final String INBOUND_PREFIX = "inbound.";

    /** Prefix of the bean names given to bridges that carry messages out of a module. */
    private static final String OUTBOUND_PREFIX = "outbound.";

    /** Registry key prefix for the channel that carries requests. */
    private static final String REQUESTOR_PREFIX = "requestor.";

    /** Registry key prefix for the channel that carries replies. */
    private static final String REPLIER_PREFIX = "replier.";

    /** Prefix of the bean names used for dynamically bound producers. */
    private static final String DYNAMIC_OUTPUT_PREFIX = "dynamic.output.to.";

    /** Poller applied to bridges whose input channel is a {@link PollableChannel}; may be null. */
    private PollerMetadata poller;

    /** Request/reply channels keyed by prefixed binding name; guarded by {@code this}. */
    private final Map<String, ExecutorChannel> requestReplyChannels = new HashMap<String, ExecutorChannel>();

    // NOTE(review): nothing in this class shuts this executor down - confirm whether the
    // superclass or the container takes care of that.
    private final ExecutorService executor = Executors.newCachedThreadPool();

    // NOTE(review): never read or written in this class; kept in case other code relies on it.
    private boolean hasCodec;

    /**
     * Bus properties accessor used for every binding created by this bus. It adds nothing to
     * {@code AbstractBusPropertiesAccessor} beyond exposing a constructor.
     */
    public class LocalBusPropertiesAccessor extends AbstractBusPropertiesAccessor {
        public LocalBusPropertiesAccessor(Properties properties) {
            super(properties);
        }
    }

    /**
     * Sets the poller used when bridging from a {@link PollableChannel}.
     *
     * @param pollerMetadata the poller configuration
     */
    public void setPoller(PollerMetadata pollerMetadata) {
        this.poller = pollerMetadata;
    }

    /**
     * Binds a dynamically created point-to-point producer, delegating to
     * {@code doBindDynamicProducer} with the bean name {@code "dynamic.output.to." + str}.
     *
     * @param str the destination name
     * @param properties binding properties
     * @return the channel returned by {@code doBindDynamicProducer}
     */
    @Override
    public MessageChannel bindDynamicProducer(String str, Properties properties) {
        return doBindDynamicProducer(str, DYNAMIC_OUTPUT_PREFIX + str, properties);
    }

    /**
     * Binds a dynamically created publish/subscribe producer, delegating to
     * {@code doBindDynamicPubSubProducer} with the bean name {@code "dynamic.output.to." + str}.
     *
     * @param str the destination name
     * @param properties binding properties
     * @return the channel returned by {@code doBindDynamicPubSubProducer}
     */
    @Override
    public MessageChannel bindDynamicPubSubProducer(String str, Properties properties) {
        return doBindDynamicPubSubProducer(str, DYNAMIC_OUTPUT_PREFIX + str, properties);
    }

    /**
     * Selects the shared channel provider for a point-to-point binding name.
     *
     * @param str the binding name
     * @return the queue channel provider for {@code queue:} and {@code job:} names, otherwise the
     *         direct channel provider
     */
    private MessageBusSupport.SharedChannelProvider<?> getChannelProvider(String str) {
        if (str.startsWith("queue:") || str.startsWith("job:")) {
            return this.queueChannelProvider;
        }
        return this.directChannelProvider;
    }

    /**
     * Bridges the shared point-to-point channel for {@code str} into the given module channel.
     *
     * @param str the binding name
     * @param messageChannel the module's input channel
     * @param properties binding properties
     */
    @Override
    public void bindConsumer(String str, MessageChannel messageChannel, Properties properties) {
        validateConsumerProperties(str, properties, Collections.emptySet());
        doRegisterConsumer(str, messageChannel, getChannelProvider(str), properties);
    }

    /**
     * Bridges the shared publish/subscribe channel for {@code str} into the given module channel.
     *
     * @param str the binding name
     * @param messageChannel the module's input channel
     * @param properties binding properties
     */
    @Override
    public void bindPubSubConsumer(String str, MessageChannel messageChannel, Properties properties) {
        validateConsumerProperties(str, properties, Collections.emptySet());
        doRegisterConsumer(str, messageChannel, this.pubsubChannelProvider, properties);
    }

    /**
     * Looks up or creates the shared channel and bridges it to the module's input channel.
     *
     * @throws IllegalArgumentException if {@code str} has no text or {@code messageChannel} is null
     */
    private void doRegisterConsumer(String str, MessageChannel messageChannel, MessageBusSupport.SharedChannelProvider<?> sharedChannelProvider, Properties properties) {
        Assert.hasText(str, "a valid name is required to register an inbound channel");
        Assert.notNull(messageChannel, "channel must not be null");
        MessageChannel sharedChannel = sharedChannelProvider.lookupOrCreateSharedChannel(str);
        String bridgeName = INBOUND_PREFIX + ((NamedComponent) sharedChannel).getComponentName();
        bridge(str, sharedChannel, messageChannel, bridgeName, new LocalBusPropertiesAccessor(properties));
    }

    /**
     * Bridges the given module channel to the shared point-to-point channel for {@code str}.
     *
     * @param str the binding name
     * @param messageChannel the module's output channel
     * @param properties binding properties
     */
    @Override
    public void bindProducer(String str, MessageChannel messageChannel, Properties properties) {
        // NOTE(review): a producer binding is validated with validateConsumerProperties - confirm
        // against MessageBusSupport whether a producer-specific validation should be used instead.
        validateConsumerProperties(str, properties, Collections.emptySet());
        doRegisterProducer(str, messageChannel, getChannelProvider(str), properties);
    }

    /**
     * Bridges the given module channel to the shared publish/subscribe channel for {@code str}.
     *
     * @param str the binding name
     * @param messageChannel the module's output channel
     * @param properties binding properties
     */
    @Override
    public void bindPubSubProducer(String str, MessageChannel messageChannel, Properties properties) {
        // NOTE(review): see bindProducer - consumer validation is applied to a producer binding.
        validateConsumerProperties(str, properties, Collections.emptySet());
        doRegisterProducer(str, messageChannel, this.pubsubChannelProvider, properties);
    }

    /**
     * Looks up or creates the shared channel and bridges the module's output channel to it.
     *
     * @throws IllegalArgumentException if {@code str} has no text or {@code messageChannel} is null
     */
    private void doRegisterProducer(String str, MessageChannel messageChannel, MessageBusSupport.SharedChannelProvider<?> sharedChannelProvider, Properties properties) {
        Assert.hasText(str, "a valid name is required to register an outbound channel");
        Assert.notNull(messageChannel, "channel must not be null");
        MessageChannel sharedChannel = sharedChannelProvider.lookupOrCreateSharedChannel(str);
        String bridgeName = OUTBOUND_PREFIX + ((NamedComponent) sharedChannel).getComponentName();
        bridge(str, messageChannel, sharedChannel, bridgeName, new LocalBusPropertiesAccessor(properties));
    }

    /**
     * Binds the requesting side of a request/reply exchange: messages from {@code messageChannel}
     * are forwarded to the {@code "requestor." + str} channel and messages arriving on the
     * {@code "replier." + str} channel are forwarded to {@code messageChannel2}.
     *
     * @param str the binding name
     * @param messageChannel the channel on which requests are produced; must be subscribable
     * @param messageChannel2 the channel to which replies are delivered
     * @param properties binding properties
     * @throws IllegalArgumentException if {@code messageChannel} is not a {@link SubscribableChannel}
     */
    @Override
    public void bindRequestor(String str, MessageChannel messageChannel, final MessageChannel messageChannel2, Properties properties) {
        validateConsumerProperties(str, properties, Collections.emptySet());
        // Validate before touching the registry so a rejected call leaves no channel behind.
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        final ExecutorChannel requestChannel = findOrCreateRequestReplyChannel(REQUESTOR_PREFIX + str);
        ((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                requestChannel.send(message);
            }
        });
        findOrCreateRequestReplyChannel(REPLIER_PREFIX + str).subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                messageChannel2.send(message);
            }
        });
    }

    /**
     * Binds the replying side of a request/reply exchange: messages arriving on the
     * {@code "requestor." + str} channel are forwarded to {@code messageChannel} and messages from
     * {@code messageChannel2} are forwarded to the {@code "replier." + str} channel.
     *
     * @param str the binding name
     * @param messageChannel the channel to which requests are delivered
     * @param messageChannel2 the channel on which replies are produced; must be subscribable
     * @param properties binding properties
     * @throws IllegalArgumentException if {@code messageChannel2} is not a {@link SubscribableChannel}
     */
    @Override
    public void bindReplier(String str, final MessageChannel messageChannel, MessageChannel messageChannel2, Properties properties) {
        validateConsumerProperties(str, properties, Collections.emptySet());
        // Validate before subscribing anything so a rejected call leaves no dangling handler.
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel2);
        findOrCreateRequestReplyChannel(REQUESTOR_PREFIX + str).subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                messageChannel.send(message);
            }
        });
        final ExecutorChannel replyChannel = findOrCreateRequestReplyChannel(REPLIER_PREFIX + str);
        ((SubscribableChannel) messageChannel2).subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                replyChannel.send(message);
            }
        });
    }

    /**
     * Returns the request/reply channel registered under {@code str}, creating and registering a
     * new {@link ExecutorChannel} backed by this bus's executor when none exists yet.
     *
     * @param str the prefixed registry key
     * @return the existing or newly created channel
     */
    private synchronized ExecutorChannel findOrCreateRequestReplyChannel(String str) {
        ExecutorChannel channel = this.requestReplyChannels.get(str);
        if (channel == null) {
            channel = new ExecutorChannel(this.executor);
            channel.setBeanFactory(getBeanFactory());
            this.requestReplyChannels.put(str, channel);
        }
        return channel;
    }

    /**
     * Removes the request/reply channels registered for {@code str}. When no requestor channel was
     * registered under that name, the call is delegated to the superclass.
     *
     * @param str the binding name
     * @param messageChannel the producer channel being unbound
     */
    @Override
    public void unbindProducer(String str, MessageChannel messageChannel) {
        ExecutorChannel removedRequestChannel;
        // Same monitor as findOrCreateRequestReplyChannel: the registry is a plain HashMap.
        synchronized (this) {
            this.requestReplyChannels.remove(REPLIER_PREFIX + str);
            removedRequestChannel = this.requestReplyChannels.remove(REQUESTOR_PREFIX + str);
        }
        // Delegate outside the lock so superclass code never runs while the monitor is held.
        if (removedRequestChannel == null) {
            super.unbindProducer(str, messageChannel);
        }
    }

    /**
     * Convenience overload of
     * {@link #bridge(String, MessageChannel, MessageChannel, String, Collection, LocalBusPropertiesAccessor)}
     * that passes {@code null} for the media type collection.
     */
    protected BridgeHandler bridge(String str, MessageChannel messageChannel, MessageChannel messageChannel2, String str2, LocalBusPropertiesAccessor localBusPropertiesAccessor) {
        return bridge(str, messageChannel, messageChannel2, str2, null, localBusPropertiesAccessor);
    }

    /**
     * Creates a pass-through bridge from {@code messageChannel} to {@code messageChannel2},
     * registers it as a binding and starts it.
     *
     * @param str the binding name
     * @param messageChannel the channel the bridge consumes from
     * @param messageChannel2 the channel the bridge sends to
     * @param str2 the bridge bean name; a name starting with {@code "inbound."} produces a
     *        consumer binding, any other name a producer binding
     * @param collection accepted media types; not used by this implementation
     * @param localBusPropertiesAccessor properties attached to the binding
     * @return the bridge handler that was created
     * @throws IllegalStateException wrapping any exception raised while building, registering or
     *         starting the endpoint
     */
    protected BridgeHandler bridge(String str, MessageChannel messageChannel, MessageChannel messageChannel2, String str2, Collection<MediaType> collection, LocalBusPropertiesAccessor localBusPropertiesAccessor) {
        boolean inbound = str2.startsWith(INBOUND_PREFIX);
        BridgeHandler handler = new BridgeHandler() {
            @Override
            protected Object handleRequestMessage(Message<?> message) {
                return message;
            }
        };
        handler.setBeanFactory(getBeanFactory());
        handler.setOutputChannel(messageChannel2);
        handler.setBeanName(str2);
        handler.afterPropertiesSet();

        ConsumerEndpointFactoryBean endpointFactory = new ConsumerEndpointFactoryBean();
        endpointFactory.setInputChannel(messageChannel);
        endpointFactory.setHandler(handler);
        endpointFactory.setBeanFactory(getBeanFactory());
        if (messageChannel instanceof PollableChannel) {
            endpointFactory.setPollerMetadata(this.poller);
        }
        // One try block with one wrap, so the original cause is a direct getCause() of the
        // IllegalStateException rather than being wrapped twice.
        try {
            endpointFactory.afterPropertiesSet();
            endpointFactory.getObject().setComponentName(handler.getComponentName());
            Binding binding;
            if (inbound) {
                binding = Binding.forConsumer(str, endpointFactory.getObject(), messageChannel2, localBusPropertiesAccessor);
            } else {
                binding = Binding.forProducer(str, messageChannel, endpointFactory.getObject(), localBusPropertiesAccessor);
            }
            addBinding(binding);
            binding.start();
            return handler;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Looks up a bean by name and type in this bus's application context.
     *
     * @param str the bean name
     * @param cls the required bean type
     * @param <T> the bean type
     * @return the bean returned by the application context
     */
    protected <T> T getBean(String str, Class<T> cls) {
        return getApplicationContext().getBean(str, cls);
    }
}
