/*
 * Decompiled with CFR 0.152.
 */
package arp.message.rocketmq;

import arp.message.rocketmq.FSTDeserializationStrategy;
import arp.message.rocketmq.RocketmqMessageDeserializationStrategy;
import arp.process.publish.Message;
import arp.process.publish.ProcessMessageReceiver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * {@link ProcessMessageReceiver} that is fed by a RocketMQ push consumer.
 *
 * <p>The consumer is created, subscribed and started in the constructor. Every
 * message body delivered to the listener is deserialized and appended to an
 * in-memory queue; {@link #receive()} hands the queued messages to the caller
 * in batches.
 *
 * <p>The hand-off between the listener and {@link #receive()} goes through a
 * {@link ConcurrentLinkedQueue}, so both sides may run on different threads.
 */
public class RocketmgMessageReceiver
implements ProcessMessageReceiver {
    /** Topic the consumer subscribes to. */
    private static final String TOPIC = "arp_process_message";
    /** Subscription expression: accept every tag of the topic. */
    private static final String ALL_TAGS = "*";

    private final DefaultMQPushConsumer consumer;
    private final ConcurrentLinkedQueue<Message> messageQueue = new ConcurrentLinkedQueue<Message>();

    /**
     * Creates and starts a receiver that decodes message bodies with a
     * {@link FSTDeserializationStrategy}.
     *
     * @param consumerGroup RocketMQ consumer group passed to the push consumer
     * @param namesrvAddr   name server address passed to the push consumer
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public RocketmgMessageReceiver(String consumerGroup, String namesrvAddr) throws MQClientException {
        this(consumerGroup, namesrvAddr, new FSTDeserializationStrategy());
    }

    /**
     * Creates and starts a receiver that decodes message bodies with the given
     * strategy.
     *
     * @param consumerGroup           RocketMQ consumer group passed to the push consumer
     * @param namesrvAddr             name server address passed to the push consumer
     * @param deserializationStrategy converts a raw message body into a {@link Message}
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public RocketmgMessageReceiver(String consumerGroup, String namesrvAddr, final RocketmqMessageDeserializationStrategy deserializationStrategy) throws MQClientException {
        this.consumer = new DefaultMQPushConsumer(consumerGroup);
        this.consumer.setNamesrvAddr(namesrvAddr);
        this.consumer.subscribe(TOPIC, ALL_TAGS);
        this.consumer.registerMessageListener(new MessageListenerConcurrently(){

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                if (msgs != null) {
                    for (MessageExt msg : msgs) {
                        try {
                            RocketmgMessageReceiver.this.messageQueue.offer(deserializationStrategy.deserialize(msg.getBody()));
                        }
                        catch (Exception e) {
                            // Best effort: a message that cannot be decoded (or decodes to
                            // null, which the queue rejects) is reported and dropped so the
                            // remaining messages of the batch are still enqueued.
                            e.printStackTrace();
                        }
                    }
                }
                // The batch is always reported as consumed, including messages whose
                // deserialization failed above.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        this.consumer.start();
    }

    /**
     * Removes and returns the messages that are currently queued.
     *
     * <p>At most as many messages as were queued when the call started are
     * returned, so a steady stream of incoming messages cannot keep this method
     * from returning. The returned list never contains {@code null}.
     *
     * @return the drained messages in queue order; empty if nothing is queued
     * @throws Exception declared for interface compatibility; not thrown by this implementation
     */
    public List<Message> receive() throws Exception {
        int snapshot = this.messageQueue.size();
        List<Message> drained = new ArrayList<Message>(snapshot);
        for (int i = 0; i < snapshot; ++i) {
            Message next = this.messageQueue.poll();
            if (next == null) {
                // Another caller emptied the queue after the size snapshot was taken.
                break;
            }
            drained.add(next);
        }
        return drained;
    }

    /**
     * Stops the underlying push consumer. Messages already queued remain
     * available through {@link #receive()}.
     */
    public void shutdown() {
        this.consumer.shutdown();
    }
}

