package org.springframework.integration.x.rabbit;

import java.util.Collection;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.http.MediaType;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.x.bus.Binding;
import org.springframework.integration.x.bus.MessageBusSupport;
import org.springframework.integration.x.bus.serializer.MultiTypeCodec;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/x/rabbit/RabbitMessageBus.class */
/**
 * A message bus implementation backed by RabbitMQ.
 *
 * <p>Point-to-point bindings declare a named {@link Queue} and publish to / consume from it
 * directly. Publish-subscribe bindings declare a {@link FanoutExchange} whose name is the logical
 * name prefixed with {@code "topic."}; each pub/sub consumer receives its own broker-named queue
 * bound to that exchange.
 *
 * <p>Payload conversion is delegated to {@code transformPayloadForProducerIfNecessary} and
 * {@code transformPayloadForConsumerIfNecessary}, and binding bookkeeping to {@code addBinding}
 * and {@code stopBindings} (all presumably inherited from {@link MessageBusSupport}).
 */
public class RabbitMessageBus extends MessageBusSupport implements DisposableBean {

    /** Prefix added to a logical name to build the fanout exchange name used for pub/sub. */
    private static final String PUBSUB_EXCHANGE_PREFIX = "topic.";

    private final Log logger = LogFactory.getLog(getClass());

    private final RabbitTemplate rabbitTemplate = new RabbitTemplate();

    private final RabbitAdmin rabbitAdmin;

    private final ConnectionFactory connectionFactory;

    private final DefaultAmqpHeaderMapper mapper;

    /** Optional listener concurrency; {@code null} leaves the container's own default in place. */
    private volatile Integer concurrentConsumers;

    /**
     * Creates a bus that talks to the broker through the given connection factory.
     *
     * @param connectionFactory factory used by the template, the admin and every listener container
     * @param multiTypeCodec codec handed to {@code setCodec}
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public RabbitMessageBus(ConnectionFactory connectionFactory, MultiTypeCodec<Object> multiTypeCodec) {
        Assert.notNull(connectionFactory, "connectionFactory must not be null");
        Assert.notNull(multiTypeCodec, "codec must not be null");
        this.connectionFactory = connectionFactory;
        this.rabbitTemplate.setConnectionFactory(connectionFactory);
        this.rabbitTemplate.afterPropertiesSet();
        this.rabbitAdmin = new RabbitAdmin(connectionFactory);
        this.rabbitAdmin.afterPropertiesSet();
        this.mapper = new DefaultAmqpHeaderMapper();
        // "STANDARD_REQUEST_HEADERS" appears to be the mapper's token for the standard AMQP
        // request headers (TODO confirm); "originalContentType" is mapped in addition to it.
        this.mapper.setRequestHeaderNames(new String[]{"STANDARD_REQUEST_HEADERS", "originalContentType"});
        setCodec(multiTypeCodec);
    }

    /**
     * Sets the number of concurrent consumers applied to listener containers created by
     * subsequent {@code bindConsumer} / {@code bindPubSubConsumer} calls. Containers that were
     * already created are not touched.
     *
     * <p>Without this setter the {@code concurrentConsumers} field could never be assigned, so
     * the concurrency branch in consumer registration was unreachable.
     *
     * @param concurrentConsumers number of consumers; must be at least 1
     * @throws IllegalArgumentException if {@code concurrentConsumers} is less than 1
     */
    public void setConcurrentConsumers(int concurrentConsumers) {
        // Fail fast here rather than later while a binding is being built; the listener
        // container is expected to reject non-positive values as well.
        if (concurrentConsumers < 1) {
            throw new IllegalArgumentException(
                    "concurrentConsumers must be at least 1, but was " + concurrentConsumers);
        }
        this.concurrentConsumers = Integer.valueOf(concurrentConsumers);
    }

    /**
     * Declares a queue named {@code str} and starts consuming from it into {@code messageChannel}.
     *
     * @param str queue name
     * @param messageChannel channel that receives the (possibly converted) messages
     * @param collection media types passed to the consumer-side payload transformation
     * @param z not used by this implementation
     */
    @Override // org.springframework.integration.x.bus.MessageBus
    public void bindConsumer(String str, MessageChannel messageChannel, Collection<MediaType> collection, boolean z) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("declaring queue for inbound: " + str);
        }
        Queue inboundQueue = new Queue(str);
        this.rabbitAdmin.declareQueue(inboundQueue);
        doRegisterConsumer(str, messageChannel, collection, inboundQueue);
    }

    /**
     * Declares the fanout exchange {@code "topic." + str}, binds a fresh broker-named queue to it
     * and starts consuming from that queue into {@code messageChannel}.
     *
     * @param str logical name of the pub/sub destination
     * @param messageChannel channel that receives the (possibly converted) messages
     * @param collection media types passed to the consumer-side payload transformation
     */
    @Override // org.springframework.integration.x.bus.MessageBus
    public void bindPubSubConsumer(String str, MessageChannel messageChannel, Collection<MediaType> collection) {
        FanoutExchange exchange = new FanoutExchange(PUBSUB_EXCHANGE_PREFIX + str);
        this.rabbitAdmin.declareExchange(exchange);
        // The no-arg declareQueue() lets the broker/admin pick the queue name.
        Queue subscriberQueue = this.rabbitAdmin.declareQueue();
        this.rabbitAdmin.declareBinding(BindingBuilder.bind(subscriberQueue).to(exchange));
        doRegisterConsumer(str, messageChannel, collection, subscriberQueue);
    }

    /**
     * Builds and starts the inbound flow: listener container, inbound adapter, bridge channel,
     * {@link ReceivingHandler}, and finally the module channel.
     *
     * @param name logical name used to derive bean names
     * @param moduleInputChannel channel the converted messages are sent to
     * @param acceptedMediaTypes media types handed to the {@link ReceivingHandler}
     * @param queue queue the listener container consumes from
     */
    private void doRegisterConsumer(String name, MessageChannel moduleInputChannel,
            Collection<MediaType> acceptedMediaTypes, Queue queue) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(this.connectionFactory);
        // Read the volatile field once so the null check and the use see the same value.
        Integer consumers = this.concurrentConsumers;
        if (consumers != null) {
            listenerContainer.setConcurrentConsumers(consumers.intValue());
        }
        listenerContainer.setQueues(new Queue[]{queue});
        // Wrap message delivery in a stateless retry interceptor built with the factory bean's defaults.
        listenerContainer.setAdviceChain(new Advice[]{new StatelessRetryOperationsInterceptorFactoryBean().getObject()});
        listenerContainer.afterPropertiesSet();

        AmqpInboundChannelAdapter inboundAdapter = new AmqpInboundChannelAdapter(listenerContainer);
        DirectChannel bridgeChannel = new DirectChannel();
        bridgeChannel.setBeanName(name + ".bridge");
        inboundAdapter.setOutputChannel(bridgeChannel);
        inboundAdapter.setHeaderMapper(this.mapper);
        inboundAdapter.setBeanName("inbound." + name);
        inboundAdapter.afterPropertiesSet();
        addBinding(Binding.forConsumer(inboundAdapter, moduleInputChannel));

        ReceivingHandler convertingBridge = new ReceivingHandler(acceptedMediaTypes);
        convertingBridge.setOutputChannel(moduleInputChannel);
        convertingBridge.setBeanName(name + ".convert.bridge");
        convertingBridge.afterPropertiesSet();
        bridgeChannel.subscribe(convertingBridge);

        // Start only after the whole downstream chain is wired, so no message arrives early.
        inboundAdapter.start();
    }

    /**
     * Declares a queue named {@code str} and forwards messages from {@code messageChannel} to it,
     * using {@code str} as the routing key.
     *
     * @param str queue name, also used as routing key
     * @param messageChannel channel to publish from; must be a {@link SubscribableChannel}
     * @param z not used by this implementation
     */
    @Override // org.springframework.integration.x.bus.MessageBus
    public void bindProducer(String str, MessageChannel messageChannel, boolean z) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("declaring queue for outbound: " + str);
        }
        this.rabbitAdmin.declareQueue(new Queue(str));
        AmqpOutboundEndpoint queueEndpoint = new AmqpOutboundEndpoint(this.rabbitTemplate);
        queueEndpoint.setRoutingKey(str);
        queueEndpoint.setHeaderMapper(this.mapper);
        queueEndpoint.afterPropertiesSet();
        doRegisterProducer(str, messageChannel, queueEndpoint);
    }

    /**
     * Declares the fanout exchange {@code "topic." + str} and forwards messages from
     * {@code messageChannel} to it.
     *
     * @param str logical name of the pub/sub destination
     * @param messageChannel channel to publish from; must be a {@link SubscribableChannel}
     */
    @Override // org.springframework.integration.x.bus.MessageBus
    public void bindPubSubProducer(String str, MessageChannel messageChannel) {
        String exchangeName = PUBSUB_EXCHANGE_PREFIX + str;
        this.rabbitAdmin.declareExchange(new FanoutExchange(exchangeName));
        AmqpOutboundEndpoint exchangeEndpoint = new AmqpOutboundEndpoint(this.rabbitTemplate);
        exchangeEndpoint.setExchangeName(exchangeName);
        exchangeEndpoint.setHeaderMapper(this.mapper);
        exchangeEndpoint.afterPropertiesSet();
        doRegisterProducer(str, messageChannel, exchangeEndpoint);
    }

    /**
     * Subscribes a {@link SendingHandler} wrapping {@code delegate} to the module output channel,
     * records the binding and starts the consumer.
     *
     * @param name logical name used to derive the consumer's bean name
     * @param moduleOutputChannel channel to subscribe to; must be a {@link SubscribableChannel}
     * @param delegate handler that actually publishes to the broker
     * @throws IllegalArgumentException if the channel is not a {@link SubscribableChannel}
     */
    private void doRegisterProducer(String name, MessageChannel moduleOutputChannel, MessageHandler delegate) {
        Assert.isInstanceOf(SubscribableChannel.class, moduleOutputChannel);
        SendingHandler sendingHandler = new SendingHandler(delegate);
        EventDrivenConsumer outboundConsumer =
                new EventDrivenConsumer((SubscribableChannel) moduleOutputChannel, sendingHandler);
        outboundConsumer.setBeanName("outbound." + name);
        outboundConsumer.afterPropertiesSet();
        addBinding(Binding.forProducer(moduleOutputChannel, outboundConsumer));
        outboundConsumer.start();
    }

    /** Stops all bindings registered on this bus by calling {@code stopBindings()}. */
    public void destroy() {
        stopBindings();
    }

    /**
     * Inbound handler that runs each received message through
     * {@code transformPayloadForConsumerIfNecessary} with the configured media types and emits
     * the result as its reply.
     */
    public class ReceivingHandler extends AbstractReplyProducingMessageHandler {

        private final Collection<MediaType> acceptedMediaTypes;

        /**
         * @param collection media types passed to the payload transformation
         */
        public ReceivingHandler(Collection<MediaType> collection) {
            this.acceptedMediaTypes = collection;
        }

        /**
         * @param message the message received from the broker
         * @return the result of the consumer-side payload transformation
         */
        protected Object handleRequestMessage(Message<?> message) {
            return RabbitMessageBus.this.transformPayloadForConsumerIfNecessary(message, this.acceptedMediaTypes);
        }

        /**
         * @return always {@code false}; presumably this stops the base class from copying
         *         request headers onto the reply (confirm against the superclass contract)
         */
        protected boolean shouldCopyRequestHeaders() {
            return false;
        }
    }

    /**
     * Outbound handler that runs each message through
     * {@code transformPayloadForProducerIfNecessary}, targeting
     * {@link MediaType#APPLICATION_OCTET_STREAM}, before handing it to the delegate.
     */
    public class SendingHandler extends AbstractMessageHandler {

        private final MessageHandler delegate;

        private SendingHandler(MessageHandler messageHandler) {
            this.delegate = messageHandler;
        }

        /**
         * @param message the message to transform and forward
         * @throws Exception propagated from the transformation or the delegate
         */
        protected void handleMessageInternal(Message<?> message) throws Exception {
            Message<?> outbound = RabbitMessageBus.this.transformPayloadForProducerIfNecessary(
                    message, MediaType.APPLICATION_OCTET_STREAM);
            this.delegate.handleMessage(outbound);
        }
    }
}
