package info.xiancloud.plugin.message.sender.broadcast;

import info.xiancloud.plugin.Group;
import info.xiancloud.plugin.Unit;
import info.xiancloud.plugin.UnitMeta;
import info.xiancloud.plugin.distribution.LocalNodeManager;
import info.xiancloud.plugin.distribution.exception.UnitOfflineException;
import info.xiancloud.plugin.distribution.exception.UnitUndefinedException;
import info.xiancloud.plugin.distribution.loadbalance.UnitRouter;
import info.xiancloud.plugin.distribution.service_discovery.UnitInstance;
import info.xiancloud.plugin.message.UnitRequest;
import info.xiancloud.plugin.message.UnitResponse;
import info.xiancloud.plugin.message.sender.AbstractAsyncSender;
import info.xiancloud.plugin.message.sender.local.RoutedLocalAsyncSender;
import info.xiancloud.plugin.support.mq.mqtt.handle.NotifyHandler;
import info.xiancloud.plugin.util.CloneUtil;
import info.xiancloud.plugin.util.LOG;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:info/xiancloud/plugin/message/sender/broadcast/BroadcastSender.class */
public class BroadcastSender extends AbstractAsyncSender {
    public BroadcastSender(UnitRequest unitRequest, NotifyHandler notifyHandler) {
        super(unitRequest, notifyHandler);
    }

    @Override // info.xiancloud.plugin.message.sender.AbstractAsyncSender
    protected void asyncSend() throws UnitOfflineException, UnitUndefinedException {
        List<UnitInstance> allInstances = UnitRouter.singleton.allInstances(Unit.fullName(this.unitRequest.getContext().getGroup(), this.unitRequest.getContext().getUnit()));
        final UnitMeta.Broadcast broadcast = allInstances.get(0).getPayload().getMeta().getBroadcast();
        final CountDownLatch countDownLatch = new CountDownLatch(allInstances.size());
        final ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        NotifyHandler notifyHandler = new NotifyHandler() { // from class: info.xiancloud.plugin.message.sender.broadcast.BroadcastSender.1
            @Override // info.xiancloud.plugin.support.mq.mqtt.handle.NotifyHandler
            protected void handle(UnitResponse unitResponse) {
                if (!broadcast.isSuccessDataOnly()) {
                    concurrentLinkedQueue.add(unitResponse);
                } else if (unitResponse.succeeded()) {
                    concurrentLinkedQueue.add(unitResponse.getData());
                }
                countDownLatch.countDown();
            }
        };
        for (UnitInstance unitInstance : allInstances) {
            if (unitInstance.getNodeId().equals(LocalNodeManager.LOCAL_NODE_ID)) {
                new RoutedLocalAsyncSender((UnitRequest) CloneUtil.cloneBean(this.unitRequest, UnitRequest.class), notifyHandler).send();
            } else {
                LOG.debug("In order not to share the same unit request object，we clone a new request");
                UnitRequest unitRequest = (UnitRequest) CloneUtil.cloneBean(this.unitRequest, UnitRequest.class);
                unitRequest.getContext().setDestinationNodeId(unitInstance.getNodeId());
                LocalNodeManager.send(unitRequest, notifyHandler);
            }
        }
        if (broadcast.isAsync()) {
            this.callback.callback(UnitResponse.success());
            return;
        }
        try {
            if (countDownLatch.await(broadcast.getTimeoutInMilli(), TimeUnit.MILLISECONDS)) {
                this.callback.callback(UnitResponse.success(concurrentLinkedQueue));
            } else {
                LOG.error((Throwable) new TimeoutException());
                this.callback.callback(UnitResponse.error(Group.CODE_TIME_OUT, concurrentLinkedQueue, "Time out while waiting for all the units to response, the data is only part of the result. "));
            }
        } catch (InterruptedException e) {
            this.callback.callback(UnitResponse.exception(e));
        }
    }
}
