package org.springframework.xd.dirt.plugins.spark.streaming;

import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.util.MimeType;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.spark.streaming.SparkMessageSender;

/* loaded from: input_file:org/springframework/xd/dirt/plugins/spark/streaming/MessageBusSender.class */
/**
 * {@link SparkMessageSender} that forwards messages to a {@link MessageBus} through an
 * internal {@link SparkStreamingChannel} bound as a producer under the configured
 * output channel name.
 *
 * <p>The message bus is either obtained from a supplied {@link LocalMessageBusHolder} or,
 * when no holder is given, looked up from an application context that this sender creates
 * from the message bus properties and closes again on {@link #stop()}.
 *
 * <p>All lifecycle methods and {@link #send(Message)} are {@code synchronized} on this instance.
 */
class MessageBusSender extends SparkMessageSender {
    private static final Logger logger = LoggerFactory.getLogger(MessageBusSender.class);

    /** Bean name assigned to the internal output channel. */
    private static final String OUTPUT = "output";

    private final String outputChannelName;
    private final LocalMessageBusHolder messageBusHolder;
    private final Properties messageBusProperties;
    private final Properties moduleProducerProperties;
    private final MimeType contentType;
    private final SparkStreamingChannel outputChannel;

    private MessageBus messageBus;
    private ConfigurableApplicationContext applicationContext;
    private boolean running;

    /**
     * Creates a sender without a message bus holder; the bus is looked up from an
     * application context created on {@link #start()}.
     *
     * @param outputChannelName name under which the producer is bound on the message bus
     * @param messageBusProperties properties used to create the message bus application context
     * @param moduleProducerProperties producer properties passed to the bus when binding
     * @param contentType content type used to configure the channel's message converter;
     *        may be {@code null}, in which case no converter is configured
     */
    public MessageBusSender(String outputChannelName, Properties messageBusProperties,
            Properties moduleProducerProperties, MimeType contentType) {
        this(null, outputChannelName, messageBusProperties, moduleProducerProperties, contentType);
    }

    /**
     * Creates a sender.
     *
     * @param messageBusHolder holder supplying the message bus; may be {@code null}, in which
     *        case the bus is looked up from an application context created on {@link #start()}
     * @param outputChannelName name under which the producer is bound on the message bus
     * @param messageBusProperties properties used to create the message bus application context
     *        when no holder is supplied
     * @param moduleProducerProperties producer properties passed to the bus when binding
     * @param contentType content type used to configure the channel's message converter;
     *        may be {@code null}, in which case no converter is configured
     */
    public MessageBusSender(LocalMessageBusHolder messageBusHolder, String outputChannelName,
            Properties messageBusProperties, Properties moduleProducerProperties, MimeType contentType) {
        this.running = false;
        this.messageBusHolder = messageBusHolder;
        this.outputChannelName = outputChannelName;
        this.messageBusProperties = messageBusProperties;
        this.moduleProducerProperties = moduleProducerProperties;
        this.contentType = contentType;
        this.outputChannel = new SparkStreamingChannel();
    }

    /**
     * Obtains the message bus (if not already held) and binds the output channel to it as a
     * producer. Does nothing when the sender is already running.
     *
     * <p>If obtaining the bus, configuring the converter or binding the producer fails, the
     * partially acquired bus reference and any application context created here are released
     * before the failure propagates, so that a later {@code start()} retries from scratch.
     */
    public synchronized void start() {
        if (isRunning()) {
            return;
        }
        this.outputChannel.setBeanName(OUTPUT);
        logger.info("starting MessageBusSender");
        if (this.messageBus == null) {
            boolean bound = false;
            try {
                if (this.messageBusHolder != null) {
                    this.messageBus = this.messageBusHolder.get();
                } else {
                    this.applicationContext =
                            MessageBusConfiguration.createApplicationContext(this.messageBusProperties);
                    this.messageBus = this.applicationContext.getBean(MessageBus.class);
                }
                if (this.contentType != null) {
                    this.outputChannel.configureMessageConverter(this.contentType);
                }
                this.messageBus.bindProducer(this.outputChannelName, this.outputChannel,
                        this.moduleProducerProperties);
                bound = true;
            } finally {
                if (!bound) {
                    // Roll back so the next start() does not skip binding because of a stale bus
                    // reference, and so the privately created context is not leaked.
                    this.messageBus = null;
                    try {
                        closeApplicationContext();
                    } catch (RuntimeException closeFailure) {
                        // Log instead of rethrowing so the original start-up failure is not masked.
                        logger.warn("failed to close application context after unsuccessful start",
                                closeFailure);
                    }
                }
            }
        }
        this.running = true;
    }

    /**
     * Sends the given message to the internal output channel.
     *
     * @param message the message to send
     */
    public synchronized void send(Message message) {
        this.outputChannel.send(message);
    }

    /**
     * Unbinds the producer from the message bus when running, then releases the bus reference,
     * closes any application context created by {@link #start()} and marks the sender as stopped.
     *
     * <p>The cleanup is performed even when unbinding fails; the failure still propagates.
     */
    public synchronized void stop() {
        try {
            if (isRunning() && this.messageBus != null) {
                logger.info("stopping MessageBusSender");
                this.messageBus.unbindProducer(this.outputChannelName, this.outputChannel);
            }
        } finally {
            this.messageBus = null;
            this.running = false;
            closeApplicationContext();
        }
    }

    /**
     * @return {@code true} once {@link #start()} has completed and until {@link #stop()} is called
     */
    public synchronized boolean isRunning() {
        return this.running;
    }

    /** Closes and forgets the application context created by this sender, if there is one. */
    private void closeApplicationContext() {
        ConfigurableApplicationContext context = this.applicationContext;
        // Clear the field first so a failing close() cannot leave a stale reference behind.
        this.applicationContext = null;
        if (context != null) {
            context.close();
        }
    }
}
