package org.springframework.integration.x.redis;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.Lifecycle;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.redis.inbound.RedisInboundChannelAdapter;
import org.springframework.integration.redis.outbound.RedisPublishingMessageHandler;
import org.springframework.integration.x.channel.registry.ChannelRegistry;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/x/redis/RedisChannelRegistry.class */
public class RedisChannelRegistry implements ChannelRegistry, DisposableBean {
    private final Log logger = LogFactory.getLog(getClass());
    private final StringRedisTemplate redisTemplate = new StringRedisTemplate();
    private final List<Lifecycle> lifecycleBeans = Collections.synchronizedList(new ArrayList());

    /* loaded from: input_file:org/springframework/integration/x/redis/RedisChannelRegistry$CompositeHandler.class */
    private static class CompositeHandler extends AbstractMessageHandler {
        private final RedisPublishingMessageHandler topic;
        private final RedisQueueOutboundChannelAdapter queue;

        private CompositeHandler(String str, RedisConnectionFactory redisConnectionFactory) {
            RedisPublishingMessageHandler redisPublishingMessageHandler = new RedisPublishingMessageHandler(redisConnectionFactory);
            redisPublishingMessageHandler.setDefaultTopic("topic." + str);
            redisPublishingMessageHandler.afterPropertiesSet();
            this.topic = redisPublishingMessageHandler;
            RedisQueueOutboundChannelAdapter redisQueueOutboundChannelAdapter = new RedisQueueOutboundChannelAdapter("queue." + str, redisConnectionFactory);
            redisQueueOutboundChannelAdapter.afterPropertiesSet();
            this.queue = redisQueueOutboundChannelAdapter;
        }

        protected void handleMessageInternal(Message<?> message) throws Exception {
            this.topic.handleMessage(message);
            this.queue.handleMessage(message);
        }
    }

    public RedisChannelRegistry(RedisConnectionFactory redisConnectionFactory) {
        Assert.notNull(redisConnectionFactory, "connectionFactory must not be null");
        this.redisTemplate.setConnectionFactory(redisConnectionFactory);
        this.redisTemplate.afterPropertiesSet();
    }

    @Override // org.springframework.integration.x.channel.registry.ChannelRegistry
    public void inbound(String str, MessageChannel messageChannel) {
        Lifecycle redisQueueInboundChannelAdapter = new RedisQueueInboundChannelAdapter("queue." + str, this.redisTemplate.getConnectionFactory());
        redisQueueInboundChannelAdapter.setOutputChannel(messageChannel);
        redisQueueInboundChannelAdapter.setBeanName("inbound." + str);
        redisQueueInboundChannelAdapter.afterPropertiesSet();
        this.lifecycleBeans.add(redisQueueInboundChannelAdapter);
        redisQueueInboundChannelAdapter.start();
    }

    @Override // org.springframework.integration.x.channel.registry.ChannelRegistry
    public void outbound(String str, MessageChannel messageChannel) {
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        Lifecycle eventDrivenConsumer = new EventDrivenConsumer((SubscribableChannel) messageChannel, new CompositeHandler(str, this.redisTemplate.getConnectionFactory()));
        eventDrivenConsumer.setBeanName("outbound." + str);
        eventDrivenConsumer.afterPropertiesSet();
        this.lifecycleBeans.add(eventDrivenConsumer);
        eventDrivenConsumer.start();
    }

    @Override // org.springframework.integration.x.channel.registry.ChannelRegistry
    public void tap(String str, String str2, MessageChannel messageChannel) {
        Lifecycle redisInboundChannelAdapter = new RedisInboundChannelAdapter(this.redisTemplate.getConnectionFactory());
        redisInboundChannelAdapter.setTopics(new String[]{"topic." + str2});
        redisInboundChannelAdapter.setOutputChannel(messageChannel);
        redisInboundChannelAdapter.setBeanName("tap." + str2);
        redisInboundChannelAdapter.setComponentName(str + ".tapAdapter");
        redisInboundChannelAdapter.afterPropertiesSet();
        this.lifecycleBeans.add(redisInboundChannelAdapter);
        redisInboundChannelAdapter.start();
    }

    @Override // org.springframework.integration.x.channel.registry.ChannelRegistry
    public void cleanAll(String str) {
        synchronized (this.lifecycleBeans) {
            Iterator<Lifecycle> it = this.lifecycleBeans.iterator();
            while (it.hasNext()) {
                EventDrivenConsumer eventDrivenConsumer = (Lifecycle) it.next();
                if ((eventDrivenConsumer instanceof EventDrivenConsumer) && ("outbound." + str).equals(((IntegrationObjectSupport) eventDrivenConsumer).getComponentName())) {
                    eventDrivenConsumer.stop();
                    it.remove();
                } else if ((eventDrivenConsumer instanceof RedisQueueInboundChannelAdapter) && ("inbound." + str).equals(((IntegrationObjectSupport) eventDrivenConsumer).getComponentName())) {
                    ((RedisQueueInboundChannelAdapter) eventDrivenConsumer).stop();
                    it.remove();
                } else if ((eventDrivenConsumer instanceof RedisInboundChannelAdapter) && (str + ".tapAdapter").equals(((IntegrationObjectSupport) eventDrivenConsumer).getComponentName())) {
                    ((RedisInboundChannelAdapter) eventDrivenConsumer).stop();
                    it.remove();
                }
            }
        }
    }

    public void destroy() {
        Iterator<Lifecycle> it = this.lifecycleBeans.iterator();
        while (it.hasNext()) {
            try {
                it.next().stop();
            } catch (Exception e) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn("failed to stop adapter", e);
                }
            }
        }
    }
}
