package io.github.howinfun.configuration;

import cn.hutool.core.util.RandomUtil;
import io.github.howinfun.client.CustomerPulsarClient;
import io.github.howinfun.client.MultiPulsarClient;
import io.github.howinfun.ececption.PulsarAutoConfigException;
import io.github.howinfun.listener.BaseMessageListener;
import io.github.howinfun.listener.PulsarListener;
import io.github.howinfun.listener.ThreadPool;
import io.github.howinfun.properties.MultiPulsarProperties;
import io.github.howinfun.utils.TopicUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:io/github/howinfun/configuration/PulsarConsumerAutoConfigure.class */
public class PulsarConsumerAutoConfigure implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(PulsarConsumerAutoConfigure.class);

    @Autowired(required = false)
    private List<BaseMessageListener> listeners;
    private final MultiPulsarClient multiPulsarClient;
    private final MultiPulsarProperties multiPulsarProperties;

    public PulsarConsumerAutoConfigure(MultiPulsarClient multiPulsarClient, MultiPulsarProperties multiPulsarProperties) {
        this.multiPulsarClient = multiPulsarClient;
        this.multiPulsarProperties = multiPulsarProperties;
    }

    public void run(String... strArr) throws Exception {
        if (CollectionUtils.isEmpty(this.listeners)) {
            log.warn("[Pulsar] 未发现有Consumer");
            return;
        }
        for (BaseMessageListener baseMessageListener : this.listeners) {
            PulsarListener pulsarListener = (PulsarListener) AnnotationUtils.findAnnotation(baseMessageListener.getClass(), PulsarListener.class);
            if (Objects.nonNull(pulsarListener)) {
                try {
                    String sourceName = pulsarListener.sourceName();
                    CustomerPulsarClient orDefault = this.multiPulsarClient.getOrDefault(sourceName, null);
                    if (Objects.isNull(orDefault)) {
                        log.error("[Pulsar] 数据源对应PulsarClient不存在，sourceName is {}", sourceName);
                    } else {
                        ConsumerBuilder receiverQueueSize = orDefault.getClient().newConsumer(Schema.STRING).receiverQueueSize(pulsarListener.receiverQueueSize());
                        if (pulsarListener.topics().length > 0) {
                            if (Boolean.TRUE.equals(baseMessageListener.enableAsync())) {
                                log.info("[Pulsar] 消费者开启异步消费，开始初始化消费线程池....");
                                ThreadPool threadPool = pulsarListener.threadPool();
                                baseMessageListener.initThreadPool(Integer.valueOf(threadPool.coreThreads()), Integer.valueOf(threadPool.maxCoreThreads()), Integer.valueOf(threadPool.keepAliveTime()), Integer.valueOf(threadPool.maxQueueLength()), threadPool.threadPoolName());
                            }
                            ArrayList arrayList = new ArrayList(pulsarListener.topics().length);
                            String tenantBySourceName = StringUtils.isBlank(pulsarListener.tenant()) ? this.multiPulsarProperties.getTenantBySourceName(sourceName) : pulsarListener.tenant();
                            String namespaceBySourceName = StringUtils.isBlank(pulsarListener.namespace()) ? this.multiPulsarProperties.getNamespaceBySourceName(sourceName) : pulsarListener.namespace();
                            if (StringUtils.isBlank(tenantBySourceName) || StringUtils.isBlank(namespaceBySourceName)) {
                                log.error("[Pulsar] 消费者初始化失败，subscriptionName is {},sourceName is {},tenant is {},namespace is {}", new Object[]{pulsarListener.subscriptionName(), sourceName, tenantBySourceName, namespaceBySourceName});
                            } else {
                                Boolean valueOf = Boolean.valueOf(pulsarListener.persistent());
                                for (String str : pulsarListener.topics()) {
                                    arrayList.add(TopicUtil.generateTopic(valueOf, tenantBySourceName, namespaceBySourceName, str));
                                }
                                receiverQueueSize.topics(arrayList);
                                receiverQueueSize.subscriptionName(StringUtils.isBlank(pulsarListener.subscriptionName()) ? "subscription_" + RandomUtil.randomString(3) : pulsarListener.subscriptionName());
                                receiverQueueSize.ackTimeout(Long.parseLong(pulsarListener.ackTimeout()), TimeUnit.MILLISECONDS);
                                receiverQueueSize.subscriptionType(pulsarListener.subscriptionType());
                                if (Boolean.TRUE.equals(Boolean.valueOf(pulsarListener.enableRetry()))) {
                                    DeadLetterPolicy build = DeadLetterPolicy.builder().maxRedeliverCount(pulsarListener.maxRedeliverCount()).build();
                                    if (StringUtils.isNotBlank(pulsarListener.retryLetterTopic())) {
                                        build.setRetryLetterTopic(pulsarListener.retryLetterTopic());
                                    }
                                    if (StringUtils.isNotBlank(pulsarListener.deadLetterTopic())) {
                                        build.setDeadLetterTopic(pulsarListener.deadLetterTopic());
                                    }
                                    receiverQueueSize.enableRetry(pulsarListener.enableRetry()).deadLetterPolicy(build);
                                } else if (StringUtils.isNotBlank(pulsarListener.deadLetterTopic())) {
                                    if (SubscriptionType.Exclusive.equals(pulsarListener.subscriptionType())) {
                                        throw new PulsarAutoConfigException("[Pulsar] 消费端仅支持在Shared/Key_Shared模式下单独使用死信队列");
                                    }
                                    receiverQueueSize.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(pulsarListener.maxRedeliverCount()).deadLetterTopic(pulsarListener.deadLetterTopic()).build());
                                }
                                receiverQueueSize.messageListener(baseMessageListener);
                                log.info("[Pulsar] Consumer初始化完毕, sourceName is {}, topic is {},", sourceName, receiverQueueSize.subscribe().getTopic());
                            }
                        }
                    }
                } catch (PulsarClientException e) {
                    throw new PulsarAutoConfigException("[Pulsar] consumer初始化异常", e);
                }
            }
        }
    }
}
