package no.nav.common.kafka.producer;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:no/nav/common/kafka/producer/KafkaProducerClientImpl.class */
public class KafkaProducerClientImpl<K, V> implements KafkaProducerClient<K, V> {
    private final Logger log;
    private final Producer<K, V> producer;

    public KafkaProducerClientImpl(Producer<K, V> producer) {
        this.log = LoggerFactory.getLogger(KafkaProducerClientImpl.class);
        this.producer = producer;
    }

    public KafkaProducerClientImpl(Properties properties) {
        this((Producer) new GracefulKafkaProducer(properties));
    }

    @Override // no.nav.common.kafka.producer.KafkaProducerClient
    public void close() {
        this.log.info("Closing kafka producer...");
        this.producer.close();
    }

    @Override // no.nav.common.kafka.producer.KafkaProducerClient
    public RecordMetadata sendSync(ProducerRecord<K, V> producerRecord) {
        Future<RecordMetadata> send = send(producerRecord, null);
        this.producer.flush();
        return send.get();
    }

    @Override // no.nav.common.kafka.producer.KafkaProducerClient
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        return send(producerRecord, null);
    }

    @Override // no.nav.common.kafka.producer.KafkaProducerClient
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
        Callback wrapWithLog = wrapWithLog(callback);
        try {
            return this.producer.send(producerRecord, wrapWithLog);
        } catch (Exception e) {
            wrapWithLog.onCompletion((RecordMetadata) null, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override // no.nav.common.kafka.producer.KafkaProducerClient
    public Producer<K, V> getProducer() {
        return this.producer;
    }

    private Callback wrapWithLog(Callback callback) {
        return (recordMetadata, exc) -> {
            if (recordMetadata != null) {
                this.log.info("Record was sent topic={} partition={} offset={}", new Object[]{recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset())});
            } else if (exc != null) {
                this.log.error("Failed to send record", exc);
            }
            if (callback != null) {
                callback.onCompletion(recordMetadata, exc);
            }
        };
    }
}
