package io.github.howinfun.template;

import io.github.howinfun.client.CustomerPulsarClient;
import io.github.howinfun.client.MultiPulsarClient;
import io.github.howinfun.ececption.PulsarBusinessException;
import io.github.howinfun.properties.MultiPulsarProperties;
import io.github.howinfun.utils.TopicUtil;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/howinfun/template/PulsarTemplate.class */
/**
 * Entry point for publishing messages to Pulsar across several configured data sources.
 *
 * <p>Producers are created lazily on the first send and then reused. Use {@link #createBuilder()}
 * to obtain a {@link Builder}, describe the target topic on it, and call one of its send methods.
 *
 * @param <T> type parameter carried by the template and its builders
 */
public class PulsarTemplate<T> {
    private static final Logger log = LoggerFactory.getLogger(PulsarTemplate.class);

    /**
     * Producers created so far, keyed by {@code sourceName + "|" + fullTopic}. The source name is
     * part of the key because each producer is bound to the client of the source that created it.
     */
    private final ConcurrentHashMap<String, Producer<?>> producerCaches = new ConcurrentHashMap<>(64);
    private final MultiPulsarClient multiPulsarClient;
    private final MultiPulsarProperties multiPulsarProperties;

    /**
     * Fluent description of a single send target (source, tenant, namespace, topic) plus the
     * send operations themselves. Instances are bound to the enclosing template because they
     * share its producer cache, clients and properties.
     */
    public class Builder {
        private String sourceName;
        private Boolean persistent;
        private String tenant;
        private String namespace;
        private String topic;
        private Boolean blockIfQueueFull = true;

        public Builder() {
        }

        /**
         * Sets the data source name; when blank, {@link MultiPulsarProperties#DEFAULT_SOURCE_NAME}
         * is used to pick the client.
         */
        public PulsarTemplate<T>.Builder sourceName(String str) {
            this.sourceName = str;
            return this;
        }

        /** Sets whether the topic is persistent; {@code null} is treated as {@code true}. */
        public PulsarTemplate<T>.Builder persistent(Boolean bool) {
            this.persistent = bool;
            return this;
        }

        /** Sets the tenant; when blank, the tenant is looked up from the properties by source name. */
        public PulsarTemplate<T>.Builder tenant(String str) {
            this.tenant = str;
            return this;
        }

        /** Sets the namespace; when blank, the namespace is looked up from the properties by source name. */
        public PulsarTemplate<T>.Builder namespace(String str) {
            this.namespace = str;
            return this;
        }

        /** Sets the (short) topic name. Required: sending with a blank topic fails. */
        public PulsarTemplate<T>.Builder topic(String str) {
            this.topic = str;
            return this;
        }

        /**
         * Sets the {@code blockIfQueueFull} option applied when a new producer is created;
         * {@code null} is treated as the default {@code true}.
         */
        public PulsarTemplate<T>.Builder blockIfQueueFull(Boolean bool) {
            this.blockIfQueueFull = bool;
            return this;
        }

        /**
         * Sends a string message and blocks until the send completes.
         *
         * @param str the message to send
         * @return the id of the published message
         * @throws InterruptedException if the waiting thread is interrupted; the interrupt flag is restored
         * @throws ExecutionException if the asynchronous send fails
         * @throws Exception any failure raised by {@link #sendAsync(Object)} while preparing the send
         */
        public MessageId send(String str) throws Exception {
            try {
                MessageId messageId = sendAsync(str).get();
                PulsarTemplate.log.info("[Pulsar] Producer同步发送消息成功，msg is {}", str);
                return messageId;
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                PulsarTemplate.log.error("[Pulsar] Producer同步发送消息失败，msg is {}", str, e);
                throw e;
            } catch (ExecutionException e) {
                PulsarTemplate.log.error("[Pulsar] Producer同步发送消息失败，msg is {}", str, e);
                throw e;
            }
        }

        /**
         * Sends a message asynchronously, creating and caching the producer on first use.
         *
         * @param t the message to send; must not be {@code null}
         * @param <M> the message type, used to derive the JSON schema of a newly created producer
         * @return a future completed with the id of the published message
         * @throws PulsarClientException if the producer cannot be created
         * @throws PulsarBusinessException if the topic, tenant or namespace is blank, or no client
         *         exists for the resolved source name
         * @throws NullPointerException if {@code t} is {@code null}
         */
        public <M> CompletableFuture<MessageId> sendAsync(M t) throws PulsarClientException {
            String fullTopic = generateTopic();
            String resolvedSource = StringUtils.isNotBlank(this.sourceName) ? this.sourceName : MultiPulsarProperties.DEFAULT_SOURCE_NAME;
            try {
                Objects.requireNonNull(t, "message must not be null");
                return obtainProducer(resolvedSource, fullTopic, t).sendAsync(t);
            } catch (Exception e) {
                // Pass the exception as the last argument so the cause and stack trace are logged.
                PulsarTemplate.log.error("[Pulsar] Producer实例化失败，topic is {}", fullTopic, e);
                throw e;
            }
        }

        /**
         * Returns the cached producer for the given source and topic, creating it when absent.
         *
         * <p>NOTE(review): a cached producer keeps the schema derived from the class of the first
         * message sent to it; later messages of another class reuse that producer unchanged.
         */
        private <M> Producer<M> obtainProducer(String source, String fullTopic, M sample) throws PulsarClientException {
            String cacheKey = source + "|" + fullTopic;

            @SuppressWarnings("unchecked") // the cache stores producers under a wildcard type
            Producer<M> cached = (Producer<M>) PulsarTemplate.this.producerCaches.get(cacheKey);
            if (cached != null) {
                return cached;
            }

            CustomerPulsarClient client = PulsarTemplate.this.multiPulsarClient.getOrDefault(source, null);
            if (client == null) {
                PulsarTemplate.log.error("[Pulsar] 数据源对应PulsarClient不存在，sourceName is {}", source);
                throw new PulsarBusinessException("[Pulsar] 数据源对应PulsarClient不存在！");
            }

            @SuppressWarnings("unchecked") // getClass() of an M instance yields Class<? extends M>
            Class<M> messageType = (Class<M>) sample.getClass();
            Producer<M> created = client.getClient()
                    .newProducer(JSONSchema.of(messageType))
                    // A null flag falls back to the default (true) instead of failing on unboxing.
                    .blockIfQueueFull(!Boolean.FALSE.equals(this.blockIfQueueFull))
                    .maxPendingMessages(client.getMaxPendingMessages().intValue())
                    .topic(fullTopic)
                    .create();

            // Publish atomically: if another thread cached a producer first, use theirs and
            // release ours so it does not stay open without ever being referenced again.
            @SuppressWarnings("unchecked")
            Producer<M> winner = (Producer<M>) PulsarTemplate.this.producerCaches.putIfAbsent(cacheKey, created);
            if (winner != null) {
                created.closeAsync().exceptionally(closeError -> {
                    PulsarTemplate.log.warn("[Pulsar] Failed to close redundant producer, topic is {}", fullTopic, closeError);
                    return null;
                });
                return winner;
            }
            PulsarTemplate.log.info("[Pulsar] Producer实例化成功，sourceName is {}, topic is {}", source, fullTopic);
            return created;
        }

        /**
         * Builds the fully-qualified topic name from the persistence flag, tenant, namespace and
         * topic. Tenant and namespace set on this builder win; otherwise they are looked up from
         * the properties by source name.
         *
         * @throws PulsarBusinessException if the topic, or the resolved tenant or namespace, is blank
         */
        private String generateTopic() {
            if (StringUtils.isBlank(this.topic)) {
                PulsarTemplate.log.error("[Pulsar] Topic 为空，无法发送消息, topic is {}", this.topic);
                throw new PulsarBusinessException("Topic不能为空");
            }
            // NOTE(review): the lookups receive the raw sourceName (possibly blank), while
            // sendAsync falls back to DEFAULT_SOURCE_NAME - confirm the properties handle blank.
            String resolvedTenant = StringUtils.isNotBlank(this.tenant) ? this.tenant : PulsarTemplate.this.multiPulsarProperties.getTenantBySourceName(this.sourceName);
            String resolvedNamespace = StringUtils.isNotBlank(this.namespace) ? this.namespace : PulsarTemplate.this.multiPulsarProperties.getNamespaceBySourceName(this.sourceName);
            if (StringUtils.isBlank(resolvedTenant) || StringUtils.isBlank(resolvedNamespace)) {
                PulsarTemplate.log.error("[Pulsar] 租户||命名空间为空，无法创建发送消息, tenant is {}, namespace is {}", resolvedTenant, resolvedNamespace);
                throw new PulsarBusinessException("租户||命名空间为空，无法发送消息");
            }
            return TopicUtil.generateTopic(Objects.nonNull(this.persistent) ? this.persistent : Boolean.TRUE, resolvedTenant, resolvedNamespace, this.topic);
        }
    }

    /**
     * Creates a template backed by the given clients and properties.
     *
     * @param multiPulsarClient lookup of per-source Pulsar clients
     * @param multiPulsarProperties configuration used to resolve default tenant and namespace
     */
    public PulsarTemplate(MultiPulsarClient multiPulsarClient, MultiPulsarProperties multiPulsarProperties) {
        this.multiPulsarClient = multiPulsarClient;
        this.multiPulsarProperties = multiPulsarProperties;
    }

    /** Creates a new, empty {@link Builder} bound to this template. */
    public PulsarTemplate<T>.Builder createBuilder() {
        return new Builder();
    }
}
