package org.springframework.integration.x.rabbit;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.Lifecycle;
import org.springframework.http.MediaType;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.x.channel.registry.ChannelRegistrySupport;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/x/rabbit/RabbitChannelRegistry.class */
/**
 * Channel registry that binds message channels to RabbitMQ.
 *
 * <ul>
 * <li>An inbound channel consumes from a queue named after the channel.</li>
 * <li>An outbound channel publishes every message to the fanout exchange
 * {@code tap.<name>} and to the queue {@code <name>} (via the template's
 * default exchange, using the name as routing key).</li>
 * <li>A tap consumes from a queue declared without an explicit name and bound
 * to the {@code tap.<name>} exchange of the tapped channel.</li>
 * </ul>
 *
 * Every adapter/consumer created here is tracked in {@link #lifecycleBeans} so
 * that it can be stopped individually ({@code deleteInbound}/{@code deleteOutbound})
 * or all at once ({@link #destroy()}).
 */
public class RabbitChannelRegistry extends ChannelRegistrySupport implements DisposableBean {
    private final RabbitAdmin rabbitAdmin;
    private final ConnectionFactory connectionFactory;
    // NOTE(review): no assignment to this field is visible in this class; unless it is
    // set from elsewhere it stays null and the listener container default applies.
    private volatile Integer concurrentConsumers;
    private final DefaultAmqpHeaderMapper mapper;
    private final Log logger = LogFactory.getLog(getClass());
    private final RabbitTemplate rabbitTemplate = new RabbitTemplate();
    // Synchronized wrapper: single calls are safe, but any iteration must hold the list's monitor.
    private final List<Lifecycle> lifecycleBeans = Collections.synchronizedList(new ArrayList<Lifecycle>());

    /**
     * Outbound handler that sends each message both to the tap exchange and to
     * the module's queue.
     */
    private class CompositeHandler extends AbstractMessageHandler {
        private final AmqpOutboundEndpoint queue;
        private final AmqpOutboundEndpoint tap;

        /**
         * Declares the queue {@code name} and the fanout exchange
         * {@code tap.<name>}, then prepares one outbound endpoint for each.
         *
         * @param name the channel name, used as queue name and routing key
         */
        private CompositeHandler(String name) {
            // 'this.logger' is the logger inherited from the handler hierarchy, not the registry's.
            if (this.logger.isInfoEnabled()) {
                this.logger.info("declaring queue for outbound: " + name);
            }
            String tapExchangeName = "tap." + name;
            RabbitChannelRegistry.this.rabbitAdmin.declareQueue(new Queue(name));
            RabbitChannelRegistry.this.rabbitAdmin.declareExchange(new FanoutExchange(tapExchangeName));
            this.queue = new AmqpOutboundEndpoint(RabbitChannelRegistry.this.rabbitTemplate);
            this.queue.setRoutingKey(name);
            this.queue.setHeaderMapper(RabbitChannelRegistry.this.mapper);
            this.queue.afterPropertiesSet();
            this.tap = new AmqpOutboundEndpoint(RabbitChannelRegistry.this.rabbitTemplate);
            this.tap.setExchangeName(tapExchangeName);
            this.tap.setHeaderMapper(RabbitChannelRegistry.this.mapper);
            this.tap.afterPropertiesSet();
        }

        /**
         * Converts the message for transport (target type
         * {@code application/octet-stream}) and publishes the converted message
         * to the tap exchange first, then to the queue.
         */
        @Override
        protected void handleMessageInternal(Message<?> message) throws Exception {
            Message<?> outbound = RabbitChannelRegistry.this.transformOutboundIfNecessary(message, MediaType.APPLICATION_OCTET_STREAM);
            this.tap.handleMessage(outbound);
            this.queue.handleMessage(outbound);
        }
    }

    /**
     * Inbound handler that converts a received message according to the media
     * types accepted by the target channel and forwards the result to its
     * output channel.
     */
    private class ReceivingHandler extends AbstractReplyProducingMessageHandler {
        private final Collection<MediaType> acceptedMediaTypes;

        /**
         * @param acceptedMediaTypes media types passed to the inbound conversion
         */
        public ReceivingHandler(Collection<MediaType> acceptedMediaTypes) {
            this.acceptedMediaTypes = acceptedMediaTypes;
        }

        @Override
        protected Object handleRequestMessage(Message<?> message) {
            return RabbitChannelRegistry.this.transformInboundIfNecessary(message, this.acceptedMediaTypes);
        }

        /**
         * @return always {@code false}
         */
        @Override
        protected boolean shouldCopyRequestHeaders() {
            return false;
        }
    }

    /**
     * Creates a registry using the given connection factory for the template,
     * the admin and every listener container it creates.
     *
     * @param connectionFactory the RabbitMQ connection factory; must not be {@code null}
     * @throws IllegalArgumentException if {@code connectionFactory} is {@code null}
     */
    public RabbitChannelRegistry(ConnectionFactory connectionFactory) {
        Assert.notNull(connectionFactory, "connectionFactory must not be null");
        this.connectionFactory = connectionFactory;
        this.rabbitTemplate.setConnectionFactory(connectionFactory);
        this.rabbitTemplate.afterPropertiesSet();
        this.rabbitAdmin = new RabbitAdmin(connectionFactory);
        this.rabbitAdmin.afterPropertiesSet();
        this.mapper = new DefaultAmqpHeaderMapper();
        // Map the standard request headers plus the header carrying the original content type.
        this.mapper.setRequestHeaderNames(new String[]{"STANDARD_REQUEST_HEADERS", "originalContentType"});
    }

    /**
     * Declares the queue {@code str} and starts an adapter that consumes from it,
     * converts each message and sends it to {@code messageChannel}.
     *
     * @param str the channel name; also the queue name
     * @param messageChannel the channel receiving the converted messages
     * @param collection the media types accepted by the receiving side
     * @param z not used by this implementation
     */
    @Override // org.springframework.integration.x.channel.registry.ChannelRegistry
    public void createInbound(String str, MessageChannel messageChannel, Collection<MediaType> collection, boolean z) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("declaring queue for inbound: " + str);
        }
        this.rabbitAdmin.declareQueue(new Queue(str));
        SimpleMessageListenerContainer listenerContainer = newListenerContainer();
        listenerContainer.setQueueNames(new String[]{str});
        listenerContainer.afterPropertiesSet();

        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
        DirectChannel bridgeChannel = new DirectChannel();
        bridgeChannel.setBeanName(str + ".bridge");
        adapter.setOutputChannel(bridgeChannel);
        adapter.setHeaderMapper(this.mapper);
        // deleteInbound(...) looks the adapter up by this name.
        adapter.setBeanName("inbound." + str);
        adapter.afterPropertiesSet();
        this.lifecycleBeans.add(adapter);

        ReceivingHandler convertingHandler = new ReceivingHandler(collection);
        convertingHandler.setOutputChannel(messageChannel);
        convertingHandler.setBeanName(str + ".convert.bridge");
        convertingHandler.afterPropertiesSet();
        bridgeChannel.subscribe(convertingHandler);
        adapter.start();
    }

    /**
     * Subscribes a {@link CompositeHandler} to {@code messageChannel} so that
     * every message sent to it is published to RabbitMQ.
     *
     * @param str the channel name; also the queue name and routing key
     * @param messageChannel the channel to drain; must be a {@link SubscribableChannel}
     * @param z not used by this implementation
     * @throws IllegalArgumentException if {@code messageChannel} is not subscribable
     */
    @Override // org.springframework.integration.x.channel.registry.ChannelRegistry
    public void createOutbound(String str, MessageChannel messageChannel, boolean z) {
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        EventDrivenConsumer consumer = new EventDrivenConsumer((SubscribableChannel) messageChannel, new CompositeHandler(str));
        // deleteOutbound(...) looks the consumer up by this name.
        consumer.setBeanName("outbound." + str);
        consumer.afterPropertiesSet();
        this.lifecycleBeans.add(consumer);
        consumer.start();
    }

    /**
     * Starts an adapter that consumes from a newly declared queue bound to the
     * fanout exchange {@code tap.<str2>} and forwards each message to
     * {@code messageChannel}, accepting any media type.
     *
     * @param str the name identifying this tap; {@code deleteInbound(str)} stops it
     * @param str2 the name of the tapped channel (selects the {@code tap.<str2>} exchange)
     * @param messageChannel the channel receiving the tapped messages
     */
    @Override // org.springframework.integration.x.channel.registry.ChannelRegistry
    public void tap(String str, String str2, MessageChannel messageChannel) {
        SimpleMessageListenerContainer listenerContainer = newListenerContainer();
        Queue tapQueue = this.rabbitAdmin.declareQueue();
        this.rabbitAdmin.declareBinding(BindingBuilder.bind(tapQueue).to(new FanoutExchange("tap." + str2)));
        listenerContainer.setQueues(new Queue[]{tapQueue});
        listenerContainer.afterPropertiesSet();

        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
        DirectChannel bridgeChannel = new DirectChannel();
        bridgeChannel.setBeanName(str + ".bridge");
        adapter.setOutputChannel(bridgeChannel);
        adapter.setHeaderMapper(this.mapper);
        adapter.setBeanName("tap." + str2);
        // deleteInbound(...) matches taps on this component name.
        adapter.setComponentName(str + ".tapAdapter");
        adapter.afterPropertiesSet();
        this.lifecycleBeans.add(adapter);

        ReceivingHandler convertingHandler = new ReceivingHandler(Collections.singletonList(MediaType.ALL));
        convertingHandler.setOutputChannel(messageChannel);
        convertingHandler.setBeanName(str2 + ".convert.bridge");
        convertingHandler.afterPropertiesSet();
        bridgeChannel.subscribe(convertingHandler);
        adapter.start();
    }

    /**
     * Stops and forgets every tracked inbound adapter whose component name is
     * {@code inbound.<str>} or {@code <str>.tapAdapter}.
     *
     * @param str the channel or tap name
     */
    @Override // org.springframework.integration.x.channel.registry.ChannelRegistry
    public void deleteInbound(String str) {
        String inboundName = "inbound." + str;
        String tapAdapterName = str + ".tapAdapter";
        synchronized (this.lifecycleBeans) {
            Iterator<Lifecycle> it = this.lifecycleBeans.iterator();
            while (it.hasNext()) {
                Lifecycle bean = it.next();
                if (!(bean instanceof AmqpInboundChannelAdapter)) {
                    continue;
                }
                String componentName = ((IntegrationObjectSupport) bean).getComponentName();
                // No early return: all matching adapters are removed.
                if (inboundName.equals(componentName) || tapAdapterName.equals(componentName)) {
                    bean.stop();
                    it.remove();
                }
            }
        }
    }

    /**
     * Stops and forgets the first tracked consumer whose component name is
     * {@code outbound.<str>}.
     *
     * @param str the channel name
     */
    @Override // org.springframework.integration.x.channel.registry.ChannelRegistry
    public void deleteOutbound(String str) {
        String outboundName = "outbound." + str;
        synchronized (this.lifecycleBeans) {
            Iterator<Lifecycle> it = this.lifecycleBeans.iterator();
            while (it.hasNext()) {
                Lifecycle bean = it.next();
                if ((bean instanceof EventDrivenConsumer) && outboundName.equals(((IntegrationObjectSupport) bean).getComponentName())) {
                    bean.stop();
                    it.remove();
                    return;
                }
            }
        }
    }

    /**
     * Stops every tracked adapter and consumer. A failure to stop one of them is
     * logged at WARN level and does not prevent the others from being stopped.
     */
    @Override
    public void destroy() {
        // Iterating a synchronizedList requires holding its monitor; copy under the
        // lock and stop outside it so concurrent create/delete calls cannot cause a
        // ConcurrentModificationException and are not blocked while beans stop.
        List<Lifecycle> snapshot;
        synchronized (this.lifecycleBeans) {
            snapshot = new ArrayList<Lifecycle>(this.lifecycleBeans);
        }
        for (Lifecycle bean : snapshot) {
            try {
                bean.stop();
            } catch (Exception e) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn("failed to stop adapter", e);
                }
            }
        }
    }

    /**
     * Creates a listener container on this registry's connection factory and
     * applies {@link #concurrentConsumers} when it has a value.
     */
    private SimpleMessageListenerContainer newListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(this.connectionFactory);
        // Read the volatile field once so the null check and the use see the same value.
        Integer consumers = this.concurrentConsumers;
        if (consumers != null) {
            listenerContainer.setConcurrentConsumers(consumers.intValue());
        }
        return listenerContainer;
    }
}
