package com.gargle.common.kafka.consumer;

import com.gargle.common.config.GargleConfig;
import com.gargle.common.exception.GargleException;
import com.gargle.common.utils.string.StringUtil;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/gargle/common/kafka/consumer/BaseConsumer.class */
public abstract class BaseConsumer<K, V, T> implements Serializable {
    /** Logger shared by this class and its nested polling task. */
    private static final Logger logger = LoggerFactory.getLogger(BaseConsumer.class);
    /**
     * Prefix placed at the start of every log message.
     * NOTE(review): this is a mutable static that {@link #init()} overwrites with a
     * consumer-specific value, so with several subclasses the last one initialised
     * determines the prefix seen by all of them - confirm whether that is intended.
     */
    protected static String log_prefix = "[kafka-consumer-client]-";
    /** Backing counter for {@link #getCount()}; static, hence shared by all instances. */
    private static final AtomicLong count = new AtomicLong(0);
    /** Set by {@link #init()} on its first call so that later calls return immediately. */
    private volatile boolean start = false;
    /** Configuration entry resolved by {@link #getConsumerConfig()} during {@link #init()}. */
    private GargleConfig.KafkaConsumerConfig consumerConfig;
    /** Configuration obtained from {@link #getGargleConfig()} during {@link #init()}. */
    protected GargleConfig gargleConfig;
    /** Executor that runs the polling task; created by {@code initConsumerPool()}. */
    protected ThreadPoolExecutor consumerExecutor;
    /** Executor on which the polling task runs the per-record callbacks; created by {@code initProcessPool()}. */
    protected ThreadPoolExecutor processExecutor;
    /** Polling task created by {@link #start()}; stays {@code null} until then. */
    protected BaseTask<K, V, T> task;
    /* loaded from: input_file:com/gargle/common/kafka/consumer/BaseConsumer$BaseTask.class */
    public static class BaseTask<K, V, T> implements Runnable, Serializable {
        private final GargleConfig.KafkaConsumerConfig consumerConfig;
        private final BaseConsumer<K, V, T> process;
        private volatile boolean run = true;
        private volatile boolean stop = false;

        public BaseTask(GargleConfig.KafkaConsumerConfig kafkaConsumerConfig, BaseConsumer<K, V, T> baseConsumer) {
            this.consumerConfig = kafkaConsumerConfig;
            this.process = baseConsumer;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                KafkaConsumer kafkaConsumer = new KafkaConsumer(this.consumerConfig.toConsumerProperties());
                try {
                    kafkaConsumer.subscribe(Arrays.asList(this.consumerConfig.getTopics()));
                    BaseConsumer.logger.info("{}consumerName: {} kafka消费者启动线程创建完毕,已启动", BaseConsumer.log_prefix, this.consumerConfig.getConsumerName());
                    long intValue = (this.consumerConfig.getConsumerMaxPollIntervalMs().intValue() / 3) * 2;
                    long intValue2 = this.consumerConfig.getConsumerMaxPollIntervalMs().intValue() / 3;
                    while (this.run) {
                        long currentTimeMillis = System.currentTimeMillis();
                        Boolean[] boolArr = {true};
                        try {
                            ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(1000L));
                            if (poll != null && poll.count() > 0) {
                                CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
                                Iterator it = poll.iterator();
                                while (it.hasNext()) {
                                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                                    copyOnWriteArrayList.add(() -> {
                                        String str = BaseConsumer.log_prefix + this.process.getCount() + "-";
                                        try {
                                            return this.process.processRecord(consumerRecord, str);
                                        } catch (Exception e) {
                                            boolArr[0] = false;
                                            this.process.onProcessRecordFail(consumerRecord, e, str);
                                            return null;
                                        }
                                    });
                                }
                                if (copyOnWriteArrayList.size() != 0) {
                                    Iterator<Future<T>> it2 = this.process.processExecutor.invokeAll(copyOnWriteArrayList, intValue2, TimeUnit.MILLISECONDS).iterator();
                                    while (it2.hasNext()) {
                                        try {
                                            this.process.processT(it2.next().get(intValue, TimeUnit.MILLISECONDS));
                                        } catch (TimeoutException e) {
                                            BaseConsumer.logger.warn("{}consumerName: {} 任务执行超时.", BaseConsumer.log_prefix, this.consumerConfig.getConsumerName());
                                        }
                                    }
                                    BaseConsumer.logger.info("{}consumerName: {} 本批次拉取: {} 条.", new Object[]{BaseConsumer.log_prefix, this.consumerConfig.getConsumerName(), Integer.valueOf(poll.count())});
                                }
                                BaseConsumer.logger.info("{}consumerName: {} 本批次处理: {} 条记录, 耗时 {} ms", new Object[]{BaseConsumer.log_prefix, this.consumerConfig.getConsumerName(), Integer.valueOf(poll.count()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
                            }
                        } catch (Exception e2) {
                            boolArr[0] = false;
                            BaseConsumer.logger.error("{}consumerName: {} kafka 消息执行异常", new Object[]{BaseConsumer.log_prefix, this.consumerConfig.getConsumerName(), e2});
                        }
                        if (!this.consumerConfig.getEnableAutoCommit().booleanValue()) {
                            if (this.consumerConfig.getConsumerMaxPollRecords().intValue() != 1) {
                                kafkaConsumer.commitSync();
                            } else if (boolArr[0].booleanValue()) {
                                kafkaConsumer.commitSync();
                            }
                        }
                    }
                    try {
                        kafkaConsumer.close();
                    } catch (Exception e3) {
                        BaseConsumer.logger.error("{}consumerName: {} kafka关闭异常(可忽略), e: {}", new Object[]{BaseConsumer.log_prefix, this.consumerConfig.getConsumerName(), e3});
                    }
                    this.stop = true;
                    BaseConsumer.logger.info("{}consumerName: {} kafka消费者线程已执行完成, consumer链接已关闭.", BaseConsumer.log_prefix, this.consumerConfig.getConsumerName());
                } catch (Exception e4) {
                    BaseConsumer.logger.error("{}consumerName: {} kafka消费者订阅topic异常.", new Object[]{BaseConsumer.log_prefix, this.consumerConfig.getConsumerName(), e4});
                    throw new GargleException(e4);
                }
            } catch (Exception e5) {
                BaseConsumer.logger.error("{}consumerName: {} kafka消费者创建异常.", new Object[]{BaseConsumer.log_prefix, this.consumerConfig.getConsumerName(), e5});
                throw new GargleException(e5);
            }
        }

        public void close() {
            this.run = false;
            int i = 0;
            Phaser phaser = new Phaser(1);
            while (!this.stop) {
                try {
                    phaser.awaitAdvanceInterruptibly(phaser.getPhase(), 1L, TimeUnit.SECONDS);
                } catch (InterruptedException | TimeoutException e) {
                }
                i++;
                if (i >= 10) {
                    return;
                }
            }
        }
    }

    /**
     * Resolves the configuration, builds the executors and starts the polling task.
     *
     * <p>Only the first call does any work; later calls return immediately. When the
     * resolved consumer configuration is not enabled, a warning is logged and nothing
     * is started.
     *
     * @throws GargleException if {@link #getConsumerConfig()} cannot resolve a configuration
     */
    @PostConstruct
    public synchronized void init() {
        if (this.start) {
            return;
        }
        this.start = true;
        this.gargleConfig = getGargleConfig();
        this.consumerConfig = getConsumerConfig();
        if (!this.consumerConfig.isEnable()) {
            logger.warn("{} 未开启.", log_prefix);
            return;
        }
        this.consumerExecutor = initConsumerPool();
        this.processExecutor = initProcessPool();
        // Set the consumer-specific prefix before the polling thread is handed to the executor,
        // so the startup messages carry it and the thread never reads a stale value.
        log_prefix = "[kafka-consumer-client-" + getConsumerName() + "]-";
        start();
    }

    /**
     * Builds a new polling task for this consumer, stores it in {@code task} and submits
     * it to the consumer executor.
     */
    public void start() {
        final String consumerName = this.consumerConfig.getConsumerName();
        logger.info("{}consumerName: {} kafka消费者开始启动,创建启动线程.", log_prefix, consumerName);
        final BaseTask<K, V, T> pollingTask = new BaseTask<>(this.consumerConfig, this);
        this.task = pollingTask;
        this.consumerExecutor.execute(pollingTask);
    }

    /**
     * Stops the polling task and shuts down both executors.
     *
     * <p>Safe to call when nothing was started: if {@link #init()} never resolved a
     * configuration the method returns immediately, and a missing task or executor
     * (as happens when the consumer is not enabled) is simply skipped.
     */
    @PreDestroy
    public void close() {
        if (this.consumerConfig == null) {
            // init() did not get as far as resolving the configuration, so there is nothing to release.
            return;
        }
        logger.info("{}consumerName: {} kafka消费者关闭中.", log_prefix, this.consumerConfig.getConsumerName());
        if (this.task != null) {
            this.task.close();
        }
        if (this.processExecutor != null) {
            this.processExecutor.shutdown();
        }
        if (this.consumerExecutor != null) {
            this.consumerExecutor.shutdown();
        }
        logger.info("{}consumerName: {} kafka消费者已关闭, 资源已释放.", log_prefix, this.consumerConfig.getConsumerName());
    }

    /**
     * Creates the executor for the polling task: exactly one thread, a one-second
     * keep-alive and a queue holding at most one pending task. The thread is named
     * after the configured consumer name.
     */
    private ThreadPoolExecutor initConsumerPool() {
        LinkedBlockingQueue<Runnable> pending = new LinkedBlockingQueue<>(1);
        return new ThreadPoolExecutor(
                1,
                1,
                1L,
                TimeUnit.SECONDS,
                pending,
                work -> new Thread(work, this.consumerConfig.getConsumerName()));
    }

    /**
     * Creates the executor for per-record work, sized from the configured max poll
     * records: that many core threads, twice as many at most, a 60-second keep-alive
     * and a queue bounded at the same number. Threads are named after the configured
     * consumer name.
     */
    private ThreadPoolExecutor initProcessPool() {
        final int maxPollRecords = this.consumerConfig.getConsumerMaxPollRecords().intValue();
        final int maxThreads = maxPollRecords * 2;
        LinkedBlockingQueue<Runnable> backlog = new LinkedBlockingQueue<>(maxPollRecords);
        return new ThreadPoolExecutor(
                maxPollRecords,
                maxThreads,
                60L,
                TimeUnit.SECONDS,
                backlog,
                work -> new Thread(work, this.consumerConfig.getConsumerName()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the next value of the shared sequence that the polling task appends to
     * the per-record log prefix.
     *
     * <p>The sequence restarts at 0 once the counter has passed 90000000. The wrap check
     * and the increment are applied as a single atomic update, because the counter is
     * static and an instance-level lock would not serialise callers on different
     * consumer instances.
     *
     * @return the sequence value, between 0 and 90000000 inclusive
     */
    public long getCount() {
        return count.updateAndGet(current -> (current > 90000000 ? 0L : current) + 1L) - 1L;
    }

    /**
     * Looks up the Kafka consumer configuration whose name equals {@link #getConsumerName()}.
     *
     * @return the first non-null configured entry with a matching consumer name
     * @throws GargleException if the consumer name is blank, if no consumer entries are
     *                         configured, or if none of them matches the name
     */
    public GargleConfig.KafkaConsumerConfig getConsumerConfig() {
        final String name = getConsumerName();
        if (StringUtil.isBlank(name)) {
            throw new GargleException("BaseConsumer-初始化存在consumerName为空的子类!");
        }

        final List<GargleConfig.KafkaConsumerConfig> configured = this.gargleConfig.getKafkaConsumers();
        if (configured == null || configured.isEmpty()) {
            throw new GargleException("BaseConsumer-初始化kafkaConsumers配置不存在 consumerName为: " + name + " 的kafka消费者配置. kafkaConsumers is null");
        }

        return configured.stream()
                .filter(candidate -> candidate != null && name.equals(candidate.getConsumerName()))
                .findFirst()
                .orElseThrow(() -> new GargleException("BaseConsumer-初始化kafkaConsumers配置不存在 consumerName为: " + name + " 的kafka消费者配置."));
    }

    /**
     * Returns the name that identifies this consumer.
     *
     * <p>{@link #getConsumerConfig()} matches it against the consumer names of the configured
     * entries and rejects a blank value.
     *
     * @return the consumer name
     */
    public abstract String getConsumerName();

    /**
     * Supplies the configuration from which the consumer entry is resolved; called by
     * {@link #init()}.
     *
     * @return the configuration object
     */
    public abstract GargleConfig getGargleConfig();

    /**
     * Handles a single polled record. The polling task submits this call to
     * {@code processExecutor}, one call per record.
     *
     * @param consumerRecord the record to handle
     * @param str            log prefix built by the polling task from {@code log_prefix}
     *                       and a {@link #getCount()} value
     * @return the result that is subsequently passed to {@link #processT(Object)}
     */
    public abstract T processRecord(ConsumerRecord<K, V> consumerRecord, String str);

    /**
     * Called by the polling task when {@link #processRecord} throws an {@link Exception}
     * for a record.
     *
     * @param consumerRecord the record whose processing failed
     * @param exc            the exception thrown by {@link #processRecord}
     * @param str            the same log prefix that was passed to {@link #processRecord}
     */
    public abstract void onProcessRecordFail(ConsumerRecord<K, V> consumerRecord, Exception exc, String str);

    /**
     * Receives the result of a {@link #processRecord} call; invoked by the polling task
     * after the batch has been executed.
     *
     * @param t the value returned by {@link #processRecord}; {@code null} when that call
     *          failed and {@link #onProcessRecordFail} was invoked instead
     */
    public abstract void processT(T t);
}
