package top.arkstack.shine.mq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.rabbitmq.client.Channel;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import top.arkstack.shine.mq.bean.EventMessage;
import top.arkstack.shine.mq.processor.Processor;

/* loaded from: input_file:top/arkstack/shine/mq/MessageAdapterHandler.class */
public class MessageAdapterHandler implements ChannelAwareMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(MessageAdapterHandler.class);
    private ConcurrentMap<String, ProcessorWrap> map = new ConcurrentHashMap();

    /* loaded from: input_file:top/arkstack/shine/mq/MessageAdapterHandler$ProcessorWrap.class */
    protected static class ProcessorWrap {
        private MessageConverter messageConverter;
        private Processor processor;

        protected ProcessorWrap(MessageConverter messageConverter, Processor processor) {
            this.messageConverter = messageConverter;
            this.processor = processor;
        }

        public Object process(Object obj, Message message, Channel channel) {
            return this.processor.process(obj, message, channel);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void add(String str, String str2, String str3, Processor processor, MessageConverter messageConverter) {
        Objects.requireNonNull(str, "The queueName is empty.");
        Objects.requireNonNull(str2, "The exchangeName is empty.");
        Objects.requireNonNull(messageConverter, "The messageConverter is empty.");
        Objects.requireNonNull(str3, "The routingKey is empty.");
        if (this.map.putIfAbsent(str + "_" + str2 + "_" + str3, new ProcessorWrap(messageConverter, processor)) != null) {
            logger.warn("The processor of this queue and exchange exists");
        }
    }

    public void onMessage(Message message, Channel channel) throws Exception {
        EventMessage eventMessage = (EventMessage) JSON.parseObject(message.getBody(), EventMessage.class, new Feature[0]);
        this.map.get(eventMessage.getQueueName() + "_" + eventMessage.getExchangeName() + "_" + eventMessage.getRoutingKey()).process(eventMessage.getData(), message, channel);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Set<String> getAllBinding() {
        return this.map.keySet();
    }
}
