/*
 * Decompiled with CFR 0.152.
 */
package arp.message.kafka;

import arp.process.publish.Message;
import arp.process.publish.MonitorMessage;
import arp.process.publish.MonitorMessageConvertor;
import com.google.gson.Gson;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectInput;

/**
 * Kafka-backed {@link MonitorMessageConvertor}.
 *
 * <p>Polls FST-serialized {@link Message} payloads from the subscribed Kafka topics, converts
 * each one into a {@link MonitorMessage} whose fields are JSON strings, and publishes the
 * monitor message as JSON to the topic named {@code monitorTopicPrefix + processDesc}.
 *
 * <p>NOTE(review): nothing in this class closes the consumer or the producer once construction
 * has succeeded; confirm whether the owner of this object is expected to manage their lifetime.
 */
public class KafkaMonitorMessageConvertor extends MonitorMessageConvertor {
    /** How long a single {@link #receive()} call waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100L);
    /** Partition count requested for every monitor topic. */
    private static final int MONITOR_TOPIC_PARTITIONS = 1;
    /** Replication factor requested for every monitor topic. */
    private static final short MONITOR_TOPIC_REPLICATION_FACTOR = 1;

    FSTConfiguration fstConf = FSTConfiguration.createDefaultConfiguration();
    private final String monitorTopicPrefix;
    private final KafkaConsumer<String, byte[]> consumer;
    private final KafkaProducer<String, String> producer;
    private final Properties producerProps;
    private final Gson gson = new Gson();

    /**
     * Creates the Kafka consumer and producer used by this convertor.
     *
     * @param servers value used for {@code bootstrap.servers} on both consumer and producer
     * @param consumerGroup value used for the consumer's {@code group.id}
     * @param monitorTopicPrefix prefix prepended to a process description to form the name of
     *     the topic monitor messages are published to
     */
    public KafkaMonitorMessageConvertor(String servers, String consumerGroup, String monitorTopicPrefix) {
        this.fstConf.setForceSerializable(true);
        this.monitorTopicPrefix = monitorTopicPrefix;

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", servers);
        consumerProps.put("group.id", consumerGroup);
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class);
        this.consumer = new KafkaConsumer<>(consumerProps);

        this.producerProps = new Properties();
        this.producerProps.put("bootstrap.servers", servers);
        this.producerProps.put("key.serializer", StringSerializer.class);
        this.producerProps.put("value.serializer", StringSerializer.class);
        this.producerProps.put("max.block.ms", 60000);

        KafkaProducer<String, String> createdProducer;
        try {
            createdProducer = new KafkaProducer<>(this.producerProps);
        } catch (RuntimeException e) {
            // The consumer already exists at this point; release it so a failed
            // construction does not leave it open.
            this.consumer.close();
            throw e;
        }
        this.producer = createdProducer;
    }

    /**
     * Publishes a monitor message as JSON and waits for the send future to complete.
     *
     * @param monitorMessage message to publish; its process description selects the topic
     * @throws Exception if waiting on the send future fails
     */
    protected void send(MonitorMessage monitorMessage) throws Exception {
        String topic = this.monitorTopicPrefix + monitorMessage.getProcessDesc();
        String payload = this.gson.toJson(monitorMessage);
        this.producer.send(new ProducerRecord<String, String>(topic, payload)).get();
    }

    /**
     * Converts a process message into its monitor representation.
     *
     * <p>Inputs, output and the created/deleted/updated aggregates are each rendered to a JSON
     * string. Every aggregate is wrapped as {@code {"class": <runtime class name>, "aggr": <aggregate>}};
     * updated aggregates are emitted as two-element arrays built from indexes 0 and 1 of each entry.
     *
     * @param msg message to convert
     * @return the populated monitor message
     */
    protected MonitorMessage convertMessage(Message msg) {
        MonitorMessage monitorMessage = new MonitorMessage();
        monitorMessage.setProcessDesc(msg.getProcessDesc());
        monitorMessage.setProcessInputs(this.gson.toJson((Object) msg.getProcessInput()));
        monitorMessage.setProcessOutput(this.gson.toJson(msg.getProcessOutput()));

        List<Map<String, Object>> createdAggrs = new ArrayList<>();
        for (Object aggr : msg.getProcessCreatedAggrs()) {
            createdAggrs.add(describeAggregate(aggr));
        }
        monitorMessage.setProcessCreatedAggrs(this.gson.toJson(createdAggrs));

        // Deleted aggregates are stored as plain objects, exactly like created ones; casting
        // them to String would fail for every aggregate that is not a String.
        List<Map<String, Object>> deletedAggrs = new ArrayList<>();
        for (Object aggr : msg.getProcessDeletedAggrs()) {
            deletedAggrs.add(describeAggregate(aggr));
        }
        monitorMessage.setProcessDeletedAggrs(this.gson.toJson(deletedAggrs));

        List<List<Map<String, Object>>> updatedAggrs = new ArrayList<>();
        for (Object[] aggrs : msg.getProcessUpdatedAggrs()) {
            List<Map<String, Object>> pair = new ArrayList<>(2);
            pair.add(describeAggregate(aggrs[0]));
            pair.add(describeAggregate(aggrs[1]));
            updatedAggrs.add(pair);
        }
        monitorMessage.setProcessUpdatedAggrs(this.gson.toJson(updatedAggrs));

        monitorMessage.setProcessFinishTime(msg.getProcessFinishTime());
        return monitorMessage;
    }

    /** Wraps an aggregate together with its runtime class name for JSON output. */
    private static Map<String, Object> describeAggregate(Object aggr) {
        Map<String, Object> dto = new HashMap<>();
        dto.put("class", aggr.getClass().getName());
        dto.put("aggr", aggr);
        return dto;
    }

    /**
     * Polls the consumer once and deserializes every received record value with FST.
     *
     * @return the messages read during this poll; empty when nothing was received
     * @throws Exception if a payload cannot be deserialized
     */
    protected List<Message> receive() throws Exception {
        List<Message> messages = new ArrayList<>();
        ConsumerRecords<String, byte[]> records = this.consumer.poll(POLL_TIMEOUT);
        for (ConsumerRecord<String, byte[]> record : records) {
            byte[] payload = record.value();
            if (payload == null) {
                // A record without a value carries no message to deserialize.
                continue;
            }
            FSTObjectInput in = this.fstConf.getObjectInput(new ByteArrayInputStream(payload));
            messages.add((Message) in.readObject());
        }
        return messages;
    }

    /**
     * Subscribes the consumer to the given topics.
     *
     * @param processesToSubscribe topic names to subscribe to; {@code null} leaves the
     *     current subscription untouched
     */
    protected void subscribeProcesses(List<String> processesToSubscribe) {
        if (processesToSubscribe == null) {
            return;
        }
        this.consumer.subscribe(processesToSubscribe);
    }

    /**
     * Requests creation of one monitor topic ({@code monitorTopicPrefix + process}) per process,
     * each with one partition and a replication factor of one.
     *
     * <p>The result of the creation request is not inspected, so failures reported by the
     * broker are not surfaced to the caller.
     *
     * @param processesToPublish process names to create monitor topics for; {@code null} or
     *     empty means there is nothing to create
     */
    protected void defineProcessesToPublish(List<String> processesToPublish) {
        if (processesToPublish == null || processesToPublish.isEmpty()) {
            return;
        }
        List<NewTopic> topics = new ArrayList<>(processesToPublish.size());
        for (String process : processesToPublish) {
            topics.add(new NewTopic(
                    this.monitorTopicPrefix + process,
                    Optional.of(MONITOR_TOPIC_PARTITIONS),
                    Optional.of(MONITOR_TOPIC_REPLICATION_FACTOR)));
        }
        // try-with-resources guarantees the admin client is closed even if the request throws.
        try (AdminClient adminClient = KafkaAdminClient.create(this.producerProps)) {
            adminClient.createTopics(topics);
        }
    }
}

