package org.springframework.xd.dirt.plugins.stream;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.http.MediaType;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.x.bus.MessageBus;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.CollectionUtils;
import org.springframework.xd.dirt.plugins.job.JobPlugin;
import org.springframework.xd.module.DeploymentMetadata;
import org.springframework.xd.module.ModuleType;
import org.springframework.xd.module.core.Module;
import org.springframework.xd.module.core.Plugin;
import org.springframework.xd.module.support.BeanDefinitionAddingPostProcessor;

/* loaded from: input_file:org/springframework/xd/dirt/plugins/stream/StreamPlugin.class */
public class StreamPlugin implements Plugin {
    protected final Log logger = LogFactory.getLog(getClass());
    private static final String CONTEXT_CONFIG_ROOT = "META-INF/spring-xd/plugins/stream/";
    private static final String TAP_CHANNEL_PREFIX = "tap:";
    private static final String MESSAGE_BUS = "META-INF/spring-xd/plugins/stream/message-bus.xml";
    private static final String CONTENT_TYPE_BEAN_NAME = "accepted-content-types";
    private static final Collection<MediaType> DEFAULT_ACCEPTED_CONTENT_TYPES = Collections.singletonList(MediaType.ALL);
    private static final String TOPIC_CHANNEL_PREFIX = "topic:";

    public void preProcessModule(Module module) {
        DeploymentMetadata deploymentMetadata = module.getDeploymentMetadata();
        Properties properties = new Properties();
        properties.setProperty("xd.stream.name", deploymentMetadata.getGroup());
        properties.setProperty("xd.module.index", String.valueOf(deploymentMetadata.getIndex()));
        module.addProperties(properties);
    }

    public void postProcessModule(Module module) {
        MessageBus findMessageBus = findMessageBus(module);
        bindConsumer(module, findMessageBus);
        bindProducers(module, findMessageBus);
    }

    public boolean supports(Module module) {
        ModuleType type = module.getType();
        return type == ModuleType.source || type == ModuleType.processor || type == ModuleType.sink;
    }

    private MessageBus findMessageBus(Module module) {
        MessageBus messageBus = null;
        try {
            messageBus = (MessageBus) module.getComponent(MessageBus.class);
        } catch (Exception e) {
            this.logger.error("No MessageBus in context, cannot wire/unwire channels: " + e.getMessage());
        }
        return messageBus;
    }

    private void bindConsumer(Module module, MessageBus messageBus) {
        DeploymentMetadata deploymentMetadata = module.getDeploymentMetadata();
        MessageChannel messageChannel = (MessageChannel) module.getComponent("input", MessageChannel.class);
        if (messageChannel != null) {
            if (isChannelPubSub(deploymentMetadata.getInputChannelName())) {
                messageBus.bindPubSubConsumer(deploymentMetadata.getInputChannelName(), messageChannel, getAcceptedMediaTypes(module));
            } else {
                messageBus.bindConsumer(deploymentMetadata.getInputChannelName(), messageChannel, getAcceptedMediaTypes(module), deploymentMetadata.isAliasedInput());
            }
        }
    }

    private void bindProducers(Module module, MessageBus messageBus) {
        DeploymentMetadata deploymentMetadata = module.getDeploymentMetadata();
        MessageChannel messageChannel = (MessageChannel) module.getComponent("output", MessageChannel.class);
        if (messageChannel != null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("binding output channel [" + deploymentMetadata.getOutputChannelName() + "] for " + module);
            }
            if (isChannelPubSub(deploymentMetadata.getOutputChannelName())) {
                messageBus.bindPubSubProducer(deploymentMetadata.getOutputChannelName(), messageChannel);
            } else {
                messageBus.bindProducer(deploymentMetadata.getOutputChannelName(), messageChannel, deploymentMetadata.isAliasedOutput());
            }
            Object extractTarget = extractTarget(messageChannel);
            if (!(extractTarget instanceof AbstractMessageChannel)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("output channel is not an AbstractMessageChannel. Tap will not be created.");
                }
            } else {
                String tapChannelName = getTapChannelName(module);
                DirectChannel directChannel = new DirectChannel();
                directChannel.setBeanName(tapChannelName + ".tap.bridge");
                ((AbstractMessageChannel) extractTarget).addInterceptor(new WireTap(directChannel));
                messageBus.bindPubSubProducer(tapChannelName, directChannel);
            }
        }
    }

    public void beforeShutdown(Module module) {
        MessageBus findMessageBus = findMessageBus(module);
        if (findMessageBus != null) {
            unbindConsumer(module, findMessageBus);
            unbindProducers(module, findMessageBus);
        }
    }

    public void removeModule(Module module) {
    }

    private void unbindConsumer(Module module, MessageBus messageBus) {
        MessageChannel messageChannel = (MessageChannel) module.getComponent("input", MessageChannel.class);
        if (messageChannel != null) {
            messageBus.unbindConsumer(module.getDeploymentMetadata().getInputChannelName(), messageChannel);
        }
    }

    private void unbindProducers(Module module, MessageBus messageBus) {
        MessageChannel messageChannel = (MessageChannel) module.getComponent("output", MessageChannel.class);
        if (messageChannel != null) {
            messageBus.unbindProducer(module.getDeploymentMetadata().getOutputChannelName(), messageChannel);
        }
        messageBus.unbindProducers(getTapChannelName(module));
    }

    private String getTapChannelName(Module module) {
        return TAP_CHANNEL_PREFIX + module.getDeploymentMetadata().getGroup() + JobPlugin.JOB_NAME_DELIMITER + module.getName();
    }

    private boolean isChannelPubSub(String str) {
        return str != null && (str.startsWith(TAP_CHANNEL_PREFIX) || str.startsWith(TOPIC_CHANNEL_PREFIX));
    }

    private Collection<MediaType> getAcceptedMediaTypes(Module module) {
        Collection collection = (Collection) module.getComponent(CONTENT_TYPE_BEAN_NAME, Collection.class);
        if (CollectionUtils.isEmpty(collection)) {
            return DEFAULT_ACCEPTED_CONTENT_TYPES;
        }
        ArrayList arrayList = new ArrayList(collection.size());
        for (Object obj : collection) {
            if (obj instanceof String) {
                arrayList.add(MediaType.valueOf((String) obj));
            } else {
                if (!(obj instanceof MediaType)) {
                    throw new IllegalArgumentException("Unrecognized MediaType :" + obj);
                }
                arrayList.add((MediaType) obj);
            }
        }
        return Collections.unmodifiableCollection(arrayList);
    }

    public void preProcessSharedContext(ConfigurableApplicationContext configurableApplicationContext) {
        configurableApplicationContext.addBeanFactoryPostProcessor(new BeanDefinitionAddingPostProcessor(configurableApplicationContext.getEnvironment(), new Resource[]{new ClassPathResource(MESSAGE_BUS)}));
    }

    private Object extractTarget(Object obj) {
        if (!(obj instanceof Advised)) {
            return obj;
        }
        Advised advised = (Advised) obj;
        if (advised.getTargetSource() == null) {
            return null;
        }
        try {
            return extractTarget(advised.getTargetSource().getTarget());
        } catch (Exception e) {
            this.logger.error("Could not extract target from output channel. Tap will not be created.", e);
            return null;
        }
    }
}
