package org.springframework.xd.dirt.plugins;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ThreadUtils;
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;
import org.springframework.xd.module.core.Module;

/* loaded from: input_file:org/springframework/xd/dirt/plugins/AbstractMessageBusBinderPlugin.class */
public abstract class AbstractMessageBusBinderPlugin extends AbstractPlugin {
    /** Name of the module component that is looked up as the module's input channel. */
    protected static final String MODULE_INPUT_CHANNEL = "input";

    /** Name of the module component that is looked up as the module's output channel. */
    protected static final String MODULE_OUTPUT_CHANNEL = "output";

    /**
     * Channel-name prefix that marks a channel as publish/subscribe; also prepended to
     * the names derived from tap paths.
     */
    protected static final String TAP_CHANNEL_PREFIX = "tap:";

    /** Channel-name prefix that marks a channel as publish/subscribe. */
    protected static final String TOPIC_CHANNEL_PREFIX = "topic:";

    /** Channel-name prefix for jobs; not referenced inside this class (presumably used by subclasses). */
    protected static final String JOB_CHANNEL_PREFIX = "job:";

    /** Bus on which module channels and tap channels are bound and unbound. */
    protected final MessageBus messageBus;

    /** Cache of the children of the taps path; stays {@code null} until the tap listener has been started. */
    private volatile PathChildrenCache taps;

    /** Listener added to the tap cache; dispatches child-added and child-removed events. */
    private final TapListener tapListener;

    /**
     * Output channels that can be tapped, keyed by tap channel name. Entries are added when a
     * module's output channel is bound and removed when its tap channel is unbound.
     */
    private final Map<String, MessageChannel> tappableChannels;

    /* renamed from: org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin$1, reason: invalid class name */
    /* loaded from: input_file:org/springframework/xd/dirt/plugins/AbstractMessageBusBinderPlugin$1.class */
    /**
     * Compiler-generated lookup table (recovered by the decompiler) that maps the ordinal of a
     * {@link PathChildrenCacheEvent.Type} to a small case label: INITIALIZED is 1, CHILD_ADDED is 2
     * and CHILD_REMOVED is 3. Ordinals that are not registered keep the array default of 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        // Sized to the number of enum constants so that any ordinal is a valid index.
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            // NOTE(review): each registration is wrapped separately, presumably so that a constant
            // missing from the enum at run time leaves its slot at 0 instead of failing class
            // initialization - confirm before changing.
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.INITIALIZED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/springframework/xd/dirt/plugins/AbstractMessageBusBinderPlugin$TapLifecycleConnectionListener.class */
    /**
     * Connection listener that keeps the tap cache in step with the ZooKeeper connection:
     * the tap listener is (re)started on connect and detached on disconnect.
     */
    class TapLifecycleConnectionListener implements ZooKeeperConnectionListener {
        TapLifecycleConnectionListener() {
        }

        /**
         * Detaches the tap listener from the current tap cache and discards the cached data.
         * Does nothing when no tap cache has been created yet.
         *
         * @param curatorFramework the client whose connection was lost (unused)
         */
        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onDisconnect(CuratorFramework curatorFramework) {
            // Read the volatile field once so both calls below act on the same cache instance.
            PathChildrenCache currentTaps = AbstractMessageBusBinderPlugin.this.taps;
            if (currentTaps == null) {
                // The cache is only created by startTapListener; a disconnect that arrives before
                // that (or after a failed start-up) has nothing to clean up.
                return;
            }
            currentTaps.getListenable().removeListener(AbstractMessageBusBinderPlugin.this.tapListener);
            currentTaps.clear();
        }

        /**
         * Starts the tap listener against the newly connected client.
         *
         * @param curatorFramework the connected client
         */
        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onConnect(CuratorFramework curatorFramework) {
            AbstractMessageBusBinderPlugin.this.startTapListener(curatorFramework);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/xd/dirt/plugins/AbstractMessageBusBinderPlugin$TapListener.class */
    public class TapListener implements PathChildrenCacheListener {
        TapListener() {
        }

        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            ZooKeeperUtils.logCacheEvent(AbstractMessageBusBinderPlugin.this.logger, pathChildrenCacheEvent);
            switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                case 1:
                default:
                    return;
                case 2:
                    AbstractMessageBusBinderPlugin.this.onTapAdded(curatorFramework, pathChildrenCacheEvent.getData());
                    return;
                case 3:
                    AbstractMessageBusBinderPlugin.this.onTapRemoved(curatorFramework, pathChildrenCacheEvent.getData());
                    return;
            }
        }
    }

    /**
     * Creates a plugin without a ZooKeeper connection; no tap listener is started.
     *
     * @param messageBus the bus used for all bindings; must not be {@code null}
     */
    public AbstractMessageBusBinderPlugin(MessageBus messageBus) {
        this(messageBus, (ZooKeeperConnection) null);
    }

    /**
     * Creates a plugin bound to the given bus and, optionally, to a ZooKeeper connection.
     * When a connection is supplied, the tap listener is started immediately if the
     * connection is already established, and a connection listener is registered for
     * later connect and disconnect events.
     *
     * @param messageBus the bus used for all bindings; must not be {@code null}
     * @param zooKeeperConnection the connection to watch for taps, or {@code null} for none
     */
    public AbstractMessageBusBinderPlugin(MessageBus messageBus, ZooKeeperConnection zooKeeperConnection) {
        Assert.notNull(messageBus, "MessageBus must not be null.");
        this.messageBus = messageBus;
        this.tapListener = new TapListener();
        this.tappableChannels = new HashMap<String, MessageChannel>();

        if (zooKeeperConnection == null) {
            return;
        }
        if (zooKeeperConnection.isConnected()) {
            startTapListener(zooKeeperConnection.getClient());
        }
        zooKeeperConnection.addListener(new TapLifecycleConnectionListener());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Ensures the taps path exists, then creates and starts a fresh cache of its children
     * with the tap listener attached. A cache left over from an earlier call is closed first
     * so that repeated connects do not accumulate caches and their threads.
     *
     * @param curatorFramework the connected client used to create the path and the cache
     * @throws RuntimeException as produced by {@code ZooKeeperUtils.wrapThrowable} when the cache cannot be started
     */
    public void startTapListener(CuratorFramework curatorFramework) {
        String tapsPath = Paths.build(Paths.TAPS);
        Paths.ensurePath(curatorFramework, tapsPath);

        // Release the cache from a previous connection before replacing it; otherwise its
        // thread and its listener registration would outlive the reference.
        PathChildrenCache previousTaps = this.taps;
        if (previousTaps != null) {
            try {
                previousTaps.close();
            } catch (Exception e) {
                // Best effort: a cache that was never started or is already closed may refuse
                // to close again; that must not prevent the new cache from starting.
                this.logger.warn("failed to close previous tap cache", e);
            }
        }

        this.taps = new PathChildrenCache(curatorFramework, tapsPath, true, ThreadUtils.newThreadFactory("TapsPathChildrenCache"));
        this.taps.getListenable().addListener(this.tapListener);
        try {
            this.taps.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        } catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e, "failed to start TapListener");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Binds the module's input channel as a consumer and its output channel as a producer.
     * A bound output channel is also registered as tappable and, if a tap for it is
     * currently active, a tap channel is created and bound straight away.
     *
     * @param module the module whose "input" and "output" channels are bound, when present
     */
    public final void bindConsumerAndProducers(Module module) {
        // Index 0 holds the consumer properties, index 1 the producer properties.
        Properties[] bindingProperties = extractConsumerProducerProperties(module);

        MessageChannel inputChannel = (MessageChannel) module.getComponent(MODULE_INPUT_CHANNEL, MessageChannel.class);
        if (inputChannel != null) {
            bindMessageConsumer(inputChannel, getInputChannelName(module), bindingProperties[0]);
        }

        MessageChannel outputChannel = (MessageChannel) module.getComponent(MODULE_OUTPUT_CHANNEL, MessageChannel.class);
        if (outputChannel == null) {
            return;
        }
        bindMessageProducer(outputChannel, getOutputChannelName(module), bindingProperties[1]);

        String tapChannelName = buildTapChannelName(module);
        this.tappableChannels.put(tapChannelName, outputChannel);
        if (isTapActive(tapChannelName)) {
            createAndBindTapChannel(tapChannelName, outputChannel);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Splits the module's deployment properties into consumer and producer properties.
     * Keys starting with {@code "consumer."} or {@code "producer."} are copied, with the
     * prefix stripped, into the matching set; all other keys are ignored.
     *
     * @param module the module whose deployment properties are inspected
     * @return a two-element array: consumer properties at index 0, producer properties at
     *         index 1; both are empty when the module has no deployment properties
     */
    public final Properties[] extractConsumerProducerProperties(Module module) {
        final String consumerPrefix = "consumer.";
        final String producerPrefix = "producer.";
        Properties consumerProperties = new Properties();
        Properties producerProperties = new Properties();

        if (module.getDeploymentProperties() != null) {
            for (Map.Entry<?, ?> deploymentProperty : module.getDeploymentProperties().entrySet()) {
                String key = (String) deploymentProperty.getKey();
                if (key.startsWith(consumerPrefix)) {
                    consumerProperties.put(key.substring(consumerPrefix.length()), deploymentProperty.getValue());
                } else if (key.startsWith(producerPrefix)) {
                    producerProperties.put(key.substring(producerPrefix.length()), deploymentProperty.getValue());
                }
            }
        }
        return new Properties[]{consumerProperties, producerProperties};
    }

    /**
     * Supplies the bus channel name used when binding and unbinding the module's input channel.
     *
     * @param module the module being bound or unbound
     * @return the channel name; must contain text, since it is checked for a pub/sub prefix
     */
    protected abstract String getInputChannelName(Module module);

    /**
     * Supplies the bus channel name used when binding and unbinding the module's output channel.
     *
     * @param module the module being bound or unbound
     * @return the channel name; must contain text, since it is checked for a pub/sub prefix
     */
    protected abstract String getOutputChannelName(Module module);

    /**
     * Supplies the tap channel name under which the module's output channel is registered as tappable.
     * The result is compared against names built from {@code TAP_CHANNEL_PREFIX} plus a stripped
     * tap path, so it presumably has to follow that same form - verify against implementations.
     *
     * @param module the module whose output channel may be tapped
     * @return the tap channel name for the module
     */
    protected abstract String buildTapChannelName(Module module);

    /**
     * Binds the channel as a consumer on the bus, choosing the publish/subscribe variant
     * when the channel name carries a pub/sub prefix.
     *
     * @param messageChannel the module channel to bind
     * @param str the bus channel name
     * @param properties the consumer properties passed to the bus
     */
    private void bindMessageConsumer(MessageChannel messageChannel, String str, Properties properties) {
        if (!isChannelPubSub(str)) {
            this.messageBus.bindConsumer(str, messageChannel, properties);
            return;
        }
        this.messageBus.bindPubSubConsumer(str, messageChannel, properties);
    }

    /**
     * Binds the channel as a producer on the bus, choosing the publish/subscribe variant
     * when the channel name carries a pub/sub prefix.
     *
     * @param messageChannel the module channel to bind
     * @param str the bus channel name
     * @param properties the producer properties passed to the bus
     */
    private void bindMessageProducer(MessageChannel messageChannel, String str, Properties properties) {
        if (!isChannelPubSub(str)) {
            this.messageBus.bindProducer(str, messageChannel, properties);
            return;
        }
        this.messageBus.bindPubSubProducer(str, messageChannel, properties);
    }

    /**
     * Wire-taps the given output channel and binds the resulting tap channel to the bus as
     * a publish/subscribe producer. Channels that do not accept interceptors cannot be
     * tapped; that case is only reported at debug level.
     *
     * @param str the tap channel name
     * @param messageChannel the output channel to tap
     */
    private void createAndBindTapChannel(String str, MessageChannel messageChannel) {
        this.logger.info("creating and binding tap channel for {}", str);
        if (!(messageChannel instanceof ChannelInterceptorAware)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("output channel is not interceptor aware. Tap will not be created.");
            }
            return;
        }
        MessageChannel tapBridge = tapOutputChannel(str, (ChannelInterceptorAware) messageChannel);
        this.messageBus.bindPubSubProducer(str, tapBridge, null);
    }

    /**
     * Creates a direct channel named {@code <tap channel name>.tap.bridge} and installs a
     * wire tap on the given channel that targets it.
     *
     * @param str the tap channel name, used to derive the bridge channel's bean name
     * @param channelInterceptorAware the channel that receives the wire-tap interceptor
     * @return the newly created bridge channel
     */
    private MessageChannel tapOutputChannel(String str, ChannelInterceptorAware channelInterceptorAware) {
        DirectChannel bridgeChannel = new DirectChannel();
        bridgeChannel.setBeanName(str + ".tap.bridge");
        WireTap wireTap = new WireTap(bridgeChannel);
        channelInterceptorAware.addInterceptor(wireTap);
        return bridgeChannel;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reverses the module's bindings: unbinds the input channel as a consumer and the output
     * channel as a producer, and tears down the tap channel that belongs to the output channel.
     *
     * @param module the module whose "input" and "output" channels are unbound, when present
     */
    public final void unbindConsumerAndProducers(Module module) {
        MessageChannel inputChannel = (MessageChannel) module.getComponent(MODULE_INPUT_CHANNEL, MessageChannel.class);
        if (inputChannel != null) {
            this.messageBus.unbindConsumer(getInputChannelName(module), inputChannel);
        }

        MessageChannel outputChannel = (MessageChannel) module.getComponent(MODULE_OUTPUT_CHANNEL, MessageChannel.class);
        if (outputChannel == null) {
            return;
        }
        this.messageBus.unbindProducer(getOutputChannelName(module), outputChannel);
        unbindTapChannel(buildTapChannelName(module));
    }

    /**
     * Removes the channel registered under the given tap channel name and, when that channel
     * accepts interceptors, stops and drops every wire tap on it, keeps all other
     * interceptors, and unbinds the tap channel's producers from the bus.
     *
     * @param str the tap channel name
     */
    private void unbindTapChannel(String str) {
        MessageChannel tappedChannel = this.tappableChannels.remove(str);
        if (!(tappedChannel instanceof ChannelInterceptorAware)) {
            // Unknown name, or a channel that could never have been tapped.
            return;
        }
        ChannelInterceptorAware interceptorAware = (ChannelInterceptorAware) tappedChannel;

        // Iterate with the declared interceptor type: only wire taps are stopped and dropped,
        // every other interceptor must be carried over unchanged.
        List<ChannelInterceptor> retainedInterceptors = new ArrayList<ChannelInterceptor>();
        for (ChannelInterceptor interceptor : interceptorAware.getChannelInterceptors()) {
            if (interceptor instanceof WireTap) {
                ((WireTap) interceptor).stop();
            } else {
                retainedInterceptors.add(interceptor);
            }
        }
        interceptorAware.setInterceptors(retainedInterceptors);
        this.messageBus.unbindProducers(str);
    }

    /**
     * Tells whether a channel name denotes a publish/subscribe channel, i.e. whether it
     * starts with the tap or the topic prefix.
     *
     * @param str the channel name; must contain text
     * @return {@code true} for tap and topic channel names, {@code false} otherwise
     * @throws IllegalArgumentException if the name is {@code null} or has no text
     */
    private boolean isChannelPubSub(String str) {
        Assert.isTrue(StringUtils.hasText(str), "Channel name should not be empty/null.");
        if (str.startsWith(TAP_CHANNEL_PREFIX)) {
            return true;
        }
        return str.startsWith(TOPIC_CHANNEL_PREFIX);
    }

    @Override // org.springframework.xd.dirt.plugins.AbstractPlugin
    /**
     * Returns this plugin's order value, which is always 0.
     * NOTE(review): how the value is interpreted is decided by the plugin framework and is not
     * visible here - confirm before relying on a particular ordering.
     */
    public int getOrder() {
        return 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reacts to a new child under the taps path: when a tappable channel is registered
     * under the corresponding tap channel name, a tap channel is created and bound for it.
     *
     * @param curatorFramework the client that reported the event (unused)
     * @param childData the data of the added child; its path yields the tap channel name
     */
    public void onTapAdded(CuratorFramework curatorFramework, ChildData childData) {
        String tapChannelName = buildTapChannelNameFromPath(childData.getPath());
        MessageChannel tappableChannel = this.tappableChannels.get(tapChannelName);
        if (tappableChannel == null) {
            return;
        }
        createAndBindTapChannel(tapChannelName, tappableChannel);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reacts to the removal of a child under the taps path by tearing down the corresponding
     * tap channel. The tapped output channel itself stays registered as tappable, because the
     * module that owns it is still deployed and the same tap may be created again later.
     *
     * @param curatorFramework the client that reported the event (unused)
     * @param childData the data of the removed child; its path yields the tap channel name
     */
    public void onTapRemoved(CuratorFramework curatorFramework, ChildData childData) {
        String tapChannelName = buildTapChannelNameFromPath(childData.getPath());
        MessageChannel tappableChannel = this.tappableChannels.get(tapChannelName);
        unbindTapChannel(tapChannelName);
        if (tappableChannel != null) {
            // Unbinding drops the registry entry; restore it so that a later tap-added event
            // for the same name can still find the channel.
            this.tappableChannels.put(tapChannelName, tappableChannel);
        }
    }

    /**
     * Tells whether the tap cache currently holds a child whose derived tap channel name
     * equals the given name.
     *
     * @param str the tap channel name to look for
     * @return {@code true} if a matching child is cached, {@code false} otherwise
     * @throws IllegalStateException if the tap cache has not been created yet
     */
    private boolean isTapActive(String str) {
        Assert.state(this.taps != null, "tap cache not started");
        for (ChildData cachedTap : this.taps.getCurrentData()) {
            String cachedTapChannelName = buildTapChannelNameFromPath(cachedTap.getPath());
            if (cachedTapChannelName.equals(str)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Derives a tap channel name from a path by stripping the path and prepending the tap prefix.
     *
     * @param str the path of a child in the tap cache
     * @return the tap prefix followed by the stripped path
     */
    private String buildTapChannelNameFromPath(String str) {
        String strippedPath = Paths.stripPath(str);
        return TAP_CHANNEL_PREFIX + strippedPath;
    }
}
