package group.idealworld.dew.core.cluster;

import group.idealworld.dew.core.cluster.dto.MessageWrap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:group/idealworld/dew/core/cluster/AbsClusterMQ.class */
/**
 * Skeleton {@link ClusterMQ} implementation.
 *
 * <p>Each public entry point writes a trace log line and then delegates to an abstract
 * {@code doXxx} hook supplied by the concrete subclass. Nullable map arguments are wrapped in
 * an {@link Optional} before being handed to the hook. For the two receiving operations
 * ({@link #subscribe} and {@link #response}) the caller's consumer is additionally wrapped so
 * that, whenever {@code Cluster.haEnabled()} returns {@code true}, every message is reported to
 * the cluster HA component before and after it is consumed.
 */
public abstract class AbsClusterMQ implements ClusterMQ {
    protected static Logger logger = LoggerFactory.getLogger(AbsClusterMQ.class);

    /**
     * Logs the call and forwards it to {@link #doPublish}.
     *
     * @param map may be {@code null}; it is passed on as {@code Optional.ofNullable(map)}
     * @return whatever {@link #doPublish} returns
     */
    @Override
    public boolean publish(String str, String str2, Map<String, Object> map, boolean z) {
        logger.trace("[MQ] publish {}:{}", str, str2);
        Optional<Map<String, Object>> wrapped = Optional.ofNullable(map);
        return doPublish(str, str2, wrapped, z);
    }

    /** Subclass hook invoked by {@link #publish} after logging. */
    protected abstract boolean doPublish(String str, String str2, Optional<Map<String, Object>> optional, boolean z);

    /**
     * Logs the call and registers {@code consumer} through {@link #doSubscribe}, wrapped with the
     * HA bookkeeping described on {@link #receiveMsg}.
     */
    @Override
    public void subscribe(String str, Consumer<MessageWrap> consumer) {
        logger.trace("[MQ] subscribe {}", str);
        receiveMsg(str, consumer, false);
    }

    /** Subclass hook that performs the actual subscription with the already-wrapped consumer. */
    protected abstract void doSubscribe(String str, Consumer<MessageWrap> consumer);

    /**
     * Logs the call and forwards it to {@link #doRequest}.
     *
     * @param map may be {@code null}; it is passed on as {@code Optional.ofNullable(map)}
     * @return whatever {@link #doRequest} returns
     */
    @Override
    public boolean request(String str, String str2, Map<String, Object> map, boolean z) {
        logger.trace("[MQ] request {}:{}", str, str2);
        Optional<Map<String, Object>> wrapped = Optional.ofNullable(map);
        return doRequest(str, str2, wrapped, z);
    }

    /** Subclass hook invoked by {@link #request} after logging. */
    protected abstract boolean doRequest(String str, String str2, Optional<Map<String, Object>> optional, boolean z);

    /**
     * Logs the call and registers {@code consumer} through {@link #doResponse}, wrapped with the
     * HA bookkeeping described on {@link #receiveMsg}.
     */
    @Override
    public void response(String str, Consumer<MessageWrap> consumer) {
        logger.trace("[MQ] response {}", str);
        receiveMsg(str, consumer, true);
    }

    /** Subclass hook that performs the actual registration with the already-wrapped consumer. */
    protected abstract void doResponse(String str, Consumer<MessageWrap> consumer);

    /**
     * Shared implementation behind {@link #subscribe} and {@link #response}.
     *
     * <p>When HA is enabled, the messages returned by {@code mq_findAllUnCommittedMsg} are fed to
     * {@code handler} first. If one of them makes the handler throw, this method returns without
     * registering anything. Otherwise the handler is wrapped (see {@link #dispatch}) and handed
     * to {@link #doResponse} or {@link #doSubscribe}.
     *
     * @param address    value the caller passed to {@code subscribe}/{@code response}
     * @param handler    the caller's message consumer
     * @param isResponse {@code true} to register via {@link #doResponse}, {@code false} to
     *                   register via {@link #doSubscribe}
     */
    private void receiveMsg(String address, Consumer<MessageWrap> handler, boolean isResponse) {
        if (Cluster.haEnabled() && !replayUncommitted(address, handler)) {
            // The failure has already been logged; the listener is deliberately not registered.
            return;
        }
        Consumer<MessageWrap> guarded = incoming -> dispatch(address, handler, incoming);
        if (isResponse) {
            doResponse(address, guarded);
            return;
        }
        doSubscribe(address, guarded);
    }

    /**
     * Feeds every message reported by {@code mq_findAllUnCommittedMsg(address)} to
     * {@code handler}, acknowledging each one to the HA component after it was consumed.
     * Processing stops at the first message whose handling throws; that exception is logged
     * and not rethrown.
     *
     * @return {@code true} if every message was consumed and acknowledged (or there were none),
     *         {@code false} if handling one of them threw
     */
    private static boolean replayUncommitted(String address, Consumer<MessageWrap> handler) {
        return Cluster.getClusterHA().mq_findAllUnCommittedMsg(address).stream().allMatch(pending -> {
            logger.trace("[MQ] receive by HA {}:{}", address, pending.getMsg());
            try {
                handler.accept(pending.getMsg());
                Cluster.getClusterHA().mq_afterMsgAcked(pending.getMsgId());
            } catch (Exception e) {
                logger.error("[MQ] receive by HA error.", e);
                // A false result ends allMatch immediately, so later messages are not attempted.
                return false;
            }
            return true;
        });
    }

    /**
     * Delivers one incoming message to {@code handler}. With HA enabled the message is first
     * reported through {@code mq_afterPollMsg}, and the id returned by that call is passed to
     * {@code mq_afterMsgAcked} once the handler has returned normally.
     *
     * @throws RuntimeException wrapping any exception raised by the handler or the HA calls
     */
    private static void dispatch(String address, Consumer<MessageWrap> handler, MessageWrap incoming) {
        logger.trace("[MQ] receive {}:{}", address, incoming);
        try {
            if (!Cluster.haEnabled()) {
                handler.accept(incoming);
                return;
            }
            String pendingId = Cluster.getClusterHA().mq_afterPollMsg(address, incoming);
            handler.accept(incoming);
            Cluster.getClusterHA().mq_afterMsgAcked(pendingId);
        } catch (Exception e) {
            throw new RuntimeException("[MQ] receive error:" + incoming, e);
        }
    }
}
