package tech.mhuang.pacebox.kafka.admin;

import com.alibaba.fastjson2.JSON;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.mhuang.pacebox.core.check.CheckAssert;
import tech.mhuang.pacebox.core.pool.BaseExecutor;
import tech.mhuang.pacebox.core.pool.DefaultThreadPool;
import tech.mhuang.pacebox.core.util.CollectionUtil;
import tech.mhuang.pacebox.core.util.StringUtil;
import tech.mhuang.pacebox.kafka.admin.bean.KafkaInfo;
import tech.mhuang.pacebox.kafka.admin.external.IKafkaConsumer;
import tech.mhuang.pacebox.kafka.admin.external.IKafkaExternal;
import tech.mhuang.pacebox.kafka.admin.external.IKafkaProducer;
import tech.mhuang.pacebox.kafka.consumer.bean.ConsumerBean;
import tech.mhuang.pacebox.kafka.consumer.generator.KafkaConsumerGenerator;
import tech.mhuang.pacebox.kafka.global.exception.JkafkaException;
import tech.mhuang.pacebox.kafka.producer.bean.ProducerBean;

/* loaded from: input_file:tech/mhuang/pacebox/kafka/admin/KafkaFramework.class */
/**
 * Bootstraps the Kafka producers and consumers described by a {@link KafkaInfo}.
 *
 * <p>Typical usage: construct the framework with its configuration, optionally
 * supply an executor and an {@link IKafkaExternal} factory through the fluent
 * setters, then call {@link #start()}. Clients whose initialization completed
 * are afterwards available from {@link #getSuccessProducerMap()} and
 * {@link #getSuccessConsumerMap()}.
 */
public class KafkaFramework {
    private static final Logger log = LoggerFactory.getLogger(KafkaFramework.class);

    /** Configuration this framework was created with. */
    private final KafkaInfo info;

    /** Producers whose initialization completed, keyed by their configured name. */
    private final Map<String, IKafkaProducer> successProducerMap = new ConcurrentHashMap<>();

    /** Consumers whose initialization completed, keyed by their {@code group.id}. */
    private final Map<String, IKafkaConsumer> successConsumerMap = new ConcurrentHashMap<>();

    /** Executor that consumers are submitted to; defaulted in {@link #start()} when unset. */
    private BaseExecutor executorService;

    /** Factory for producer/consumer instances; defaulted in {@link #start()} when unset. */
    private IKafkaExternal kafkaExternal;

    /**
     * Creates a framework for the given configuration.
     *
     * @param kafkaInfo configuration read by {@link #start()}
     */
    public KafkaFramework(KafkaInfo kafkaInfo) {
        this.info = kafkaInfo;
    }

    /**
     * Sets the executor that consumers are submitted to.
     *
     * @param baseExecutor executor to use; when left {@code null} a
     *                     {@link DefaultThreadPool} is created by {@link #start()}
     * @return this instance, for chaining
     */
    public KafkaFramework executorService(BaseExecutor baseExecutor) {
        this.executorService = baseExecutor;
        return this;
    }

    /**
     * Sets the factory used to create producer and consumer instances.
     *
     * @param iKafkaExternal factory to use; when left {@code null} an anonymous
     *                       {@link IKafkaExternal} with no overrides is created by {@link #start()}
     * @return this instance, for chaining
     */
    public KafkaFramework kafkaExternal(IKafkaExternal iKafkaExternal) {
        this.kafkaExternal = iKafkaExternal;
        return this;
    }

    /**
     * Fills in default collaborators and initializes every enabled producer and consumer.
     *
     * @throws JkafkaException if producers (or consumers) are enabled globally but the
     *                         corresponding configuration map is empty
     */
    public void start() {
        log.info("loading kafka client....");
        if (this.kafkaExternal == null) {
            // No factory supplied: fall back to the interface's own (default) behaviour.
            this.kafkaExternal = new IKafkaExternal() {
            };
        }
        if (this.executorService == null) {
            this.executorService = new DefaultThreadPool();
        }
        if (this.info.isEnableProducer()) {
            if (CollectionUtil.isEmpty(this.info.getProducerMap())) {
                throw new JkafkaException("生产者启动但没有配置对应生产者,...");
            }
            this.info.getProducerMap().forEach((name, bean) -> {
                if (bean.isEnable()) {
                    generatorPro(name, bean);
                }
            });
        }
        if (this.info.isEnableConsumer()) {
            if (CollectionUtil.isEmpty(this.info.getConsumerMap())) {
                throw new JkafkaException("消费者启动但没有配置对应消费者,...");
            }
            this.info.getConsumerMap().forEach((name, bean) -> {
                if (bean.isEnable()) {
                    generatorCon(name, bean);
                }
            });
        }
        log.info("start kafka client success..");
    }

    /**
     * Creates and initializes one producer, registering it only when initialization succeeds.
     *
     * @param name         configured producer name, used as the registration key
     * @param producerBean producer settings; inherits the global servers when it has none
     */
    private void generatorPro(String name, ProducerBean producerBean) {
        if (StringUtil.isEmpty(producerBean.getServers())) {
            // NOTE(review): CheckAssert.check presumably rejects a missing value - confirm.
            CheckAssert.check(this.info.getServers(), String.format("生产者%s:没有绑定对应的属性", name));
            producerBean.setServers(this.info.getServers());
        }
        IKafkaProducer producer = this.kafkaExternal.createProducer(name);
        try {
            producer.execute(JSON.toJSON(producerBean));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("kafka线程池执行异常", e);
            return;
        } catch (ExecutionException e) {
            log.error("kafka线程池执行异常", e);
            // A producer that failed to initialize must not be reported as successful.
            return;
        }
        this.successProducerMap.put(name, producer);
    }

    /**
     * Validates one consumer configuration and starts a consumer for every parameter
     * set produced by {@link KafkaConsumerGenerator#generatorParam}. A consumer is
     * submitted to the executor and registered only when its initialization succeeds.
     *
     * @param name         configured consumer name, used in validation messages
     * @param consumerBean consumer settings; inherits the global servers when it has none
     */
    private void generatorCon(String name, ConsumerBean consumerBean) {
        if (StringUtil.isEmpty(consumerBean.getServers())) {
            CheckAssert.check(this.info.getServers(), String.format("消费者%s:没有绑定对应实例", name));
            consumerBean.setServers(this.info.getServers());
        }
        CheckAssert.check(consumerBean.getTopics(), String.format("消费者%s:没有指定对应主题", name));
        CheckAssert.check(consumerBean.getInvokeBeanName(), String.format("消费者%s:没有指定对应的调用bean", name));
        CheckAssert.check(consumerBean.getInvokeMethodName(), String.format("消费者%s:没有指定对应的方法名", name));
        // NOTE(review): the raw List looks like a decompilation artifact; the elements are
        // presumably fastjson JSONObject instances - confirm against KafkaConsumerGenerator
        // and restore the element type.
        List generatorParam = KafkaConsumerGenerator.generatorParam(consumerBean);
        if (CollectionUtil.isNotEmpty(generatorParam)) {
            generatorParam.forEach(jSONObject -> {
                String groupId = jSONObject.getJSONObject("consumerMap").getString("group.id");
                IKafkaConsumer consumer = this.kafkaExternal.createConsumer(groupId);
                try {
                    consumer.execute(jSONObject);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers further up can observe the interruption.
                    Thread.currentThread().interrupt();
                    log.error("kafka消费线程初始化异常", e);
                    return;
                } catch (ExecutionException e) {
                    log.error("kafka消费线程初始化异常", e);
                    // Do not schedule or register a consumer that failed to initialize.
                    return;
                }
                this.executorService.submit(consumer);
                this.successConsumerMap.put(groupId, consumer);
            });
        }
    }

    /**
     * @return the configuration this framework was created with
     */
    public KafkaInfo getInfo() {
        return this.info;
    }

    /**
     * @return the live map of initialized producers, keyed by configured name
     */
    public Map<String, IKafkaProducer> getSuccessProducerMap() {
        return this.successProducerMap;
    }

    /**
     * @return the live map of initialized consumers, keyed by {@code group.id}
     */
    public Map<String, IKafkaConsumer> getSuccessConsumerMap() {
        return this.successConsumerMap;
    }

    /**
     * @return the executor consumers are submitted to; {@code null} until one is set
     *         or {@link #start()} has run
     */
    public BaseExecutor getExecutorService() {
        return this.executorService;
    }

    /**
     * @return the producer/consumer factory; {@code null} until one is set
     *         or {@link #start()} has run
     */
    public IKafkaExternal getKafkaExternal() {
        return this.kafkaExternal;
    }
}
