package top.arkstack.shine.mq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.rabbitmq.client.Channel;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import top.arkstack.shine.mq.bean.EventMessage;
import top.arkstack.shine.mq.bean.SendTypeEnum;
import top.arkstack.shine.mq.constant.MqConstant;
import top.arkstack.shine.mq.coordinator.Coordinator;
import top.arkstack.shine.mq.processor.Processor;

@Component
/* loaded from: input_file:top/arkstack/shine/mq/MessageAdapterHandler.class */
public class MessageAdapterHandler implements ChannelAwareMessageListener {
    private static final Logger log = LoggerFactory.getLogger(MessageAdapterHandler.class);
    private static final Logger logger = LoggerFactory.getLogger(MessageAdapterHandler.class);
    private ConcurrentMap<String, ProcessorWrap> map = new ConcurrentHashMap();

    @Autowired
    ApplicationContext applicationContext;

    @Autowired
    RabbitmqFactory rabbitmqFactory;

    /* loaded from: input_file:top/arkstack/shine/mq/MessageAdapterHandler$ProcessorWrap.class */
    protected static class ProcessorWrap {
        private MessageConverter messageConverter;
        private Processor processor;

        protected ProcessorWrap(MessageConverter messageConverter, Processor processor) {
            this.messageConverter = messageConverter;
            this.processor = processor;
        }

        public Object process(Object obj, Message message, Channel channel) {
            return this.processor.process(obj, message, channel);
        }
    }

    protected MessageAdapterHandler() {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void add(String str, String str2, Processor processor, SendTypeEnum sendTypeEnum, MessageConverter messageConverter) {
        Objects.requireNonNull(str, "The exchangeName is empty.");
        Objects.requireNonNull(messageConverter, "The messageConverter is empty.");
        Objects.requireNonNull(str2, "The routingKey is empty.");
        if (this.map.putIfAbsent(str + MqConstant.SPLIT + str2 + MqConstant.SPLIT + (sendTypeEnum == null ? SendTypeEnum.DIRECT.toString() : sendTypeEnum.toString()), new ProcessorWrap(messageConverter, processor)) != null) {
            logger.warn("The processor of this queue and exchange exists");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void remove(String str, String str2, SendTypeEnum sendTypeEnum) {
        this.map.remove(str + MqConstant.SPLIT + str2 + MqConstant.SPLIT + (sendTypeEnum == null ? SendTypeEnum.DIRECT.toString() : sendTypeEnum.toString()));
    }

    public void onMessage(Message message, Channel channel) throws Exception {
        EventMessage eventMessage = null;
        Coordinator coordinator = null;
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        try {
            EventMessage eventMessage2 = (EventMessage) JSON.parseObject(message.getBody(), EventMessage.class, new Feature[0]);
            ProcessorWrap processorWrap = (MqConstant.DEAD_LETTER_EXCHANGE.equals(message.getMessageProperties().getReceivedExchange()) && MqConstant.DEAD_LETTER_ROUTEKEY.equals(message.getMessageProperties().getReceivedRoutingKey())) ? this.map.get("dead_letter_exchange_dead_letter_routekey_" + SendTypeEnum.DLX) : this.map.get(eventMessage2.getExchangeName() + MqConstant.SPLIT + eventMessage2.getRoutingKey() + MqConstant.SPLIT + eventMessage2.getSendTypeEnum());
            if (SendTypeEnum.DISTRIBUTED.toString().equals(eventMessage2.getSendTypeEnum())) {
                Objects.requireNonNull(eventMessage2.getCoordinator(), "Distributed transaction message error: coordinator is null.");
                processorWrap.process(eventMessage2.getCheckBackId(), message, channel);
                channel.basicAck(deliveryTag, false);
            } else {
                processorWrap.process(eventMessage2.getCheckBackId(), message, channel);
            }
        } catch (Exception e) {
            log.error("MessageAdapterHandler error, message: {} :", message.getBody(), e);
            if (0 == 0 || 0 == 0 || !SendTypeEnum.DISTRIBUTED.toString().equals(eventMessage.getSendTypeEnum())) {
                throw e;
            }
            if (coordinator.incrementResendKey(MqConstant.RECEIVE_RETRIES, messageId).doubleValue() < this.rabbitmqFactory.getConfig().getDistributed().getReceiveMaxRetries().intValue()) {
                channel.basicNack(deliveryTag, false, true);
            } else {
                channel.basicNack(deliveryTag, false, false);
                coordinator.delResendKey(MqConstant.RECEIVE_RETRIES, messageId);
            }
        }
    }

    protected Set<String> getAllBinding() {
        return this.map.keySet();
    }
}
