package co.cask.cdap.logging.appender.kafka;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.logging.appender.LogAppender;
import co.cask.cdap.logging.appender.LogMessage;
import co.cask.cdap.logging.serialize.LoggingEventSerializer;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/appender/kafka/KafkaLogAppender.class */
public final class KafkaLogAppender extends LogAppender {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaLogAppender.class);
    private static final String APPENDER_NAME = "KafkaLogAppender";
    private final SimpleKafkaProducer producer;
    private final LoggingEventSerializer loggingEventSerializer;
    private final CConfiguration cConf;
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    @Inject
    KafkaLogAppender(CConfiguration cConfiguration) {
        setName(APPENDER_NAME);
        addInfo("Initializing KafkaLogAppender...");
        this.cConf = cConfiguration;
        this.producer = new SimpleKafkaProducer(cConfiguration);
        this.loggingEventSerializer = new LoggingEventSerializer();
        addInfo("Successfully initialized KafkaLogAppender.");
    }

    @Override // co.cask.cdap.logging.appender.LogAppender
    protected void appendEvent(LogMessage logMessage) {
        try {
            String partitionKey = getPartitionKey(logMessage.getLoggingContext());
            if (!Strings.isNullOrEmpty(partitionKey)) {
                this.producer.publish(partitionKey, this.loggingEventSerializer.toBytes(logMessage));
            }
        } catch (Throwable th) {
            LOG.error("Got exception while serializing log event {}.", logMessage, th);
        }
    }

    private String getPartitionKey(LoggingContext loggingContext) {
        String value = ((LoggingContext.SystemTag) loggingContext.getSystemTagsMap().get(".namespaceId")).getValue();
        if (NamespaceId.SYSTEM.getNamespace().equals(value)) {
            return loggingContext.getLogPartition();
        }
        switch (LogPartitionType.valueOf(this.cConf.get("log.publish.partition.key").toUpperCase())) {
            case PROGRAM:
                return loggingContext.getLogPartition();
            case APPLICATION:
                return value + ":" + ((LoggingContext.SystemTag) loggingContext.getSystemTagsMap().get(".applicationId")).getValue();
            default:
                throw new IllegalArgumentException(String.format("Invalid log partition type %s. Allowed partition types are program/application", this.cConf.get("log.publish.partition.key")));
        }
    }

    public void stop() {
        if (this.stopped.compareAndSet(false, true)) {
            super.stop();
            this.producer.stop();
        }
    }
}
