package io.github.howinfun.listener;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import jodd.util.concurrent.ThreadFactoryBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/howinfun/listener/BaseMessageListener.class */
public abstract class BaseMessageListener implements MessageListener<String> {
    private static final Logger log = LoggerFactory.getLogger(BaseMessageListener.class);
    private Executor executor;

    public void initThreadPool(Integer num, Integer num2, Integer num3, Integer num4, String str) {
        if (Objects.isNull(this.executor) && Boolean.TRUE.equals(enableAsync())) {
            this.executor = new ThreadPoolExecutor(num.intValue(), num2.intValue(), num3.intValue(), TimeUnit.MINUTES, new LinkedBlockingDeque(num4.intValue()), new ThreadFactoryBuilder().setNameFormat(str + "-%d").get(), new ThreadPoolExecutor.CallerRunsPolicy());
            log.info("[Pulsar] Consumer消费线程池初始化成功！");
        }
    }

    public void received(Consumer<String> consumer, Message<String> message) {
        if (Objects.nonNull(this.executor) && Boolean.TRUE.equals(enableAsync())) {
            this.executor.execute(() -> {
                doReceived(consumer, message);
            });
        } else {
            doReceived(consumer, message);
        }
    }

    protected abstract void doReceived(Consumer<String> consumer, Message<String> message);

    public Boolean enableAsync() {
        return Boolean.TRUE;
    }
}
