package info.xiancloud.rpc.netty.client;

import com.alibaba.fastjson.JSONObject;
import info.xiancloud.plugin.distribution.exception.ApplicationInstanceOfflineException;
import info.xiancloud.plugin.distribution.loadbalance.ApplicationRouter;
import info.xiancloud.plugin.distribution.service_discovery.ApplicationInstance;
import info.xiancloud.plugin.rpc.RpcClient;
import info.xiancloud.plugin.util.EnvUtil;
import info.xiancloud.plugin.util.LOG;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:info/xiancloud/rpc/netty/client/RpcNettyClient.class */
public final class RpcNettyClient implements RpcClient {
    private static final boolean SSL;
    private static final Map<String, Channel> nodeId_to_connectedChannel_map;
    private static final Lock lock;

    public boolean request(String str, String str2) {
        if (!channelAvailable(str)) {
            try {
                lazyInit(str);
            } catch (Exception e) {
                LOG.error("Error, unexpected exception.", e);
                return false;
            } catch (ApplicationInstanceOfflineException e2) {
                LOG.warn("Warning, node is offline.", e2);
                return false;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format(">>>> rpc to %s  >>> remoteAddress=%s", str, nodeId_to_connectedChannel_map.get(str).remoteAddress()));
        }
        nodeId_to_connectedChannel_map.get(str).writeAndFlush(str2 + "\r\n$end!");
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void removeClosedChannel(String str) {
        if (!lock.tryLock()) {
            LOG.info(String.format("RpcClient删除缓存的与%s的废弃的连接：已经有新连接正在建立，那么这里不做操作，由新建连接的线程去覆盖掉缓存中的废弃连接", str));
            return;
        }
        try {
            nodeId_to_connectedChannel_map.remove(str);
            LOG.info(String.format("RpcClient:与%s的空闲长连接缓存释放完毕...", str));
            lock.unlock();
        } catch (Throwable th) {
            lock.unlock();
            throw th;
        }
    }

    private static boolean channelAvailable(String str) {
        if (!(nodeId_to_connectedChannel_map.get(str) != null)) {
            return false;
        }
        boolean isOpen = nodeId_to_connectedChannel_map.get(str).isOpen();
        boolean isActive = nodeId_to_connectedChannel_map.get(str).isActive();
        boolean isWritable = nodeId_to_connectedChannel_map.get(str).isWritable();
        boolean z = isOpen && isActive;
        if (!z) {
            LOG.warn(new Throwable(String.format("RpcClient: 现存rpc channel不可用，请检查具体原因。channelStatus exists=%s open=%s active=%s writable=%s", true, Boolean.valueOf(isOpen), Boolean.valueOf(isActive), Boolean.valueOf(isWritable))));
        }
        if (!isWritable) {
            LOG.warn(new Throwable(String.format("RpcClient: 现存rpc channel 写缓冲区占用空间超过设定的水位值，请检查具体原因。channelStatus exists=%s open=%s active=%s writable=%s", true, Boolean.valueOf(isOpen), Boolean.valueOf(isActive), Boolean.valueOf(isWritable))));
        }
        return z;
    }

    /* JADX WARN: Type inference failed for: r0v44, types: [info.xiancloud.rpc.netty.client.RpcNettyClient$1] */
    private static void lazyInit(final String str) throws Exception {
        lock.lock();
        try {
            try {
                if (channelAvailable(str)) {
                    LOG.debug(String.format("RpcClient:已经存在一个与%s的长连接，不再新建连接.", str));
                    lock.unlock();
                    return;
                }
                LOG.info(String.format("RpcClient:开始新建与%s的长连接...", str));
                ApplicationInstance applicationRouter = ApplicationRouter.singleton.getInstance(str);
                String address = Objects.equals(applicationRouter.getAddress(), EnvUtil.getLocalIp()) ? "127.0.0.1" : applicationRouter.getAddress();
                int intValue = applicationRouter.getPort().intValue();
                SslContext build = SSL ? SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build() : null;
                NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1);
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(nioEventLoopGroup).option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(10485760, 20971520)).channel(NioSocketChannel.class).handler(new RpcNettyClientInitializer(build, str)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);
                final Channel channel = bootstrap.connect(address, intValue).sync().channel();
                channel.closeFuture().addListener(future -> {
                    nioEventLoopGroup.shutdownGracefully();
                    LOG.info("The EventLoopGroup has been terminated completely and all Channels that belong to the group have been closed.");
                });
                nodeId_to_connectedChannel_map.put(str, channel);
                LOG.info(new JSONObject() { // from class: info.xiancloud.rpc.netty.client.RpcNettyClient.1
                    {
                        put("toNodeId", str);
                        put("rpcRemoteAddress", channel.remoteAddress().toString());
                        put("type", "rpcChannelConnected");
                        put("description", String.format("RpcClient:与%s的长连接建立完毕, remoteAddress=%s", str, channel.remoteAddress()));
                    }
                }.toJSONString());
                lock.unlock();
            } catch (Throwable th) {
                throw new Exception(String.format("与远程节点%s建立长连接失败:host=%s,port=%s", str, null, -1), th);
            }
        } catch (Throwable th2) {
            lock.unlock();
            throw th2;
        }
    }

    public void destroy() {
        Iterator<Channel> it = nodeId_to_connectedChannel_map.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        nodeId_to_connectedChannel_map.clear();
    }

    static {
        SSL = System.getProperty("XIAN_RPC_SSL") != null;
        nodeId_to_connectedChannel_map = new ConcurrentHashMap();
        lock = new ReentrantLock();
    }
}
