package org.springframework.xd.dirt.plugins.spark.streaming;

import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.storage.StreamBlockId;
import org.apache.spark.streaming.receiver.BlockGenerator;
import org.apache.spark.streaming.receiver.BlockGeneratorListener;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.MimeType;
import org.springframework.xd.dirt.integration.bus.BusUtils;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import scala.collection.mutable.ArrayBuffer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/springframework/xd/dirt/plugins/spark/streaming/MessageBusReceiver.class */
public class MessageBusReceiver extends Receiver {
    private static final long serialVersionUID = 1;
    private static final Logger logger = LoggerFactory.getLogger(MessageBusReceiver.class);
    private MessageBus messageBus;
    private ConfigurableApplicationContext applicationContext;
    private String channelName;
    private final LocalMessageBusHolder messageBusHolder;
    private final Properties messageBusProperties;
    private final Properties moduleConsumerProperties;
    private final MimeType contentType;
    private BlockGenerator blockGenerator;
    private LinkedBlockingQueue<MessageHeaders> headersList;

    /* loaded from: input_file:org/springframework/xd/dirt/plugins/spark/streaming/MessageBusReceiver$GeneratedBlockHandler.class */
    private class GeneratedBlockHandler implements BlockGeneratorListener {
        private GeneratedBlockHandler() {
        }

        public void onAddData(Object obj, Object obj2) {
            MessageBusReceiver.logger.debug("Adding data to block generator buffer");
        }

        public void onError(String str, Throwable th) {
            MessageBusReceiver.this.reportError(str, th);
        }

        public void onPushBlock(StreamBlockId streamBlockId, ArrayBuffer<?> arrayBuffer) {
            MessageBusReceiver.this.store(arrayBuffer);
            LinkedList linkedList = new LinkedList();
            for (int i = 0; i < arrayBuffer.size(); i++) {
                try {
                    linkedList.add(MessageBusReceiver.this.headersList.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            MessageBusReceiver.this.messageBus.doManualAck(linkedList);
        }

        public void onGenerateBlock(StreamBlockId streamBlockId) {
            MessageBusReceiver.logger.debug("Generated block " + streamBlockId);
        }
    }

    /* loaded from: input_file:org/springframework/xd/dirt/plugins/spark/streaming/MessageBusReceiver$MessageStoringChannel.class */
    private class MessageStoringChannel extends SparkStreamingChannel {
        private static final long serialVersionUID = 1;
        private static final String INPUT = "input";

        public MessageStoringChannel() {
            setBeanName(INPUT);
        }

        protected synchronized boolean doSend(Message<?> message, long j) {
            try {
                MessageBusReceiver.this.headersList.put(message.getHeaders());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            MessageBusReceiver.this.blockGenerator.addDataWithCallback(message.getPayload(), message.getHeaders());
            return true;
        }
    }

    public MessageBusReceiver(StorageLevel storageLevel, Properties properties, Properties properties2, MimeType mimeType) {
        this(null, storageLevel, properties, properties2, mimeType);
    }

    public MessageBusReceiver(LocalMessageBusHolder localMessageBusHolder, StorageLevel storageLevel, Properties properties, Properties properties2, MimeType mimeType) {
        super(storageLevel);
        this.blockGenerator = null;
        this.headersList = new LinkedBlockingQueue<>();
        this.messageBusHolder = localMessageBusHolder;
        this.messageBusProperties = properties;
        this.moduleConsumerProperties = properties2;
        this.contentType = mimeType;
    }

    public void setInputChannelName(String str) {
        this.channelName = str;
    }

    public void onStart() {
        logger.info("starting MessageBusReceiver");
        MessageStoringChannel messageStoringChannel = new MessageStoringChannel();
        this.blockGenerator = new BlockGenerator(new GeneratedBlockHandler(), 0, new SparkConf());
        this.blockGenerator.start();
        if (this.contentType != null) {
            messageStoringChannel.configureMessageConverter(this.contentType);
        }
        if (this.messageBusHolder != null) {
            this.messageBus = this.messageBusHolder.get();
        } else {
            this.applicationContext = MessageBusConfiguration.createApplicationContext(this.messageBusProperties);
            this.messageBus = (MessageBus) this.applicationContext.getBean(MessageBus.class);
        }
        if (BusUtils.isChannelPubSub(this.channelName)) {
            this.messageBus.bindPubSubConsumer(this.channelName, messageStoringChannel, this.moduleConsumerProperties);
        } else {
            this.messageBus.bindConsumer(this.channelName, messageStoringChannel, this.moduleConsumerProperties);
        }
    }

    public void onStop() {
        logger.info("stopping MessageBusReceiver");
        this.blockGenerator.stop();
        if (this.messageBus != null) {
            this.messageBus.unbindConsumers(this.channelName);
        }
        if (this.applicationContext != null) {
            this.applicationContext.close();
        }
    }
}
