package info.xiancloud.rpc.netty.server;

import com.alibaba.fastjson.JSONObject;
import info.xiancloud.plugin.distribution.LocalNodeManager;
import info.xiancloud.plugin.distribution.MessageType;
import info.xiancloud.plugin.message.IdManager;
import info.xiancloud.plugin.message.UnitRequest;
import info.xiancloud.plugin.message.UnitResponse;
import info.xiancloud.plugin.message.sender.local.DefaultLocalAsyncSender;
import info.xiancloud.plugin.support.mq.mqtt.handle.NotifyHandler;
import info.xiancloud.plugin.support.mq.mqtt.mqtt_callback.sequencer.ISequencer;
import info.xiancloud.plugin.thread_pool.ThreadPoolManager;
import info.xiancloud.plugin.util.LOG;
import info.xiancloud.plugin.util.thread.MsgIdHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.function.Consumer;

/* loaded from: input_file:info/xiancloud/rpc/netty/server/RpcServerDefaultHandler.class */
public class RpcServerDefaultHandler extends SimpleChannelInboundHandler<JSONObject> {
    public void channelActive(final ChannelHandlerContext channelHandlerContext) throws Exception {
        LOG.info(new JSONObject() { // from class: info.xiancloud.rpc.netty.server.RpcServerDefaultHandler.1
            {
                put("myMsg", "rpc connected.");
                put("type", "serverSideChannelActive");
                put("server", channelHandlerContext.channel().localAddress());
                put("client", channelHandlerContext.channel().remoteAddress());
            }
        });
    }

    public void channelRead0(ChannelHandlerContext channelHandlerContext, JSONObject jSONObject) throws Exception {
        if (MessageType.isPing(jSONObject)) {
            LOG.info("Received a ping from client size.");
        } else {
            processMsg(channelHandlerContext, jSONObject);
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        LOG.error(th);
        channelHandlerContext.close();
    }

    private void processMsg(ChannelHandlerContext channelHandlerContext, JSONObject jSONObject) {
        try {
            IdManager.makeSureMsgId(jSONObject);
            if (MessageType.isDefaultRequest(jSONObject)) {
                UnitRequest unitRequest = (UnitRequest) jSONObject.toJavaObject(UnitRequest.class);
                unitRequest.getContext().setFromRemote(true);
                String group = unitRequest.getContext().getGroup();
                String unit = unitRequest.getContext().getUnit();
                final Consumer consumer = str -> {
                    LOG.info("rpc server --> client发送失败了，因此这里复用当前长连接响应消息");
                    channelHandlerContext.writeAndFlush(str + "\r\n$end!");
                };
                ISequencer.build(group, unit, jSONObject).sequence(() -> {
                    new DefaultLocalAsyncSender(unitRequest, new NotifyHandler() { // from class: info.xiancloud.rpc.netty.server.RpcServerDefaultHandler.3
                        protected void toContinue(UnitResponse unitResponse) {
                            LocalNodeManager.sendBack(unitResponse, consumer);
                        }
                    }).send();
                }, new NotifyHandler() { // from class: info.xiancloud.rpc.netty.server.RpcServerDefaultHandler.2
                    protected void toContinue(UnitResponse unitResponse) {
                        LocalNodeManager.sendBack(unitResponse, consumer);
                    }
                });
            } else if (MessageType.isDefaultResponse(jSONObject)) {
                LOG.debug("这是非常重要的说明：1、客户端发给外部节点的请求期待的响应内容，服务端节在准备好响应结果后立刻与请求端新建一个rpc长连接/复用已存在的长连接，将响应写回去；2、停服务时本地server会先停止，server停止就会关闭socket和server的io线程池，由于server的io线程池和业务线程池是分离的，业务线程池会继续运行直到所有任务处理完毕为止。此时，本节点不再有能力接收外部请求了，但是：a.在即将停止的节点内，业务线程池任然需要向外部发送请求以完成业务操作，以及得到响应结果，因此client必须保持打开的。b.在即将停止的节点内，业务线程池需要将本地执行结果返回给远程，这时候server已停，无法复用原管道将结果写回去，因此必须使用依然存活的client将结果写回。因此，有如下逻辑：所有server优先复用当前管道将响应写回去，当SERVER关闭后，业务线程池中后续任务通过未停止的client回写响应结果。");
                UnitResponse unitResponse = (UnitResponse) jSONObject.toJavaObject(UnitResponse.class);
                String ssid = unitResponse.getContext().getSsid();
                ThreadPoolManager.execute(() -> {
                    NotifyHandler notifyHandler = (NotifyHandler) LocalNodeManager.handleMap.getIfPresent(ssid);
                    if (notifyHandler == null) {
                        LOG.error(String.format("ssid=%s的消息没有找到对应的notifyHandler!整个消息内容=%s,", ssid, jSONObject), new Throwable());
                    } else {
                        LocalNodeManager.handleMap.invalidate(ssid);
                        notifyHandler.callback(UnitResponse.create(jSONObject));
                    }
                }, unitResponse.getContext().getMsgId());
            } else {
                LOG.error("rpc server端只支持request和response两种消息类型，不支持:" + jSONObject.getString("$msgType"), new RuntimeException());
            }
        } catch (Throwable th) {
            LOG.error(th);
        } finally {
            MsgIdHolder.clear();
        }
    }

    private void logRpcFly(JSONObject jSONObject, final String str, final ChannelHandlerContext channelHandlerContext) {
        try {
            final String string = jSONObject.getString("$ssid");
            final String string2 = jSONObject.getString("$LOCAL_NODE_ID");
            final String string3 = jSONObject.getString("$msgType");
            if (string != null) {
                final long longValue = ((Long) jSONObject.remove("$timestampMs")).longValue();
                LOG.debug(new JSONObject() { // from class: info.xiancloud.rpc.netty.server.RpcServerDefaultHandler.4
                    {
                        put("cost", Long.valueOf(System.currentTimeMillis() - longValue));
                        put("type", "rpcFly");
                        put("ssid", string);
                        put("from", string2);
                        put("length", Integer.valueOf(str.length()));
                        put("payload", str);
                        put("msgType", string3);
                        put("client", channelHandlerContext.channel().remoteAddress());
                        put("server", channelHandlerContext.channel().localAddress());
                    }
                });
            }
        } catch (Throwable th) {
            LOG.error(th);
        }
    }
}
