package xyz.hellothomas.jedi.core.internals.message.http;

import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xyz.hellothomas.jedi.core.dto.collector.AbstractNotification;
import xyz.hellothomas.jedi.core.enums.HttpNotificationPath;
import xyz.hellothomas.jedi.core.enums.MessageType;
import xyz.hellothomas.jedi.core.internals.message.AbstractNotificationService;
import xyz.hellothomas.jedi.core.utils.JediRemotingUtil;

/* loaded from: input_file:xyz/hellothomas/jedi/core/internals/message/http/HttpNotificationService.class */
public class HttpNotificationService extends AbstractNotificationService {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpNotificationService.class);
    private String messageServerUrl;
    private LinkedBlockingQueue<AbstractNotification> executorTickerNotificationQueue;
    private LinkedBlockingQueue<AbstractNotification> executorTaskNotificationQueue;
    private LinkedBlockingQueue<AbstractNotification> defaultNotificationQueue;
    private volatile boolean toStopTicker;
    private volatile boolean toStopTask;
    private volatile boolean toStopDefault;

    public HttpNotificationService(String str, String str2, String str3) {
        super(str2, str3);
        this.messageServerUrl = str;
        this.executorTickerNotificationQueue = new LinkedBlockingQueue<>(1000);
        this.executorTaskNotificationQueue = new LinkedBlockingQueue<>(5000);
        this.defaultNotificationQueue = new LinkedBlockingQueue<>(1000);
        startTickerMessageSendThread();
        startTaskMessageSendThread();
        startDefaultMessageSendThread();
    }

    @Override // xyz.hellothomas.jedi.core.internals.message.AbstractNotificationService
    public void pushNotification(AbstractNotification abstractNotification) {
        try {
            if (MessageType.EXECUTOR_TASK.getTypeValue().equals(abstractNotification.getMessageType())) {
                this.executorTaskNotificationQueue.add(abstractNotification);
            } else if (MessageType.EXECUTOR_TICKER.getTypeValue().equals(abstractNotification.getMessageType())) {
                this.executorTickerNotificationQueue.add(abstractNotification);
            } else if (MessageType.EXECUTOR_SHUTDOWN.getTypeValue().equals(abstractNotification.getMessageType())) {
                send(abstractNotification, MessageType.EXECUTOR_SHUTDOWN);
            } else {
                this.defaultNotificationQueue.add(abstractNotification);
            }
        } catch (Exception e) {
            LOGGER.warn("消息{} push异常: {}", abstractNotification, e);
        }
    }

    @Override // xyz.hellothomas.jedi.core.internals.message.AbstractNotificationService
    public void send(Object obj, MessageType messageType) {
        String str = this.messageServerUrl + HttpNotificationPath.getPathByMessageType(messageType);
        LOGGER.trace("send to {}, message: {}", str, obj);
        LOGGER.trace("result: {}", JediRemotingUtil.postBody(str, null, 3, obj, String.class));
    }

    private void startTickerMessageSendThread() {
        Thread thread = new Thread(() -> {
            ArrayList arrayList = new ArrayList(10);
            while (!this.toStopTicker) {
                try {
                    LOGGER.trace("获取打点消息发送队列中...");
                    arrayList.add(this.executorTickerNotificationQueue.take());
                    this.executorTickerNotificationQueue.drainTo(arrayList, 9);
                    send(arrayList, MessageType.EXECUTOR_TICKER);
                } catch (Exception e) {
                    if (!this.toStopTicker) {
                        LOGGER.warn("打点消息发送任务失败, 异常为: {}, 消息为: {}! ", e, arrayList);
                    }
                } finally {
                    arrayList.clear();
                }
            }
        });
        thread.setDaemon(true);
        thread.setName("打点消息发送线程");
        thread.start();
        LOGGER.debug("{}已启动", thread.getName());
    }

    private void startTaskMessageSendThread() {
        Thread thread = new Thread(() -> {
            ArrayList arrayList = new ArrayList(10);
            while (!this.toStopTask) {
                try {
                    LOGGER.trace("获取任务消息发送队列中...");
                    arrayList.add(this.executorTaskNotificationQueue.take());
                    this.executorTaskNotificationQueue.drainTo(arrayList, 9);
                    send(arrayList, MessageType.EXECUTOR_TASK);
                } catch (Exception e) {
                    if (!this.toStopTask) {
                        LOGGER.warn("任务消息发送任务失败, 异常为: {}, 消息为: {}! ", e, arrayList);
                    }
                } finally {
                    arrayList.clear();
                }
            }
        });
        thread.setDaemon(true);
        thread.setName("任务消息发送线程");
        thread.start();
        LOGGER.debug("{}已启动", thread.getName());
    }

    private void startDefaultMessageSendThread() {
        Thread thread = new Thread(() -> {
            ArrayList arrayList = new ArrayList(10);
            while (!this.toStopDefault) {
                try {
                    LOGGER.trace("获取默认消息发送队列中...");
                    arrayList.add(this.defaultNotificationQueue.take());
                    this.defaultNotificationQueue.drainTo(arrayList, 9);
                    send(arrayList, MessageType.CUSTOM_NOTIFICATION);
                } catch (Exception e) {
                    if (!this.toStopDefault) {
                        LOGGER.warn("默认消息发送任务失败, 异常为: {}, 消息为: {}! ", e, arrayList);
                    }
                } finally {
                    arrayList.clear();
                }
            }
        });
        thread.setDaemon(true);
        thread.setName("默认消息发送线程");
        thread.start();
        LOGGER.debug("{}已启动", thread.getName());
    }
}
