package info.xiancloud.core.message.sender.broadcast;

import info.xiancloud.core.NotifyHandler;
import info.xiancloud.core.Unit;
import info.xiancloud.core.UnitMeta;
import info.xiancloud.core.distribution.LocalNodeManager;
import info.xiancloud.core.distribution.exception.UnitOfflineException;
import info.xiancloud.core.distribution.exception.UnitUndefinedException;
import info.xiancloud.core.distribution.loadbalance.UnitRouter;
import info.xiancloud.core.distribution.service_discovery.UnitInstance;
import info.xiancloud.core.message.UnitRequest;
import info.xiancloud.core.message.UnitResponse;
import info.xiancloud.core.message.sender.AbstractAsyncSender;
import info.xiancloud.core.message.sender.local.RoutedLocalAsyncSender;
import info.xiancloud.core.util.CloneUtil;
import info.xiancloud.core.util.LOG;
import info.xiancloud.core.util.thread.ListenableCountDownLatch;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/* loaded from: input_file:info/xiancloud/core/message/sender/broadcast/BroadcastSender.class */
/**
 * Sender that delivers one unit request to every known instance of the target unit
 * and reports the collected responses through a single callback.
 */
public class BroadcastSender extends AbstractAsyncSender {

    /**
     * Creates a broadcast sender; both arguments are passed straight to the superclass.
     *
     * @param unitRequest   the request to broadcast
     * @param notifyHandler the handler handed to {@link AbstractAsyncSender}
     */
    public BroadcastSender(UnitRequest unitRequest, NotifyHandler notifyHandler) {
        super(unitRequest, notifyHandler);
    }

    /**
     * Sends a clone of the request to each instance returned by the unit router for the
     * group/unit named in the request context.
     *
     * <p>Every response is routed through one shared handler that stores a result in a
     * concurrent queue and counts a latch down. A listener on that latch invokes the
     * sender's callback with a success response wrapping the queue when the value it
     * receives is zero.
     *
     * <p>What gets stored depends on the broadcast setting read from the first instance's
     * unit meta: when "success data only" is off, the whole response is stored; when it is
     * on, only the data of succeeded responses is stored and failed ones are dropped.
     *
     * @throws UnitOfflineException   declared by the overridden method; presumably raised by
     *                                the router lookup - confirm against {@code UnitRouter}
     * @throws UnitUndefinedException declared by the overridden method; presumably raised by
     *                                the router lookup - confirm against {@code UnitRouter}
     */
    @Override
    protected void asyncSend() throws UnitOfflineException, UnitUndefinedException {
        List<UnitInstance> instances = UnitRouter.singleton.allInstances(
                Unit.fullName(this.unitRequest.getContext().getGroup(), this.unitRequest.getContext().getUnit()));
        // NOTE(review): get(0) assumes the router never returns an empty list - confirm.
        final UnitMeta.Broadcast broadcast = instances.get(0).getPayload().getMeta().getBroadcast();
        final ListenableCountDownLatch pendingResponses = new ListenableCountDownLatch(instances.size());
        // Element type is Object because the queue holds either UnitResponse objects or their data.
        final ConcurrentLinkedQueue<Object> collected = new ConcurrentLinkedQueue<>();

        pendingResponses.addListener(remaining -> {
            if (remaining.longValue() == 0) {
                this.callback.callback(UnitResponse.createSuccess(collected));
            } else {
                LOG.debug("Not all responses are sent back.");
            }
        });

        // One handler instance is shared by all outgoing requests.
        NotifyHandler perInstanceHandler = new NotifyHandler() {
            @Override
            protected void handle(UnitResponse unitResponse) {
                if (!broadcast.isSuccessDataOnly()) {
                    collected.add(unitResponse);
                } else if (unitResponse.succeeded()) {
                    collected.add(unitResponse.getData());
                } else {
                    LOG.debug("Failed response is ignored here.");
                }
                // Count down for every response, including ignored failures.
                pendingResponses.countDown();
            }
        };

        if (instances.size() > 100) {
            LOG.warn(String.format("Too many unit instances of %s.%s found! Take care on the performance!",
                    this.unitRequest.getContext().getGroup(), this.unitRequest.getContext().getUnit()));
        }

        for (UnitInstance instance : instances) {
            if (instance.getNodeId().equals(LocalNodeManager.LOCAL_NODE_ID)) {
                // Instance lives on this node: hand a cloned request to the local sender.
                new RoutedLocalAsyncSender(
                        (UnitRequest) CloneUtil.cloneBean(this.unitRequest, UnitRequest.class),
                        perInstanceHandler).send();
            } else {
                LOG.debug("In order not to share the same unit request object，we clone a new request");
                UnitRequest clonedRequest = (UnitRequest) CloneUtil.cloneBean(this.unitRequest, UnitRequest.class);
                // Address the clone to this specific instance's node before sending.
                clonedRequest.getContext().setDestinationNodeId(instance.getNodeId());
                LocalNodeManager.send(clonedRequest, perInstanceHandler);
            }
        }
    }
}
