package cn.antcore.event.core.queue;

import cn.antcore.event.Constants;
import cn.antcore.event.core.queue.MessageQueue;
import cn.antcore.kafka.ConsumerCallback;
import cn.antcore.kafka.core.AntKafka;
import cn.antcore.kafka.core.AntKafkaBuilder;
import cn.antcore.kafka.entity.ReceiveResult;
import cn.antcore.resources.config.Config;
import cn.antcore.resources.config.GlobalConfig;
import cn.antcore.resources.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/antcore/event/core/queue/KafkaQueue.class */
/**
 * {@link MessageQueue} implementation that delegates sending and receiving to an
 * {@link AntKafka} client.
 *
 * <p>The client is built once in the constructor; its bootstrap setting is read from the
 * global configuration under {@link Constants#QUEUE_KAFKA_SERVICES}.
 *
 * <p>NOTE(review): the two {@code byte[]} type arguments are presumably the record key and
 * value, in that order — confirm against {@code MessageQueue} and {@code AntKafka}.
 */
public class KafkaQueue implements MessageQueue<byte[], byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaQueue.class);
    private static final Config CONFIG = GlobalConfig.get();

    /** Underlying client; assigned once in the constructor and never replaced. */
    private final AntKafka<byte[], byte[]> client;

    /**
     * Builds the underlying client.
     *
     * @param str value handed to {@link AntKafkaBuilder#create}; presumably the topic name —
     *            TODO confirm against {@code AntKafkaBuilder}
     */
    public KafkaQueue(String str) {
        // NOTE(review): the configured value is passed through unchecked; a missing setting
        // is left for the builder/client to deal with.
        String bootstrap = CONFIG.getValue(Constants.QUEUE_KAFKA_SERVICES);
        this.client = AntKafkaBuilder.create(str).bootstrapService(bootstrap).build();
    }

    /**
     * Forwards both arguments, unchanged and in the same order, to the client's {@code send}.
     *
     * @param bArr  first argument of the client's {@code send}
     * @param bArr2 second argument of the client's {@code send}
     */
    @Override
    public void send(byte[] bArr, byte[] bArr2) {
        this.client.send(bArr, bArr2);
    }

    /**
     * Registers {@code callback} using the group id from configuration, falling back to
     * {@link Constants#QUEUE_KAFKA_DEFAULT_GROUP_ID} when none is configured.
     *
     * @param callback receiver for successfully consumed messages; must not be {@code null}
     *                 when a consumer is actually registered
     */
    @Override
    public void receive(MessageQueue.Callback<byte[], byte[]> callback) {
        receive(getGroupId(), callback);
    }

    /**
     * Registers {@code callback} with the client under the given group id.
     *
     * <p>When {@code str} is empty (as decided by {@link StringUtils#isEmpty}) nothing is
     * registered; a warning is logged so the skipped subscription is visible.
     *
     * @param str      consumer group id passed to the client's {@code receive}
     * @param callback receiver for successfully consumed messages
     * @throws NullPointerException if a consumer would be registered with a {@code null}
     *                              callback
     */
    @Override
    public void receive(String str, final MessageQueue.Callback<byte[], byte[]> callback) {
        if (StringUtils.isEmpty(str)) {
            // Still a no-op, but no longer a silent one.
            LOG.warn("Kafka group id is empty; no consumer was registered.");
            return;
        }
        if (callback == null) {
            // Fail at registration time instead of on the first delivered message.
            throw new NullPointerException("callback must not be null");
        }
        this.client.receive(str, new ConsumerCallback<byte[], byte[]>() {
            public void onInit() {
                LOG.info("Ant event initialization successful.");
            }

            public void onReceive(ReceiveResult<byte[], byte[]> receiveResult) {
                // Intentionally empty: messages are handed over through onSuccess.
            }

            public void onSuccess(byte[] first, byte[] second) {
                callback.onSuccess(first, second);
            }

            public void onFail(Exception exc) {
                // NOTE(review): failures are only logged here and are not passed on to the
                // caller's callback.
                LOG.error("Kafka receive fail.", exc);
            }
        });
    }

    /**
     * Reads the group id from configuration.
     *
     * @return the configured value, or {@link Constants#QUEUE_KAFKA_DEFAULT_GROUP_ID} when the
     *         configured value is empty
     */
    private String getGroupId() {
        String configured = CONFIG.getValue(Constants.QUEUE_KAFKA_GROUP_ID);
        return StringUtils.isEmpty(configured) ? Constants.QUEUE_KAFKA_DEFAULT_GROUP_ID : configured;
    }
}
