package info.xiancloud.core.distribution;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import info.xiancloud.core.Constant;
import info.xiancloud.core.Group;
import info.xiancloud.core.LocalUnitsManager;
import info.xiancloud.core.NotifyHandler;
import info.xiancloud.core.message.IdManager;
import info.xiancloud.core.message.RequestContext;
import info.xiancloud.core.message.UnitRequest;
import info.xiancloud.core.message.UnitResponse;
import info.xiancloud.core.message.id.NodeIdBean;
import info.xiancloud.core.message.sender.remote.msg_publisher.IMsgPublisher;
import info.xiancloud.core.stream.StreamManager;
import info.xiancloud.core.stream.StreamSerializer;
import info.xiancloud.core.thread_pool.ThreadPoolManager;
import info.xiancloud.core.util.EnvUtil;
import info.xiancloud.core.util.LOG;
import info.xiancloud.core.util.thread.ThreadUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/* loaded from: input_file:info/xiancloud/core/distribution/Node.class */
public class Node implements INode {
    private final String nodeId;
    private final String application;
    public static int RPC_PORT;
    private final IMsgPublisher publisher = IMsgPublisher.defaultPublisher;
    private final Date initDate = new Date();

    /* JADX INFO: Access modifiers changed from: protected */
    public Node(String str) {
        this.nodeId = str;
        this.application = NodeIdBean.parse(str).getApplication();
    }

    @Override // info.xiancloud.core.init.Initable
    public void init() {
        LOG.debug("nothing to do.");
    }

    public String getNodeId() {
        return this.nodeId;
    }

    @Override // info.xiancloud.core.distribution.INode
    public void send(UnitRequest unitRequest, NotifyHandler notifyHandler) {
        String nextSsid = IdManager.nextSsid();
        unitRequest.getContext().setSsid(nextSsid);
        fillRequestContext(unitRequest.getContext());
        String jSONStringWithDateFormat = JSON.toJSONStringWithDateFormat(unitRequest, "yyyy-MM-dd HH:mm:ss", new SerializerFeature[0]);
        LocalNodeManager.handleMap.put(nextSsid, notifyHandler);
        this.publisher.p2pPublish(unitRequest.getContext().getDestinationNodeId(), jSONStringWithDateFormat);
    }

    @Override // info.xiancloud.core.distribution.INode
    public UnitResponse send(UnitRequest unitRequest) {
        String nextSsid = IdManager.nextSsid();
        unitRequest.getContext().setSsid(nextSsid);
        fillRequestContext(unitRequest.getContext());
        UnitResponse create = UnitResponse.create(true);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        NotifyHandler notifyHandler = new NotifyHandler() { // from class: info.xiancloud.core.distribution.Node.1
            @Override // info.xiancloud.core.NotifyHandler
            public void handle(UnitResponse unitResponse) {
                UnitResponse.copy(unitResponse, unitResponse);
                countDownLatch.countDown();
            }
        };
        LocalNodeManager.handleMap.put(nextSsid, notifyHandler);
        this.publisher.p2pPublish(unitRequest.getContext().getDestinationNodeId(), JSON.toJSONStringWithDateFormat(unitRequest, "yyyy-MM-dd HH:mm:ss", new SerializerFeature[0]));
        try {
            if (countDownLatch.await(Constant.UNIT_DEFAULT_TIME_OUT_IN_MILLI, TimeUnit.MILLISECONDS)) {
                return create;
            }
            notifyHandler.setTimeout(true);
            return UnitResponse.error(Group.CODE_TIME_OUT, null, "Response time out!").setContext(UnitResponse.Context.create().setSsid(nextSsid));
        } catch (InterruptedException e) {
            return UnitResponse.exception(e);
        }
    }

    @Override // info.xiancloud.core.distribution.INode
    public void sendBack(final UnitResponse unitResponse, Consumer<String> consumer) {
        InputStream inputStream = null;
        if (unitResponse.getData() != null && (unitResponse.getData() instanceof File)) {
            try {
                inputStream = new FileInputStream((File) unitResponse.dataToType(File.class));
            } catch (FileNotFoundException e) {
                throw new RuntimeException("文件 " + ((File) unitResponse.dataToType(File.class)).getName() + " 不存在", e);
            }
        } else if (unitResponse.getData() != null && (unitResponse.getData() instanceof InputStream)) {
            inputStream = (InputStream) unitResponse.dataToType(InputStream.class);
        }
        if (inputStream != null) {
            StreamSerializer.singleton.encodeAndApply(inputStream, unitResponse.getContext().getSsid(), unitResponse.getMsgId(), streamFragmentBean -> {
                this.publisher.p2pPublish(unitResponse.getContext().getDestinationNodeId(), JSON.toJSONString(streamFragmentBean));
                JSONObject jSONObject = new JSONObject();
                jSONObject.put("type", "stream");
                jSONObject.put("detail", "sent ---------------->    " + (((streamFragmentBean.getHeader().getIndex() + 1) * StreamManager.BUF_SIZE_IN_BYTE) / 1024.0d) + " kb");
                jSONObject.put("header", streamFragmentBean.getHeader());
                LOG.info(jSONObject);
            });
            return;
        }
        fillResponseContext(unitResponse.getContext());
        String jSONStringWithDateFormat = JSON.toJSONStringWithDateFormat(unitResponse, "yyyy-MM-dd HH:mm:ss", new SerializerFeature[0]);
        if (this.publisher.p2pPublish(unitResponse.getContext().getDestinationNodeId(), jSONStringWithDateFormat)) {
            return;
        }
        LOG.info("反向发送响应结果失败，因此做回调处理");
        if (consumer != null) {
            consumer.accept(jSONStringWithDateFormat);
        } else {
            LOG.error(new JSONObject() { // from class: info.xiancloud.core.distribution.Node.2
                {
                    put("type", "sendBackFailure");
                    put("description", "消息回送失败！！！");
                    put("ssid", unitResponse.getContext().getSsid());
                    put("destinationNodeId", unitResponse.getContext().getDestinationNodeId());
                }
            });
        }
    }

    @Override // info.xiancloud.core.distribution.INode
    public void sendBack(UnitResponse unitResponse) {
        sendBack(unitResponse, null);
    }

    private void fillRequestContext(RequestContext requestContext) {
        IdManager.makeSureMsgId(requestContext);
        requestContext.setNodeStatus(getSimpleStatus());
        requestContext.setMessageType(MessageType.request);
    }

    private void fillResponseContext(UnitResponse.Context context) {
        IdManager.makeSureMsgId(context);
        context.setNodeStatus(getSimpleStatus());
        context.setMessageType(MessageType.response);
    }

    @Override // info.xiancloud.core.distribution.INode
    public NodeStatus getFullStatus() {
        NodeStatus simpleStatus = getSimpleStatus();
        LocalUnitsManager.searchUnitMap(map -> {
            simpleStatus.setUnits(map.keySet());
        });
        simpleStatus.setPort(RPC_PORT);
        simpleStatus.setHost(EnvUtil.getLocalIp());
        return simpleStatus;
    }

    @Override // info.xiancloud.core.distribution.INode
    public NodeStatus getSimpleStatus() {
        return NodeStatus.create().setNodeId(this.nodeId).setQueueSize(ThreadPoolManager.queueSize()).setActiveCount(ThreadPoolManager.activeCount()).setCpuCores(ThreadUtils.CPU_CORES.intValue()).setInitTime(this.initDate.getTime());
    }

    @Override // info.xiancloud.core.init.Destroyable
    public void destroy() {
    }

    public String application() {
        return this.application;
    }

    static {
        try {
            ServerSocket serverSocket = new ServerSocket(0);
            Throwable th = null;
            try {
                serverSocket.setReuseAddress(true);
                RPC_PORT = serverSocket.getLocalPort();
                if (serverSocket != null) {
                    if (0 != 0) {
                        try {
                            serverSocket.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        serverSocket.close();
                    }
                }
            } finally {
            }
        } catch (Throwable th3) {
            LOG.error(th3);
            System.exit(-100);
        }
    }
}
