/*
 * Decompiled with CFR 0.152.
 */
package arp.message.kafka;

import arp.process.publish.Message;
import arp.process.publish.MessageSender;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectOutput;

public class KafkaMessageSender
implements MessageSender {
    private KafkaProducer<String, byte[]> producer;
    private FSTConfiguration fstConf = FSTConfiguration.createDefaultConfiguration();
    private Properties props;

    public KafkaMessageSender(String servers) {
        this.fstConf.setForceSerializable(true);
        this.props = new Properties();
        this.props.put("bootstrap.servers", servers);
        this.props.put("key.serializer", StringSerializer.class);
        this.props.put("value.serializer", ByteArraySerializer.class);
        this.props.put("max.block.ms", (Object)60000);
        this.producer = new KafkaProducer(this.props);
    }

    public void send(Message msg) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        FSTObjectOutput oos = this.fstConf.getObjectOutput((OutputStream)bos);
        oos.writeObject((Object)msg);
        oos.flush();
        ProducerRecord record = new ProducerRecord(msg.getProcessDesc(), (Object)bos.toByteArray());
        this.producer.send(record).get();
    }

    public void defineProcessesToSend(List<String> processesToSend) {
        AdminClient adminClient = KafkaAdminClient.create((Properties)this.props);
        Integer numPartitions = 1;
        Short replicationFactor = 1;
        ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
        for (String process : processesToSend) {
            topics.add(new NewTopic(process, Optional.of(numPartitions), Optional.of(replicationFactor)));
        }
        adminClient.createTopics(topics);
        adminClient.close();
    }
}

