package org.springframework.xd.dirt.integration.bus;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.http.MediaType;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.expression.IntegrationEvaluationContextAware;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.context.NamedComponent;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.converter.ContentTypeResolver;
import org.springframework.util.AlternativeJdkIdGenerator;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.IdGenerator;
import org.springframework.util.MimeType;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.integration.bus.serializer.MultiTypeCodec;
import org.springframework.xd.dirt.integration.bus.serializer.SerializationException;

/* loaded from: input_file:org/springframework/xd/dirt/integration/bus/MessageBusSupport.class */
public abstract class MessageBusSupport implements MessageBus, ApplicationContextAware, InitializingBean, IntegrationEvaluationContextAware {
    protected static final String P2P_NAMED_CHANNEL_TYPE_PREFIX = "queue:";
    protected static final String PUBSUB_NAMED_CHANNEL_TYPE_PREFIX = "topic:";
    protected static final String JOB_CHANNEL_TYPE_PREFIX = "job:";
    private volatile AbstractApplicationContext applicationContext;
    private volatile MultiTypeCodec<Object> codec;
    protected static final String ORIGINAL_CONTENT_TYPE_HEADER = "originalContentType";
    protected static final List<MediaType> MEDIATYPES_MEDIATYPE_ALL = Collections.singletonList(MediaType.ALL);
    private volatile EvaluationContext evaluationContext;
    protected final Log logger = LogFactory.getLog(getClass());
    private int queueSize = Integer.MAX_VALUE;
    private final ContentTypeResolver contentTypeResolver = new StringConvertingContentTypeResolver();
    private final List<Binding> bindings = Collections.synchronizedList(new ArrayList());
    private final IdGenerator idGenerator = new AlternativeJdkIdGenerator();
    private final Set<MessageChannel> createdChannels = Collections.synchronizedSet(new HashSet());
    private volatile PartitionSelectorStrategy partitionSelector = new DefaultPartitionSelector();
    protected final SharedChannelProvider<DirectChannel> directChannelProvider = new SharedChannelProvider<DirectChannel>(DirectChannel.class) { // from class: org.springframework.xd.dirt.integration.bus.MessageBusSupport.1
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.springframework.xd.dirt.integration.bus.MessageBusSupport.SharedChannelProvider
        public DirectChannel createSharedChannel(String str) {
            return new DirectChannel();
        }
    };
    protected final SharedChannelProvider<QueueChannel> queueChannelProvider = new SharedChannelProvider<QueueChannel>(QueueChannel.class) { // from class: org.springframework.xd.dirt.integration.bus.MessageBusSupport.2
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.springframework.xd.dirt.integration.bus.MessageBusSupport.SharedChannelProvider
        public QueueChannel createSharedChannel(String str) {
            return new QueueChannel(MessageBusSupport.this.queueSize);
        }
    };
    protected final SharedChannelProvider<PublishSubscribeChannel> pubsubChannelProvider = new SharedChannelProvider<PublishSubscribeChannel>(PublishSubscribeChannel.class) { // from class: org.springframework.xd.dirt.integration.bus.MessageBusSupport.3
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.springframework.xd.dirt.integration.bus.MessageBusSupport.SharedChannelProvider
        public PublishSubscribeChannel createSharedChannel(String str) {
            return new PublishSubscribeChannel();
        }
    };

    /* loaded from: input_file:org/springframework/xd/dirt/integration/bus/MessageBusSupport$DefaultPartitionSelector.class */
    private class DefaultPartitionSelector implements PartitionSelectorStrategy {
        private DefaultPartitionSelector() {
        }

        @Override // org.springframework.xd.dirt.integration.bus.PartitionSelectorStrategy
        public int selectPartition(Object obj, int i) {
            return Math.abs(obj.hashCode()) % i;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/springframework/xd/dirt/integration/bus/MessageBusSupport$PartitioningMetadata.class */
    public class PartitioningMetadata {
        private final String partitionKeyExtractorClass;
        private final Expression partitionKeyExpression;
        private final String partitionSelectorClass;
        private final Expression partitionSelectorExpression;
        private final int partitionCount;

        public PartitioningMetadata(AbstractBusPropertiesAccessor abstractBusPropertiesAccessor) {
            this.partitionKeyExtractorClass = abstractBusPropertiesAccessor.getPartitionKeyExtractorClass();
            this.partitionKeyExpression = abstractBusPropertiesAccessor.getPartitionKeyExpression();
            this.partitionSelectorClass = abstractBusPropertiesAccessor.getPartitionSelectorClass();
            this.partitionSelectorExpression = abstractBusPropertiesAccessor.getPartitionSelectorExpression();
            this.partitionCount = abstractBusPropertiesAccessor.getPartitionCount();
        }

        public boolean isPartitionedModule() {
            return StringUtils.hasText(this.partitionKeyExtractorClass) || this.partitionKeyExpression != null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/springframework/xd/dirt/integration/bus/MessageBusSupport$SharedChannelProvider.class */
    public abstract class SharedChannelProvider<T extends MessageChannel> {
        private final Class<T> requiredType;

        private SharedChannelProvider(Class<T> cls) {
            this.requiredType = cls;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final synchronized T lookupOrCreateSharedChannel(String str) {
            T lookupSharedChannel = lookupSharedChannel(str);
            if (lookupSharedChannel == null) {
                lookupSharedChannel = createAndRegisterChannel(str);
            }
            return lookupSharedChannel;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public T createAndRegisterChannel(String str) {
            T createSharedChannel = createSharedChannel(str);
            ConfigurableListableBeanFactory beanFactory = MessageBusSupport.this.applicationContext.getBeanFactory();
            beanFactory.registerSingleton(str, createSharedChannel);
            T t = (T) beanFactory.initializeBean(createSharedChannel, str);
            MessageBusSupport.this.createdChannels.add(t);
            if (MessageBusSupport.this.logger.isDebugEnabled()) {
                MessageBusSupport.this.logger.debug("Registered channel:" + str);
            }
            return t;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public abstract T createSharedChannel(String str);

        protected T lookupSharedChannel(String str) {
            MessageChannel messageChannel = null;
            if (MessageBusSupport.this.applicationContext.containsBean(str)) {
                try {
                    messageChannel = (MessageChannel) MessageBusSupport.this.applicationContext.getBean(str, this.requiredType);
                } catch (Exception e) {
                    throw new IllegalArgumentException("bean '" + str + "' is already registered but does not match the required type");
                }
            }
            return (T) messageChannel;
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Assert.isInstanceOf(AbstractApplicationContext.class, applicationContext);
        this.applicationContext = (AbstractApplicationContext) applicationContext;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractApplicationContext getApplicationContext() {
        return this.applicationContext;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ConfigurableListableBeanFactory getBeanFactory() {
        return this.applicationContext.getBeanFactory();
    }

    public void setQueueSize(int i) {
        this.queueSize = i;
    }

    public void setCodec(MultiTypeCodec<Object> multiTypeCodec) {
        this.codec = multiTypeCodec;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public IdGenerator getIdGenerator() {
        return this.idGenerator;
    }

    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    public void setPartitionSelector(PartitionSelectorStrategy partitionSelectorStrategy) {
        this.partitionSelector = partitionSelectorStrategy;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.applicationContext, "The 'applicationContext' property cannot be null");
    }

    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public synchronized MessageChannel bindDynamicProducer(String str) {
        MessageChannel lookupSharedChannel = this.directChannelProvider.lookupSharedChannel(str);
        if (lookupSharedChannel == null) {
            lookupSharedChannel = this.directChannelProvider.createAndRegisterChannel(str);
            bindProducer(str, lookupSharedChannel, null);
        }
        return lookupSharedChannel;
    }

    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public synchronized MessageChannel bindDynamicPubSubProducer(String str) {
        MessageChannel lookupSharedChannel = this.directChannelProvider.lookupSharedChannel(str);
        if (lookupSharedChannel == null) {
            lookupSharedChannel = this.directChannelProvider.createAndRegisterChannel(str);
            bindPubSubProducer(str, lookupSharedChannel, null);
        }
        return lookupSharedChannel;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void registerNamedChannelForConsumerIfNecessary(String str, boolean z) {
        if (isNamedChannel(str)) {
            if (z) {
                bindDynamicPubSubProducer(str);
            } else {
                bindDynamicProducer(str);
            }
        }
    }

    protected boolean isNamedChannel(String str) {
        return str.startsWith(P2P_NAMED_CHANNEL_TYPE_PREFIX) || str.startsWith(PUBSUB_NAMED_CHANNEL_TYPE_PREFIX) || str.startsWith(JOB_CHANNEL_TYPE_PREFIX);
    }

    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void unbindConsumers(String str) {
        deleteBindings("inbound." + str);
    }

    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void unbindProducers(String str) {
        deleteBindings("outbound." + str);
    }

    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void unbindConsumer(String str, MessageChannel messageChannel) {
        deleteBinding("inbound." + str, messageChannel);
    }

    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void unbindProducer(String str, MessageChannel messageChannel) {
        deleteBinding("outbound." + str, messageChannel);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addBinding(Binding binding) {
        this.bindings.add(binding);
    }

    protected void deleteBindings(String str) {
        Assert.hasText(str, "a valid name is required to remove bindings");
        synchronized (this.bindings) {
            Iterator<Binding> it = this.bindings.iterator();
            while (it.hasNext()) {
                Binding next = it.next();
                if (next.getEndpoint().getComponentName().equals(str)) {
                    next.stop();
                    it.remove();
                    destroyCreatedChannel(next);
                }
            }
        }
    }

    protected void deleteBinding(String str, MessageChannel messageChannel) {
        Assert.hasText(str, "a valid name is required to remove a binding");
        Assert.notNull(messageChannel, "a valid channel is required to remove a binding");
        synchronized (this.bindings) {
            Iterator<Binding> it = this.bindings.iterator();
            while (it.hasNext()) {
                Binding next = it.next();
                if (next.getChannel().equals(messageChannel) && next.getEndpoint().getComponentName().equals(str)) {
                    next.stop();
                    it.remove();
                    destroyCreatedChannel(next);
                    return;
                }
            }
        }
    }

    protected void destroyCreatedChannel(Binding binding) {
        NamedComponent channel = binding.getChannel();
        if (Binding.PRODUCER.equals(binding.getType()) && this.createdChannels.contains(channel)) {
            this.createdChannels.remove(channel);
            DefaultListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
            if (beanFactory instanceof DefaultListableBeanFactory) {
                String componentName = channel.getComponentName();
                beanFactory.destroySingleton(componentName);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Removed channel:" + componentName);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void stopBindings() {
        Iterator<Binding> it = this.bindings.iterator();
        while (it.hasNext()) {
            try {
                it.next().stop();
            } catch (Exception e) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn("failed to stop adapter", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Message<?> serializePayloadIfNecessary(Message<?> message, MediaType mediaType) {
        Object payload = message.getPayload();
        Object obj = message.getHeaders().get("contentType");
        if (mediaType.equals(MediaType.ALL)) {
            return message;
        }
        if (!mediaType.equals(MediaType.APPLICATION_OCTET_STREAM)) {
            throw new IllegalArgumentException("'to' can only be 'ALL' or 'APPLICATION_OCTET_STREAM'");
        }
        MessageBuilder header = MessageBuilder.withPayload(serializePayloadIfNecessary(payload)).copyHeaders(message.getHeaders()).setHeader("contentType", resolveContentType(payload));
        if (obj != null) {
            header.setHeader(ORIGINAL_CONTENT_TYPE_HEADER, obj);
        }
        return header.build();
    }

    private byte[] serializePayloadIfNecessary(Object obj) {
        if (obj instanceof byte[]) {
            return (byte[]) obj;
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            if (obj instanceof String) {
                return ((String) obj).getBytes("UTF-8");
            }
            this.codec.serialize(obj, byteArrayOutputStream);
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("unable to serialize payload [" + obj.getClass().getName() + "]", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Message<?> deserializePayloadIfNecessary(Message<?> message) {
        Message<?> message2 = message;
        Object deserializePayload = deserializePayload(message.getPayload(), this.contentTypeResolver.resolve(message.getHeaders()));
        if (deserializePayload != null) {
            MessageBuilder copyHeaders = MessageBuilder.withPayload(deserializePayload).copyHeaders(message.getHeaders());
            copyHeaders.setHeader("contentType", message.getHeaders().get(ORIGINAL_CONTENT_TYPE_HEADER));
            copyHeaders.setHeader(ORIGINAL_CONTENT_TYPE_HEADER, (Object) null);
            message2 = copyHeaders.build();
        }
        return message2;
    }

    private Object deserializePayload(Object obj, MimeType mimeType) {
        if ((obj instanceof byte[]) && !MediaType.APPLICATION_OCTET_STREAM.equals(mimeType)) {
            return deserializePayload((byte[]) obj, mimeType);
        }
        return obj;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Object deserializePayload(byte[] bArr, MimeType mimeType) {
        try {
            if (mimeType.equals(MediaType.TEXT_PLAIN)) {
                return new String(bArr, "UTF-8");
            }
            return this.codec.deserialize(bArr, (Class<? extends Object>) Class.forName(mimeType.getParameter("type")));
        } catch (IOException e) {
            throw new SerializationException("unable to deserialize [" + ((Object) null) + "]", e);
        } catch (ClassNotFoundException e2) {
            throw new SerializationException("unable to deserialize [" + ((Object) null) + "]. Class not found.", e2);
        }
    }

    private String resolveContentType(Object obj) {
        return obj instanceof byte[] ? "application/octet-stream" : obj instanceof String ? "text/plain" : "application/x-java-object;type=" + obj.getClass().getName();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int determinePartition(Message<?> message, PartitioningMetadata partitioningMetadata) {
        Object obj = null;
        if (StringUtils.hasText(partitioningMetadata.partitionKeyExtractorClass)) {
            obj = invokeExtractor(partitioningMetadata.partitionKeyExtractorClass, message);
        } else if (partitioningMetadata.partitionKeyExpression != null) {
            obj = partitioningMetadata.partitionKeyExpression.getValue(this.evaluationContext, message);
        }
        Assert.notNull(obj, "Partition key cannot be null");
        int invokePartitionSelector = StringUtils.hasText(partitioningMetadata.partitionSelectorClass) ? invokePartitionSelector(partitioningMetadata.partitionSelectorClass, obj, partitioningMetadata.partitionCount) : partitioningMetadata.partitionSelectorExpression != null ? ((Integer) partitioningMetadata.partitionSelectorExpression.getValue(this.evaluationContext, obj, Integer.class)).intValue() : this.partitionSelector.selectPartition(obj, partitioningMetadata.partitionCount);
        if (invokePartitionSelector >= partitioningMetadata.partitionCount) {
            invokePartitionSelector %= partitioningMetadata.partitionCount;
        }
        if (invokePartitionSelector < 0) {
            invokePartitionSelector = Math.abs(invokePartitionSelector);
        }
        return invokePartitionSelector;
    }

    private Object invokeExtractor(String str, Message<?> message) {
        if (this.applicationContext.containsBean(str)) {
            return ((PartitionKeyExtractorStrategy) this.applicationContext.getBean(str, PartitionKeyExtractorStrategy.class)).extractKey(message);
        }
        try {
            try {
                Object newInstance = ClassUtils.forName(str, this.applicationContext.getClassLoader()).newInstance();
                Assert.isInstanceOf(PartitionKeyExtractorStrategy.class, newInstance);
                this.applicationContext.getBeanFactory().registerSingleton(str, newInstance);
                this.applicationContext.getBeanFactory().initializeBean(newInstance, str);
                return ((PartitionKeyExtractorStrategy) newInstance).extractKey(message);
            } catch (Exception e) {
                this.logger.error("Failed to instantiate key extractor", e);
                throw new MessageBusException("Failed to instantiate key extractor: " + str, e);
            }
        } catch (Exception e2) {
            this.logger.error("Failed to load key extractor", e2);
            throw new MessageBusException("Failed to load key extractor: " + str, e2);
        }
    }

    private int invokePartitionSelector(String str, Object obj, int i) {
        if (this.applicationContext.containsBean(str)) {
            return ((PartitionSelectorStrategy) this.applicationContext.getBean(str, PartitionSelectorStrategy.class)).selectPartition(obj, i);
        }
        try {
            try {
                Object newInstance = ClassUtils.forName(str, this.applicationContext.getClassLoader()).newInstance();
                Assert.isInstanceOf(PartitionKeyExtractorStrategy.class, newInstance);
                this.applicationContext.getBeanFactory().registerSingleton(str, newInstance);
                this.applicationContext.getBeanFactory().initializeBean(newInstance, str);
                return ((PartitionSelectorStrategy) newInstance).selectPartition(obj, i);
            } catch (Exception e) {
                this.logger.error("Failed to instantiate partition selector", e);
                throw new MessageBusException("Failed to instantiate partition selector: " + str, e);
            }
        } catch (Exception e2) {
            this.logger.error("Failed to load partition selector", e2);
            throw new MessageBusException("Failed to load partition selector: " + str, e2);
        }
    }
}
