package net.acesinc.data.json.generator.log;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import net.acesinc.data.json.util.JsonUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:net/acesinc/data/json/generator/log/KafkaLogger.class */
public class KafkaLogger implements EventLogger {
    private static final Logger log = LogManager.getLogger(KafkaLogger.class);
    public static final String BROKER_SERVER_PROP_NAME = "broker.server";
    public static final String BROKER_PORT_PROP_NAME = "broker.port";
    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final boolean sync;
    private final boolean flatten;
    private final Properties props = new Properties();
    private JsonUtils jsonUtils;

    public KafkaLogger(Map<String, Object> map) {
        this.props.put("bootstrap.servers", ((String) map.get("broker.server")) + ":" + ((Integer) map.get("broker.port")).toString());
        this.props.put("key.serializer", StringSerializer.class.getName());
        this.props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(this.props);
        this.topic = (String) map.get("topic");
        if (map.get(TranquilityLogger.SYNC_PROP_NAME) != null) {
            this.sync = ((Boolean) map.get(TranquilityLogger.SYNC_PROP_NAME)).booleanValue();
        } else {
            this.sync = false;
        }
        if (map.get(TranquilityLogger.FLATTEN_PROP_NAME) != null) {
            this.flatten = ((Boolean) map.get(TranquilityLogger.FLATTEN_PROP_NAME)).booleanValue();
        } else {
            this.flatten = false;
        }
        this.jsonUtils = new JsonUtils();
    }

    @Override // net.acesinc.data.json.generator.log.EventLogger
    public void logEvent(String str, Map<String, Object> map) {
        logEvent(str);
    }

    private void logEvent(String str) {
        String str2 = str;
        if (this.flatten) {
            try {
                str2 = this.jsonUtils.flattenJson(str);
            } catch (IOException e) {
                log.error("Error flattening json. Unable to send event [ " + str + " ]", e);
                return;
            }
        }
        ProducerRecord producerRecord = new ProducerRecord(this.topic, str2);
        if (0 == 0) {
            log.debug("Sending event to Kafka: [ " + str2 + " ]");
            this.producer.send(producerRecord);
        } else {
            try {
                this.producer.send(producerRecord).get();
            } catch (InterruptedException | ExecutionException e2) {
                log.warn("Thread interrupted while waiting for synchronous response from producer", e2);
            }
        }
    }

    @Override // net.acesinc.data.json.generator.log.EventLogger
    public void shutdown() {
        this.producer.close();
    }
}
