/*
 * Decompiled with CFR 0.152.
 */
package arp.message.kafka;

import arp.process.publish.Message;
import arp.process.publish.ProcessListenerMessageReceiver;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectInput;

public class KafkaMessageReceiver
implements ProcessListenerMessageReceiver {
    FSTConfiguration fstConf = FSTConfiguration.createDefaultConfiguration();
    private KafkaConsumer<String, byte[]> consumer;

    public KafkaMessageReceiver(String servers, String consumerGroup) {
        this.fstConf.setForceSerializable(true);
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", consumerGroup);
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        this.consumer = new KafkaConsumer(props);
    }

    public List<Message> receive() throws Exception {
        ArrayList<Message> messageList = new ArrayList<Message>();
        ConsumerRecords records = this.consumer.poll(Duration.ofMillis(100L));
        for (ConsumerRecord record : records) {
            byte[] msg = (byte[])record.value();
            FSTObjectInput ois = this.fstConf.getObjectInput((InputStream)new ByteArrayInputStream(msg));
            messageList.add((Message)ois.readObject());
        }
        return messageList;
    }

    public void subscribeProcesses(List<String> processesToSubscribe) {
        if (processesToSubscribe != null) {
            this.consumer.subscribe(processesToSubscribe);
        }
    }
}

