package org.springframework.xd.dirt.integration.bus.local;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.context.NamedComponent;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.xd.dirt.integration.bus.AbstractBusPropertiesAccessor;
import org.springframework.xd.dirt.integration.bus.Binding;
import org.springframework.xd.dirt.integration.bus.MessageBusSupport;

/* loaded from: input_file:org/springframework/xd/dirt/integration/bus/local/LocalMessageBus.class */
/**
 * A message bus that connects producers and consumers inside a single JVM by bridging
 * module channels to shared, named channels.
 *
 * <p>Names starting with {@code queue:} or {@code job:} are backed by a shared
 * {@link QueueChannel}; every other point-to-point name uses the direct channel provider
 * inherited from {@link MessageBusSupport}. Publish/subscribe bindings use a shared
 * {@link PublishSubscribeChannel} that dispatches on this bus's task executor.
 * Request/reply bindings use one {@link ExecutorChannel} per direction, each backed by its
 * own dedicated executor.
 */
public class LocalMessageBus extends MessageBusSupport {
    private static final int DEFAULT_EXECUTOR_CORE_POOL_SIZE = 0;
    private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 200;
    private static final int DEFAULT_EXECUTOR_QUEUE_SIZE = Integer.MAX_VALUE;
    private static final int DEFAULT_EXECUTOR_KEEPALIVE_SECONDS = 60;
    private static final int DEFAULT_REQ_REPLY_CONCURRENCY = 1;

    /** Key prefix of the request-direction channel of a request/reply binding. */
    private static final String REQUESTOR_PREFIX = "requestor.";

    /** Key prefix of the reply-direction channel of a request/reply binding. */
    private static final String REPLIER_PREFIX = "replier.";

    /** Consumer properties accepted by request/reply bindings: the standard set plus {@code concurrency}. */
    protected static final Set<Object> CONSUMER_REQUEST_REPLY_PROPERTIES = new MessageBusSupport.SetBuilder().addAll(CONSUMER_STANDARD_PROPERTIES).add("concurrency").build();

    private volatile PollerMetadata poller;

    /** Request/reply channels keyed by prefix + name; guarded by this instance's monitor. */
    private final Map<String, ExecutorChannel> requestReplyChannels = new HashMap<String, ExecutorChannel>();

    private final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    private volatile int executorCorePoolSize = DEFAULT_EXECUTOR_CORE_POOL_SIZE;
    private volatile int executorMaxPoolSize = DEFAULT_EXECUTOR_MAX_POOL_SIZE;
    private volatile int executorQueueSize = DEFAULT_EXECUTOR_QUEUE_SIZE;
    private volatile int executorKeepAliveSeconds = DEFAULT_EXECUTOR_KEEPALIVE_SECONDS;

    /** Capacity used for newly created shared queue channels. */
    private volatile int queueSize = DEFAULT_EXECUTOR_QUEUE_SIZE;

    /**
     * Request/reply executors keyed by the same prefix + name key as
     * {@link #requestReplyChannels}, so that the executor of each direction can be shut
     * down when the binding is removed.
     */
    private final Map<String, ThreadPoolTaskExecutor> reqRepExecutors = new ConcurrentHashMap<String, ThreadPoolTaskExecutor>();

    /** Creates shared queue channels sized by the current {@code queueSize}. */
    private final MessageBusSupport.SharedChannelProvider<QueueChannel> queueChannelProvider = new MessageBusSupport.SharedChannelProvider<QueueChannel>(QueueChannel.class) {
        @Override
        protected QueueChannel createSharedChannel(String name) {
            return new QueueChannel(LocalMessageBus.this.queueSize);
        }
    };

    /** Creates shared pub/sub channels that dispatch on the bus executor with ignoreFailures enabled. */
    private final MessageBusSupport.SharedChannelProvider<PublishSubscribeChannel> pubsubChannelProvider = new MessageBusSupport.SharedChannelProvider<PublishSubscribeChannel>(PublishSubscribeChannel.class) {
        @Override
        protected PublishSubscribeChannel createSharedChannel(String name) {
            PublishSubscribeChannel channel = new PublishSubscribeChannel(LocalMessageBus.this.executor);
            channel.setIgnoreFailures(true);
            return channel;
        }
    };

    /** Property accessor for local bus bindings; adds nothing to the base accessor. */
    public static class LocalBusPropertiesAccessor extends AbstractBusPropertiesAccessor {
        public LocalBusPropertiesAccessor(Properties properties) {
            super(properties);
        }
    }

    /**
     * Sets the poller metadata applied to bridges whose input channel is pollable.
     *
     * @param pollerMetadata the poller configuration
     */
    public void setPoller(PollerMetadata pollerMetadata) {
        this.poller = pollerMetadata;
    }

    /**
     * Sets the capacity of shared queue channels created after this call.
     *
     * @param i the queue capacity
     */
    public void setQueueSize(int i) {
        this.queueSize = i;
    }

    /**
     * Sets the core pool size of the bus executor; applied in {@link #onInit()}.
     *
     * @param i the core pool size
     */
    public void setExecutorCorePoolSize(int i) {
        this.executorCorePoolSize = i;
    }

    /**
     * Sets the maximum pool size of the bus executor; applied in {@link #onInit()}.
     *
     * @param i the maximum pool size
     */
    public void setExecutorMaxPoolSize(int i) {
        this.executorMaxPoolSize = i;
    }

    /**
     * Sets the task queue capacity of the bus executor; applied in {@link #onInit()}.
     *
     * @param i the queue capacity
     */
    public void setExecutorQueueSize(int i) {
        this.executorQueueSize = i;
    }

    /**
     * Sets the keep-alive time of the bus executor; applied in {@link #onInit()}.
     *
     * @param i the keep-alive time in seconds
     */
    public void setExecutorKeepAliveSeconds(int i) {
        this.executorKeepAliveSeconds = i;
    }

    /** Configures the bus executor from the current settings and initializes it. */
    protected void onInit() {
        this.executor.setCorePoolSize(this.executorCorePoolSize);
        this.executor.setMaxPoolSize(this.executorMaxPoolSize);
        this.executor.setQueueCapacity(this.executorQueueSize);
        this.executor.setKeepAliveSeconds(this.executorKeepAliveSeconds);
        this.executor.setThreadNamePrefix("xd.localbus-");
        this.executor.initialize();
    }

    /**
     * Binds a dynamically created point-to-point producer for the given name.
     *
     * @param name the name of the target
     * @param properties the binding properties
     * @return the channel returned by {@code doBindDynamicProducer}
     */
    public MessageChannel bindDynamicProducer(String name, Properties properties) {
        return doBindDynamicProducer(name, "dynamic.output.to." + name, properties);
    }

    /**
     * Binds a dynamically created publish/subscribe producer for the given name.
     *
     * @param name the name of the target
     * @param properties the binding properties
     * @return the channel returned by {@code doBindDynamicPubSubProducer}
     */
    public MessageChannel bindDynamicPubSubProducer(String name, Properties properties) {
        return doBindDynamicPubSubProducer(name, "dynamic.output.to." + name, properties);
    }

    /**
     * Selects the shared channel provider for a point-to-point name: the queue channel
     * provider for {@code queue:} and {@code job:} names, the inherited direct channel
     * provider otherwise.
     */
    private MessageBusSupport.SharedChannelProvider<?> getChannelProvider(String name) {
        if (name.startsWith("queue:") || name.startsWith("job:")) {
            return this.queueChannelProvider;
        }
        return this.directChannelProvider;
    }

    /**
     * Bridges the shared point-to-point channel for {@code name} to the given channel.
     *
     * @param name the name of the shared channel
     * @param messageChannel the channel that receives messages from the shared channel
     * @param properties the binding properties, validated against the standard consumer set
     */
    public void bindConsumer(String name, MessageChannel messageChannel, Properties properties) {
        validateConsumerProperties(name, properties, CONSUMER_STANDARD_PROPERTIES);
        doRegisterConsumer(name, messageChannel, getChannelProvider(name), properties);
    }

    /**
     * Bridges the shared publish/subscribe channel for {@code name} to the given channel.
     *
     * @param name the name of the shared channel
     * @param messageChannel the channel that receives messages from the shared channel
     * @param properties the binding properties, validated against the standard consumer set
     */
    public void bindPubSubConsumer(String name, MessageChannel messageChannel, Properties properties) {
        validateConsumerProperties(name, properties, CONSUMER_STANDARD_PROPERTIES);
        doRegisterConsumer(name, messageChannel, this.pubsubChannelProvider, properties);
    }

    /** Looks up or creates the shared channel and bridges it into the module's input channel. */
    private void doRegisterConsumer(String name, MessageChannel messageChannel, MessageBusSupport.SharedChannelProvider<?> sharedChannelProvider, Properties properties) {
        Assert.hasText(name, "a valid name is required to register an inbound channel");
        Assert.notNull(messageChannel, "channel must not be null");
        // The shared channel is used as a MessageChannel for bridging and as a
        // NamedComponent only to derive the bridge's bean name.
        MessageChannel sharedChannel = (MessageChannel) sharedChannelProvider.lookupOrCreateSharedChannel(name);
        String bridgeName = "inbound." + ((NamedComponent) sharedChannel).getComponentName();
        bridge(name, sharedChannel, messageChannel, bridgeName, new LocalBusPropertiesAccessor(properties));
    }

    /**
     * Bridges the given channel to the shared point-to-point channel for {@code name}.
     *
     * @param name the name of the shared channel
     * @param messageChannel the channel whose messages are forwarded to the shared channel
     * @param properties the binding properties, validated against the standard producer set
     */
    public void bindProducer(String name, MessageChannel messageChannel, Properties properties) {
        // NOTE(review): producer properties are checked through validateConsumerProperties;
        // confirm whether the superclass offers a producer-specific validator.
        validateConsumerProperties(name, properties, PRODUCER_STANDARD_PROPERTIES);
        doRegisterProducer(name, messageChannel, getChannelProvider(name), properties);
    }

    /**
     * Bridges the given channel to the shared publish/subscribe channel for {@code name}.
     *
     * @param name the name of the shared channel
     * @param messageChannel the channel whose messages are forwarded to the shared channel
     * @param properties the binding properties, validated against the standard producer set
     */
    public void bindPubSubProducer(String name, MessageChannel messageChannel, Properties properties) {
        // NOTE(review): see bindProducer regarding the validator used here.
        validateConsumerProperties(name, properties, PRODUCER_STANDARD_PROPERTIES);
        doRegisterProducer(name, messageChannel, this.pubsubChannelProvider, properties);
    }

    /** Looks up or creates the shared channel and bridges the module's output channel into it. */
    private void doRegisterProducer(String name, MessageChannel messageChannel, MessageBusSupport.SharedChannelProvider<?> sharedChannelProvider, Properties properties) {
        Assert.hasText(name, "a valid name is required to register an outbound channel");
        Assert.notNull(messageChannel, "channel must not be null");
        MessageChannel sharedChannel = (MessageChannel) sharedChannelProvider.lookupOrCreateSharedChannel(name);
        String bridgeName = "outbound." + ((NamedComponent) sharedChannel).getComponentName();
        bridge(name, messageChannel, sharedChannel, bridgeName, new LocalBusPropertiesAccessor(properties));
    }

    /**
     * Binds the requesting side of a request/reply pair: messages sent to
     * {@code messageChannel} are forwarded to the request channel for {@code name}, and
     * messages arriving on the reply channel for {@code name} are sent to
     * {@code messageChannel2}.
     *
     * @param name the name of the request/reply pair
     * @param messageChannel the channel carrying outgoing requests; must be a {@link SubscribableChannel}
     * @param messageChannel2 the channel that receives replies
     * @param properties the binding properties, validated against {@link #CONSUMER_REQUEST_REPLY_PROPERTIES}
     * @throws IllegalArgumentException if {@code messageChannel} is not subscribable
     */
    public void bindRequestor(String name, MessageChannel messageChannel, final MessageChannel messageChannel2, Properties properties) {
        validateConsumerProperties(name, properties, CONSUMER_REQUEST_REPLY_PROPERTIES);
        // Check the argument before creating channels and executors so that a rejected
        // binding leaves no state behind.
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        final ExecutorChannel requestChannel = findOrCreateRequestReplyChannel(name, REQUESTOR_PREFIX, properties);
        ((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                requestChannel.send(message);
            }
        });
        ExecutorChannel replyChannel = findOrCreateRequestReplyChannel(name, REPLIER_PREFIX, properties);
        replyChannel.subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                messageChannel2.send(message);
            }
        });
    }

    /**
     * Binds the replying side of a request/reply pair: messages arriving on the request
     * channel for {@code name} are sent to {@code messageChannel}, and messages sent to
     * {@code messageChannel2} are forwarded to the reply channel for {@code name}.
     *
     * @param name the name of the request/reply pair
     * @param messageChannel the channel that receives incoming requests
     * @param messageChannel2 the channel carrying outgoing replies; must be a {@link SubscribableChannel}
     * @param properties the binding properties, validated against {@link #CONSUMER_REQUEST_REPLY_PROPERTIES}
     * @throws IllegalArgumentException if {@code messageChannel2} is not subscribable
     */
    public void bindReplier(String name, final MessageChannel messageChannel, MessageChannel messageChannel2, Properties properties) {
        validateConsumerProperties(name, properties, CONSUMER_REQUEST_REPLY_PROPERTIES);
        // Check the argument before subscribing anything so that a rejected binding
        // leaves no half-wired request path behind.
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel2);
        ExecutorChannel requestChannel = findOrCreateRequestReplyChannel(name, REQUESTOR_PREFIX, properties);
        requestChannel.subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                messageChannel.send(message);
            }
        });
        final ExecutorChannel replyChannel = findOrCreateRequestReplyChannel(name, REPLIER_PREFIX, properties);
        ((SubscribableChannel) messageChannel2).subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                replyChannel.send(message);
            }
        });
    }

    /**
     * Returns the request/reply channel registered under {@code prefix + name}, creating
     * it together with a dedicated executor when absent.
     */
    private synchronized ExecutorChannel findOrCreateRequestReplyChannel(String name, String prefix, Properties properties) {
        String channelName = prefix + name;
        ExecutorChannel channel = this.requestReplyChannels.get(channelName);
        if (channel == null) {
            ThreadPoolTaskExecutor channelExecutor = createRequestReplyExecutor(name, properties);
            channel = new ExecutorChannel(channelExecutor);
            channel.setBeanFactory(getBeanFactory());
            this.requestReplyChannels.put(channelName, channel);
            // Keyed by channel name, not bare name: each name owns two executors (one per
            // direction) and both must remain reachable for shutdown.
            this.reqRepExecutors.put(channelName, channelExecutor);
        }
        return channel;
    }

    /**
     * Creates and initializes an executor for one request/reply channel; its core pool
     * size is the {@code concurrency} property, defaulting to
     * {@link #DEFAULT_REQ_REPLY_CONCURRENCY}.
     */
    private ThreadPoolTaskExecutor createRequestReplyExecutor(String name, Properties properties) {
        ThreadPoolTaskExecutor channelExecutor = new ThreadPoolTaskExecutor();
        channelExecutor.setCorePoolSize(new LocalBusPropertiesAccessor(properties).getConcurrency(DEFAULT_REQ_REPLY_CONCURRENCY));
        channelExecutor.setThreadNamePrefix("xd.localBus." + name + "-");
        channelExecutor.initialize();
        return channelExecutor;
    }

    /**
     * Removes the binding for {@code name}. When a request/reply pair is registered under
     * that name, its channels are dropped and their executors shut down; otherwise the
     * call is delegated to the superclass.
     *
     * @param name the name of the binding
     * @param messageChannel the producer's channel
     */
    @Override
    public void unbindProducer(String name, MessageChannel messageChannel) {
        String requestorKey = REQUESTOR_PREFIX + name;
        String replierKey = REPLIER_PREFIX + name;
        ExecutorChannel requestChannel;
        // Same monitor as findOrCreateRequestReplyChannel: the channel map is a plain HashMap.
        synchronized (this) {
            this.requestReplyChannels.remove(replierKey);
            requestChannel = this.requestReplyChannels.remove(requestorKey);
        }
        if (requestChannel == null) {
            super.unbindProducer(name, messageChannel);
        }
        shutdownRequestReplyExecutor(requestorKey);
        shutdownRequestReplyExecutor(replierKey);
    }

    /** Shuts down and forgets the executor registered under the given channel key, if any. */
    private void shutdownRequestReplyExecutor(String channelName) {
        ThreadPoolTaskExecutor channelExecutor = this.reqRepExecutors.remove(channelName);
        if (channelExecutor != null) {
            channelExecutor.shutdown();
        }
    }

    /**
     * Bridges two channels without any MIME type restriction.
     *
     * @see #bridge(String, MessageChannel, MessageChannel, String, Collection, LocalBusPropertiesAccessor)
     */
    protected BridgeHandler bridge(String str, MessageChannel messageChannel, MessageChannel messageChannel2, String str2, LocalBusPropertiesAccessor localBusPropertiesAccessor) {
        return bridge(str, messageChannel, messageChannel2, str2, null, localBusPropertiesAccessor);
    }

    /**
     * Creates a bridge that forwards every message from {@code messageChannel} to
     * {@code messageChannel2}, registers it as a binding and starts it.
     *
     * <p>A bridge whose name starts with {@code inbound.} is registered as a consumer
     * binding; any other bridge is registered as a producer binding.
     *
     * @param str the binding name
     * @param messageChannel the channel the bridge consumes from
     * @param messageChannel2 the channel the bridge sends to
     * @param str2 the bean name given to the bridge handler and its endpoint
     * @param collection accepted MIME types; not used by this implementation
     * @param localBusPropertiesAccessor the binding properties
     * @return the bridge handler that was created
     * @throws IllegalStateException if the endpoint cannot be created or the binding cannot be started
     */
    protected BridgeHandler bridge(String str, MessageChannel messageChannel, MessageChannel messageChannel2, String str2, Collection<MimeType> collection, LocalBusPropertiesAccessor localBusPropertiesAccessor) {
        boolean inbound = str2.startsWith("inbound.");
        BridgeHandler handler = new BridgeHandler() {
            protected boolean shouldCopyRequestHeaders() {
                return false;
            }

            protected Object handleRequestMessage(Message<?> message) {
                return message;
            }
        };
        handler.setBeanFactory(getBeanFactory());
        handler.setOutputChannel(messageChannel2);
        handler.setBeanName(str2);
        handler.afterPropertiesSet();

        ConsumerEndpointFactoryBean endpointFactory = new ConsumerEndpointFactoryBean();
        endpointFactory.setBeanName(str2);
        endpointFactory.setInputChannel(messageChannel);
        endpointFactory.setHandler(handler);
        endpointFactory.setBeanFactory(getBeanFactory());
        if (messageChannel instanceof PollableChannel) {
            endpointFactory.setPollerMetadata(this.poller);
        }
        try {
            endpointFactory.afterPropertiesSet();
            endpointFactory.getObject().setComponentName(handler.getComponentName());
            Binding binding = inbound
                    ? Binding.forConsumer(str, endpointFactory.getObject(), messageChannel2, localBusPropertiesAccessor)
                    : Binding.forProducer(str, messageChannel, endpointFactory.getObject(), localBusPropertiesAccessor);
            addBinding(binding);
            binding.start();
            return handler;
        } catch (Exception e) {
            // Single wrap: callers see one IllegalStateException whose cause is the original failure.
            throw new IllegalStateException("Failed to create bridge '" + str2 + "' for binding '" + str + "'", e);
        }
    }

    /**
     * Looks up a bean in this bus's application context.
     *
     * @param str the bean name
     * @param cls the required bean type
     * @return the bean returned by the application context
     */
    protected <T> T getBean(String str, Class<T> cls) {
        return (T) getApplicationContext().getBean(str, cls);
    }
}
