package org.springframework.cloud.stream.binding;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.aggregate.SharedChannelRegistry;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.binder.MessageChannelBinderSupport;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binding/BindableProxyFactory.class */
/**
 * {@link FactoryBean} that produces a proxy for an interface whose methods are annotated with
 * {@link Input} or {@link Output} and return message channels.
 *
 * <p>On the first call to {@link #getObject()} the target interface is scanned; for every annotated
 * method a channel is either created or looked up in the optional {@link SharedChannelRegistry}
 * (under the name {@code <channelNamespace>.<channelName>}). Calls on the proxy are answered by
 * {@link #invoke(MethodInvocation)}, which returns the channel registered for the invoked method.
 *
 * <p>Only channels created by this factory (i.e. not found in the shared registry) are marked as
 * bindable and are passed to the {@link ChannelBindingService} by the bind/unbind methods.
 */
public class BindableProxyFactory implements MethodInterceptor, FactoryBean<Object>, BeanFactoryAware, Bindable, InitializingBean {
    private static final Log log = LogFactory.getLog(BindableProxyFactory.class);
    public static final String SPRING_CLOUD_STREAM_INTERNAL_PREFIX = "spring.cloud.stream.internal";
    public static final String CHANNEL_NAMESPACE_PROPERTY_NAME = "spring.cloud.stream.internal.channelNamespace";
    public static final String POLLABLE_BRIDGE_INTERVAL_PROPERTY_NAME = "spring.cloud.stream.internal.pollableBridge.interval";

    /** The annotated interface that the proxy implements. */
    private Class<?> type;

    /** Prefix used when looking channels up in the shared registry; defaults to the empty string. */
    @Value("${spring.cloud.stream.internal.channelNamespace:}")
    private String channelNamespace;

    /** Period handed to the {@link PeriodicTrigger} of a pollable-to-subscribable bridge; defaults to 1000. */
    @Value("${spring.cloud.stream.internal.pollableBridge.interval:1000}")
    private int pollableBridgeDefaultFrequency;

    /** Lazily created proxy instance, built on the first {@link #getObject()} call. */
    private Object proxy = null;

    /** Input channels keyed by channel name. */
    private Map<String, ChannelHolder> inputs = new HashMap<String, ChannelHolder>();

    /** Output channels keyed by channel name. */
    private Map<String, ChannelHolder> outputs = new HashMap<String, ChannelHolder>();

    private ConfigurableListableBeanFactory beanFactory;

    /** Optional registry of channels shared with other components; may be {@code null}. */
    @Autowired(required = false)
    private SharedChannelRegistry sharedChannelRegistry;

    /**
     * Pairs a channel with a flag telling whether this factory is responsible for binding it.
     */
    static class ChannelHolder {
        private final MessageChannel messageChannel;
        private final boolean bindable;

        /**
         * @param messageChannel the channel exposed through the proxy
         * @param bindable {@code true} if the channel must be bound/unbound by this factory
         */
        public ChannelHolder(MessageChannel messageChannel, boolean bindable) {
            this.messageChannel = messageChannel;
            this.bindable = bindable;
        }

        public MessageChannel getMessageChannel() {
            return this.messageChannel;
        }

        public boolean isBindable() {
            return this.bindable;
        }
    }

    /**
     * @param cls the interface declaring the {@link Input}/{@link Output} annotated methods
     */
    public BindableProxyFactory(Class<?> cls) {
        this.type = cls;
    }

    /**
     * Stores the bean factory; it is later handed to the polling endpoint created by
     * {@link #bridgePollableToSubscribableChannel(MessageChannel, MessageChannel)}.
     *
     * @param beanFactory the owning factory; cast to {@link ConfigurableListableBeanFactory}
     * @throws ClassCastException if the factory is not a {@link ConfigurableListableBeanFactory}
     */
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }

    /**
     * Verifies that a bean factory has been supplied.
     */
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.beanFactory, "Bean factory cannot be empty");
    }

    /**
     * @return the names of the registered input channels (a view of the internal map's key set)
     */
    @Override // org.springframework.cloud.stream.binding.Bindable
    public Set<String> getInputs() {
        return this.inputs.keySet();
    }

    /**
     * @return the names of the registered output channels (a view of the internal map's key set)
     */
    @Override // org.springframework.cloud.stream.binding.Bindable
    public Set<String> getOutputs() {
        return this.outputs.keySet();
    }

    /**
     * Scans every method of {@code cls} and registers a channel for each {@link Input} and
     * {@link Output} annotation found. A method carrying both annotations is registered twice,
     * once per direction.
     */
    private void createChannels(Class<?> cls) throws Exception {
        ReflectionUtils.doWithMethods(cls, new ReflectionUtils.MethodCallback() {
            public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
                Input input = AnnotationUtils.findAnnotation(method, Input.class);
                if (input != null) {
                    registerInput(BindingBeanDefinitionRegistryUtils.getChannelName(input, method), method.getReturnType());
                }
                Output output = AnnotationUtils.findAnnotation(method, Output.class);
                if (output != null) {
                    registerOutput(BindingBeanDefinitionRegistryUtils.getChannelName(output, method), method.getReturnType());
                }
            }
        });
    }

    /**
     * Registers the input channel {@code channelName}, reusing or bridging from a shared channel
     * when one exists. Messages flow from the shared channel into the channel exposed by the proxy.
     */
    private void registerInput(String channelName, Class<?> channelType) {
        MessageChannel sharedChannel = locateSharedChannel(channelName);
        if (sharedChannel == null) {
            // No shared channel: create our own and mark it for binding.
            this.inputs.put(channelName, new ChannelHolder(createMessageChannel(channelType), true));
            return;
        }
        if (channelType.isAssignableFrom(sharedChannel.getClass())) {
            // The shared channel already satisfies the declared return type; expose it directly.
            this.inputs.put(channelName, new ChannelHolder(sharedChannel, false));
            return;
        }
        // The shared channel is of a different nature (pollable vs. subscribable) than the
        // declared type: create a channel of the declared type and bridge the shared one into it.
        MessageChannel inputChannel = createMessageChannel(channelType);
        if (isPollable(sharedChannel.getClass())) {
            bridgePollableToSubscribableChannel(sharedChannel, inputChannel);
        } else {
            bridgeSubscribableToPollableChannel((SubscribableChannel) sharedChannel, inputChannel);
        }
        this.inputs.put(channelName, new ChannelHolder(inputChannel, false));
    }

    /**
     * Registers the output channel {@code channelName}, reusing or bridging to a shared channel
     * when one exists. Messages flow from the channel exposed by the proxy into the shared channel.
     */
    private void registerOutput(String channelName, Class<?> channelType) {
        MessageChannel sharedChannel = locateSharedChannel(channelName);
        if (sharedChannel == null) {
            // No shared channel: create our own and mark it for binding.
            this.outputs.put(channelName, new ChannelHolder(createMessageChannel(channelType), true));
            return;
        }
        if (channelType.isAssignableFrom(sharedChannel.getClass())) {
            this.outputs.put(channelName, new ChannelHolder(sharedChannel, false));
            return;
        }
        MessageChannel outputChannel = createMessageChannel(channelType);
        if (isPollable(channelType)) {
            bridgePollableToSubscribableChannel(outputChannel, sharedChannel);
        } else {
            // createMessageChannel returns a DirectChannel for every non-pollable type, so the
            // cast to SubscribableChannel is safe here.
            bridgeSubscribableToPollableChannel((SubscribableChannel) outputChannel, sharedChannel);
        }
        this.outputs.put(channelName, new ChannelHolder(outputChannel, false));
    }

    /**
     * Looks up a channel in the shared registry under its namespace-prefixed name.
     *
     * @param str the unprefixed channel name
     * @return the shared channel, or {@code null} if there is no registry (or, presumably, no
     *         entry for that name - depends on {@code SharedChannelRegistry.get})
     */
    public MessageChannel locateSharedChannel(String str) {
        if (this.sharedChannelRegistry != null) {
            return this.sharedChannelRegistry.get(getNamespacePrefixedChannelName(str));
        }
        return null;
    }

    /**
     * Builds {@code <channelNamespace>.<name>}; with the default empty namespace this yields
     * {@code .<name>}.
     */
    private String getNamespacePrefixedChannelName(String str) {
        return this.channelNamespace + "." + str;
    }

    /**
     * Forwards every message sent to {@code subscribableChannel} to {@code messageChannel} by
     * subscribing a {@code DirectHandler} that targets the latter.
     */
    public void bridgeSubscribableToPollableChannel(SubscribableChannel subscribableChannel, MessageChannel messageChannel) {
        subscribableChannel.subscribe(new MessageChannelBinderSupport.DirectHandler(messageChannel));
    }

    /**
     * Creates and starts a polling consumer endpoint that reads from {@code messageChannel} on a
     * periodic trigger and forwards to {@code messageChannel2}.
     *
     * @param messageChannel the channel to poll
     * @param messageChannel2 the channel receiving the polled messages
     * @throws IllegalStateException wrapping any failure while initializing or starting the endpoint
     */
    public void bridgePollableToSubscribableChannel(MessageChannel messageChannel, MessageChannel messageChannel2) {
        ConsumerEndpointFactoryBean consumerEndpointFactoryBean = new ConsumerEndpointFactoryBean();
        consumerEndpointFactoryBean.setInputChannel(messageChannel);
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(this.pollableBridgeDefaultFrequency));
        consumerEndpointFactoryBean.setPollerMetadata(pollerMetadata);
        consumerEndpointFactoryBean.setHandler(new MessageChannelBinderSupport.DirectHandler(messageChannel2));
        consumerEndpointFactoryBean.setBeanFactory(this.beanFactory);
        try {
            consumerEndpointFactoryBean.afterPropertiesSet();
            consumerEndpointFactoryBean.start();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Creates a channel matching the declared type: a {@link QueueChannel} for pollable types, a
     * {@link DirectChannel} for everything else.
     */
    public MessageChannel createMessageChannel(Class<?> cls) {
        return isPollable(cls) ? new QueueChannel() : new DirectChannel();
    }

    /**
     * Tells whether {@code cls} is {@link PollableChannel} or one of its subtypes.
     *
     * <p>An assignability check is required (rather than class equality) because this method is
     * also called with the runtime class of a channel instance, which is never the
     * {@code PollableChannel} interface itself.
     */
    public boolean isPollable(Class<?> cls) {
        return PollableChannel.class.isAssignableFrom(cls);
    }

    /**
     * Answers a call on the proxy with the channel registered for the invoked method.
     *
     * @return the registered channel; {@code null} if the method does not return a
     *         {@link MessageChannel} type or carries neither {@link Input} nor {@link Output}
     */
    public synchronized Object invoke(MethodInvocation methodInvocation) throws Throwable {
        Method method = methodInvocation.getMethod();
        if (!MessageChannel.class.isAssignableFrom(method.getReturnType())) {
            return null;
        }
        Input input = AnnotationUtils.findAnnotation(method, Input.class);
        if (input != null) {
            return this.inputs.get(BindingBeanDefinitionRegistryUtils.getChannelName(input, method)).getMessageChannel();
        }
        Output output = AnnotationUtils.findAnnotation(method, Output.class);
        if (output == null) {
            return null;
        }
        return this.outputs.get(BindingBeanDefinitionRegistryUtils.getChannelName(output, method)).getMessageChannel();
    }

    /**
     * Returns the proxy, creating the channels and the proxy itself on the first call.
     */
    public synchronized Object getObject() throws Exception {
        if (this.proxy == null) {
            createChannels(this.type);
            this.proxy = new ProxyFactory(this.type, this).getProxy();
        }
        return this.proxy;
    }

    public Class<?> getObjectType() {
        return this.type;
    }

    public boolean isSingleton() {
        return true;
    }

    /**
     * Configures message converters for every input channel and binds a consumer for those
     * marked as bindable.
     */
    @Override // org.springframework.cloud.stream.binding.Bindable
    public void bindInputs(ChannelBindingService channelBindingService) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Binding inputs for %s:%s", this.channelNamespace, this.type));
        }
        for (Map.Entry<String, ChannelHolder> entry : this.inputs.entrySet()) {
            String key = entry.getKey();
            ChannelHolder value = entry.getValue();
            channelBindingService.configureMessageConverters(value.getMessageChannel(), key);
            if (value.isBindable()) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Binding %s:%s:%s", this.channelNamespace, this.type, key));
                }
                channelBindingService.bindConsumer(value.getMessageChannel(), key);
            }
        }
    }

    /**
     * Configures message converters for every output channel and binds a producer for those
     * marked as bindable.
     */
    @Override // org.springframework.cloud.stream.binding.Bindable
    public void bindOutputs(ChannelBindingService channelBindingService) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Binding outputs for %s:%s", this.channelNamespace, this.type));
        }
        for (Map.Entry<String, ChannelHolder> entry : this.outputs.entrySet()) {
            String key = entry.getKey();
            ChannelHolder value = entry.getValue();
            channelBindingService.configureMessageConverters(value.getMessageChannel(), key);
            if (value.isBindable()) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Binding %s:%s:%s", this.channelNamespace, this.type, key));
                }
                channelBindingService.bindProducer(value.getMessageChannel(), key);
            }
        }
    }

    /**
     * Unbinds the consumers of every input channel marked as bindable.
     */
    @Override // org.springframework.cloud.stream.binding.Bindable
    public void unbindInputs(ChannelBindingService channelBindingService) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Unbinding inputs for %s:%s", this.channelNamespace, this.type));
        }
        for (Map.Entry<String, ChannelHolder> entry : this.inputs.entrySet()) {
            if (entry.getValue().isBindable()) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Unbinding %s:%s:%s", this.channelNamespace, this.type, entry.getKey()));
                }
                channelBindingService.unbindConsumers(entry.getKey());
            }
        }
    }

    /**
     * Unbinds the producers of every output channel marked as bindable.
     */
    @Override // org.springframework.cloud.stream.binding.Bindable
    public void unbindOutputs(ChannelBindingService channelBindingService) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Unbinding outputs for %s:%s", this.channelNamespace, this.type));
        }
        for (Map.Entry<String, ChannelHolder> entry : this.outputs.entrySet()) {
            if (entry.getValue().isBindable()) {
                if (log.isDebugEnabled()) {
                    // Message corrected: this path unbinds, matching unbindInputs.
                    log.debug(String.format("Unbinding %s:%s:%s", this.channelNamespace, this.type, entry.getKey()));
                }
                channelBindingService.unbindProducers(entry.getKey());
            }
        }
    }
}
