package vip.sujianfeng.mq.kafka;

import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:vip/sujianfeng/mq/kafka/TbKafkaConsumer.class */
/**
 * Thin wrapper around {@link KafkaConsumer} that subscribes to a fixed set of topics,
 * polls them in a loop, decodes each record value into {@code T} and hands the decoded
 * batch to a {@link TbKafkaConsume} callback.
 *
 * <p>Auto-commit is disabled; offsets are committed synchronously only when the callback
 * reports success for a batch.
 */
public class TbKafkaConsumer {
    /** Consumer group id, also stored under {@code group.id} in {@link #properties}. */
    private final String groupId;
    /** Topics passed to {@code subscribe} when {@link #consume} starts. */
    private final List<String> subscribedTopics = new ArrayList<>();
    /** Configuration handed to every {@link KafkaConsumer} created by {@link #consume}. */
    private final Properties properties = new Properties();

    /**
     * Builds the consumer configuration from the given config object.
     *
     * @param tbKafkaConsumerConfig source of bootstrap servers, session timeout, max poll
     *                              records and the key/value deserializer settings (read
     *                              through its {@code getKeySerializer()} /
     *                              {@code getValueSerializer()} accessors)
     * @param str                   consumer group id
     * @param strArr                topics to subscribe to
     */
    public TbKafkaConsumer(TbKafkaConsumerConfig tbKafkaConsumerConfig, String str, String... strArr) {
        this.properties.put("bootstrap.servers", tbKafkaConsumerConfig.getBootstrapServers());
        this.properties.put("session.timeout.ms", Integer.valueOf(tbKafkaConsumerConfig.getSessionTimeoutMs()));
        this.properties.put("max.poll.records", Integer.valueOf(tbKafkaConsumerConfig.getMaxPollRecords()));
        this.properties.put("key.deserializer", tbKafkaConsumerConfig.getKeySerializer());
        this.properties.put("value.deserializer", tbKafkaConsumerConfig.getValueSerializer());
        this.properties.put("group.id", str);
        // Offsets are committed manually in consume() after the callback accepts a batch.
        this.properties.put("enable.auto.commit", "false");
        this.groupId = str;
        for (String topic : strArr) {
            this.subscribedTopics.add(topic);
        }
    }

    /**
     * Polls the subscribed topics and feeds each decoded batch to {@code tbKafkaConsume}.
     *
     * <p>The method blocks the calling thread. It returns only when that thread is
     * interrupted; the underlying consumer is always closed before the method exits,
     * including when {@code onError} or an {@link Error} propagates out.
     *
     * <p>Records whose value cannot be decoded are reported via {@code printStackTrace}
     * and left out of the batch. Because the commit covers everything returned by the
     * poll, such records are not redelivered once the callback accepts the batch.
     *
     * @param tbKafkaConsume callback receiving each batch (possibly empty) and any error
     *                       raised while polling, processing or committing
     * @param cls            target type; for a primitive class the raw record value is
     *                       passed through, otherwise the value is parsed as JSON
     * @param <T>            decoded value type
     */
    public <T> void consume(TbKafkaConsume<T> tbKafkaConsume, Class<T> cls) {
        // Key is always read as String; the value type depends on the configured deserializer.
        KafkaConsumer<String, Object> kafkaConsumer = new KafkaConsumer<>(this.properties);
        try {
            kafkaConsumer.subscribe(this.subscribedTopics);
            // Kafka's InterruptException re-sets the interrupt flag, so without this check an
            // interrupted thread would spin forever: poll() fails immediately on every pass.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    ConsumerRecords<String, Object> polled = kafkaConsumer.poll(1000L);
                    List<TbKafkaRecord<T>> batch = new ArrayList<>();
                    for (ConsumerRecord<String, Object> record : polled) {
                        try {
                            batch.add(new TbKafkaRecord<>(record.key(), decodeValue(record.value(), cls)));
                        } catch (Exception decodeError) {
                            // Best effort: one malformed record must not block the rest of the batch.
                            decodeError.printStackTrace();
                        }
                    }
                    if (tbKafkaConsume.consume(batch)) {
                        kafkaConsumer.commitSync();
                    }
                } catch (Exception loopError) {
                    loopError.printStackTrace();
                    tbKafkaConsume.onError(loopError);
                }
            }
        } finally {
            closeQuietly(kafkaConsumer);
        }
    }

    /** Converts a raw record value to {@code T}: pass-through for primitive classes, JSON otherwise. */
    @SuppressWarnings("unchecked")
    private static <T> T decodeValue(Object rawValue, Class<T> cls) {
        if (cls.isPrimitive()) {
            // The caller vouches that the configured deserializer already yields the boxed type.
            return (T) rawValue;
        }
        return JSON.parseObject((String) rawValue, cls);
    }

    /** Closes the consumer without letting a close failure mask the exception that ended the loop. */
    private static void closeQuietly(KafkaConsumer<?, ?> kafkaConsumer) {
        try {
            kafkaConsumer.close();
        } catch (Exception closeError) {
            closeError.printStackTrace();
        }
    }

    /**
     * Returns the live configuration object; changes made to it affect consumers created
     * by later {@link #consume} calls.
     */
    public Properties getProperties() {
        return this.properties;
    }

    /** Returns the consumer group id supplied at construction. */
    public String getGroupId() {
        return this.groupId;
    }

    /** Returns the live list of topics this instance subscribes to. */
    public List<String> getSubscribedTopics() {
        return this.subscribedTopics;
    }
}
