package info.xiancloud.rpc.netty.client;

import com.alibaba.fastjson.JSONObject;
import info.xiancloud.plugin.distribution.LocalNodeManager;
import info.xiancloud.plugin.distribution.MessageType;
import info.xiancloud.plugin.message.UnitResponse;
import info.xiancloud.plugin.stream.Stream;
import info.xiancloud.plugin.stream.StreamFragmentBean;
import info.xiancloud.plugin.stream.StreamManager;
import info.xiancloud.plugin.support.mq.mqtt.handle.NotifyHandler;
import info.xiancloud.plugin.thread_pool.ThreadPoolManager;
import info.xiancloud.plugin.util.LOG;
import info.xiancloud.plugin.util.thread.MsgIdHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.concurrent.RejectedExecutionException;

/* loaded from: input_file:info/xiancloud/rpc/netty/client/StreamRpcClientHandler.class */
public class StreamRpcClientHandler extends SimpleChannelInboundHandler<JSONObject> {
    /* JADX INFO: Access modifiers changed from: protected */
    public void channelRead0(ChannelHandlerContext channelHandlerContext, JSONObject jSONObject) throws Exception {
        LOG.warn("已关闭此功能");
        if (!MessageType.isStream(jSONObject)) {
            channelHandlerContext.fireChannelRead(jSONObject);
            return;
        }
        try {
            StreamFragmentBean streamFragmentBean = (StreamFragmentBean) jSONObject.toJavaObject(StreamFragmentBean.class);
            MsgIdHolder.set(streamFragmentBean.getHeader().getMsgId());
            String id = streamFragmentBean.getHeader().getId();
            NotifyHandler notifyHandler = (NotifyHandler) LocalNodeManager.handleMap.getIfPresent(id);
            LocalNodeManager.handleMap.invalidate(id);
            Stream add = StreamManager.singleton.add(streamFragmentBean);
            if (streamFragmentBean.getHeader().isFirst()) {
                UnitResponse success = UnitResponse.success(add);
                try {
                    ThreadPoolManager.execute(() -> {
                        notifyHandler.callback(success);
                    });
                } catch (RejectedExecutionException e) {
                    LOG.info("线程池已关闭，这里使用临时线程执行任务，针对停服务时线程池已关闭的情况。");
                    new Thread(() -> {
                        notifyHandler.callback(success);
                    }).start();
                }
            }
        } catch (Throwable th) {
            LOG.error(th);
        } finally {
            MsgIdHolder.clear();
        }
    }
}
