package jmind.core.kafka;

import java.util.Properties;
import jmind.base.lang.SourceProperties;
import jmind.base.util.CollectionsUtil;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:jmind/core/kafka/AbstractKafkaConsumer.class */
/**
 * Base class for a Kafka consumer that polls on its own daemon thread and hands every
 * polled batch to {@link #handleMessage(ConsumerRecords)}.
 *
 * <p>The consumer is created with auto-commit disabled; offsets are committed asynchronously
 * after each successful {@code handleMessage} call. Polling starts from the constructor and
 * continues until {@link #setStarted(boolean) setStarted(false)} is called, at which point the
 * polling thread closes the underlying consumer and terminates. The thread is not restarted
 * by a later {@code setStarted(true)}.
 *
 * <p>NOTE(review): because the polling thread is started from the constructor, it may invoke
 * {@code handleMessage} before a subclass constructor has finished initialising its own
 * fields. Subclasses should not rely on state assigned in their constructor bodies unless
 * they guard against that.
 */
public abstract class AbstractKafkaConsumer {
    private String topicName;
    private String groupName;
    /** Underlying client; only the polling thread may call it once polling has started. */
    protected KafkaConsumer<String, String> consumer;
    /** Loop flag; volatile so that a stop request from another thread is seen by the poller. */
    private volatile boolean started = true;
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Creates the Kafka consumer, subscribes it and starts the polling thread.
     *
     * <p>The broker list is read from the {@code bootstrap.servers} entry of
     * {@code SourceProperties.getDataSource()}.
     *
     * @param str  topic name(s) to subscribe to; passed through
     *             {@code CollectionsUtil.asList(str, ",")} (presumably a comma-separated
     *             list - confirm against that helper)
     * @param str2 consumer group id
     */
    public AbstractKafkaConsumer(String str, String str2) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", SourceProperties.getDataSource().getProperty("bootstrap.servers"));
        properties.put("group.id", str2);
        properties.put("enable.auto.commit", false);
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
        this.topicName = str;
        this.groupName = str2;
        listen();
    }

    /**
     * Subscribes the consumer to the configured topic(s) and starts the daemon thread that
     * runs the poll / handle / commit loop.
     */
    private void listen() {
        this.consumer.subscribe(CollectionsUtil.asList(this.topicName, ","));
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (AbstractKafkaConsumer.this.isStarted()) {
                        try {
                            AbstractKafkaConsumer.this.handleMessage(AbstractKafkaConsumer.this.consumer.poll(10000L));
                            // Only reached when handleMessage returned normally.
                            AbstractKafkaConsumer.this.consumer.commitAsync();
                        } catch (Exception e) {
                            // Log and keep polling; the Throwable in the last slot is logged as the cause.
                            AbstractKafkaConsumer.this.logger.error("consumer error, topic={},groupName={}", new Object[]{AbstractKafkaConsumer.this.topicName, AbstractKafkaConsumer.this.groupName, e});
                        }
                    }
                } finally {
                    // Release sockets and leave the group. This must happen here because
                    // KafkaConsumer is not safe to use from any thread other than the poller.
                    AbstractKafkaConsumer.this.closeConsumer();
                }
            }
        });
        thread.setDaemon(true);
        // Name the thread after topic and group so it can be identified in thread dumps.
        thread.setName(this.topicName + "-" + this.groupName + thread.getName());
        thread.start();
        this.logger.info("topic=" + this.topicName + ", groupName=" + this.groupName + " started");
    }

    /** Closes the underlying consumer, logging (not propagating) any failure. */
    private void closeConsumer() {
        try {
            this.consumer.close();
        } catch (Exception e) {
            this.logger.error("consumer close error, topic={},groupName={}", new Object[]{this.topicName, this.groupName, e});
        }
    }

    /**
     * Processes one batch returned by a poll. Called on the polling thread for every poll
     * result, including polls that returned no records.
     *
     * @param consumerRecords the records returned by the poll
     * @throws Exception on failure; the exception is logged, the commit for this iteration
     *                   is skipped and polling continues
     */
    public abstract void handleMessage(ConsumerRecords<String, String> consumerRecords) throws Exception;

    /** @return the topic name(s) string held by this instance */
    public String getTopicName() {
        return this.topicName;
    }

    /**
     * Replaces the stored topic name. This only updates the field; the subscription made at
     * construction time is not changed.
     *
     * @param str the new topic name value
     */
    public void setTopicName(String str) {
        this.topicName = str;
    }

    /** @return the consumer group name held by this instance */
    public String getGroupName() {
        return this.groupName;
    }

    /**
     * Replaces the stored group name. This only updates the field; the {@code group.id} the
     * consumer was created with is not changed.
     *
     * @param str the new group name value
     */
    public void setGroupName(String str) {
        this.groupName = str;
    }

    /** @return {@code true} while the polling loop is allowed to keep running */
    public boolean isStarted() {
        return this.started;
    }

    /**
     * Sets the loop flag. Passing {@code false} makes the polling thread exit after its
     * current iteration (a blocked poll may take up to its 10 second timeout to return) and
     * close the consumer.
     *
     * @param z the new flag value
     */
    public void setStarted(boolean z) {
        this.started = z;
    }
}
