package net.jlxxw.wechat.component.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ServerChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/jlxxw/wechat/component/netty/MetricsHandler.class */
public class MetricsHandler extends ChannelDuplexHandler {
    private static final Logger LOG = LoggerFactory.getLogger(MetricsHandler.class);
    private static final AtomicBoolean START_TASK = new AtomicBoolean(false);
    private static final AtomicLong CHANNEL_COUNT = new AtomicLong(0);
    private static final AtomicLong TOTAL_READ_BYTES = new AtomicLong(0);
    private static final AtomicLong TOTAL_WRITE_BYTES = new AtomicLong(0);
    private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
    private static final ScheduledExecutorService statService = new ScheduledThreadPoolExecutor(1, runnable -> {
        Thread thread = new Thread(runnable);
        thread.setName("netty-metrics-thread");
        thread.setDaemon(false);
        return thread;
    });

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        CHANNEL_COUNT.incrementAndGet();
        if (START_TASK.compareAndSet(false, true)) {
            statService.scheduleAtFixedRate(() -> {
                LOG.info("----------------性能指标采集开始-------------------");
                LOG.info("目前在线Channel数：" + CHANNEL_COUNT.get());
                Iterator it = channelHandlerContext.executor().parent().iterator();
                while (it.hasNext()) {
                    EventExecutor eventExecutor = (SingleThreadEventExecutor) ((EventExecutor) it.next());
                    int pendingTasks = eventExecutor.pendingTasks();
                    if (eventExecutor == channelHandlerContext.executor()) {
                        LOG.info(channelHandlerContext.channel() + ":" + eventExecutor + "待处理队列大小 :  " + pendingTasks);
                    } else {
                        LOG.info(eventExecutor + " 待处理队列大小 : " + pendingTasks);
                    }
                }
                for (Channel channel : CHANNEL_GROUP) {
                    if (!(channel instanceof ServerChannel)) {
                        LOG.info(channel + "发送缓存积压字节数：" + channel.unsafe().outboundBuffer().totalPendingWriteBytes());
                    }
                }
                LOG.info("读取速率(字节/秒)：" + TOTAL_READ_BYTES.getAndSet(0L));
                LOG.info("写出速率(字节/秒)：" + TOTAL_WRITE_BYTES.getAndSet(0L));
                LOG.info("----------------性能指标采集结束-------------------");
            }, 0L, 1L, TimeUnit.SECONDS);
        }
        CHANNEL_GROUP.add(channelHandlerContext.channel());
        super.channelActive(channelHandlerContext);
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        TOTAL_READ_BYTES.getAndAdd(((ByteBuf) obj).readableBytes());
        channelHandlerContext.fireChannelRead(obj);
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        TOTAL_WRITE_BYTES.getAndAdd(((ByteBuf) obj).readableBytes());
        super.write(channelHandlerContext, obj, channelPromise);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        CHANNEL_COUNT.decrementAndGet();
        CHANNEL_GROUP.remove(channelHandlerContext.channel());
        super.channelInactive(channelHandlerContext);
    }
}
