package no.nav.common.kafka.producer.util;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import no.nav.common.kafka.producer.KafkaProducerClient;
import no.nav.common.kafka.producer.KafkaProducerClientImpl;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/* loaded from: input_file:no/nav/common/kafka/producer/util/KafkaProducerClientWithMetrics.class */
public class KafkaProducerClientWithMetrics<K, V> implements KafkaProducerClient<K, V> {
    public static final String KAFKA_PRODUCER_STATUS_COUNTER = "kafka_producer_status";
    public static final String KAFKA_PRODUCER_CURRENT_OFFSET_GAUGE = "kafka_producer_current_offset";
    private final KafkaProducerClient<K, V> client;
    private final MeterRegistry meterRegistry;
    private final Map<String, Counter> statusCounterMap = new HashMap();
    private final Map<String, Gauge> currentOffsetGaugeMap = new HashMap();
    private final Map<String, Long> currentOffsetMap = new HashMap();

    public KafkaProducerClientWithMetrics(Properties properties, MeterRegistry meterRegistry) {
        this.client = new KafkaProducerClientImpl(properties);
        this.meterRegistry = meterRegistry;
    }

    public KafkaProducerClientWithMetrics(KafkaProducerClient<K, V> kafkaProducerClient, MeterRegistry meterRegistry) {
        this.client = kafkaProducerClient;
        this.meterRegistry = meterRegistry;
    }

    @Override // no.nav.common.kafka.producer.KafkaProducerClient
    public void close() {
        this.client.close();
    }

    @Override // no.nav.common.kafka.producer.KafkaProducerClient
    public RecordMetadata sendSync(ProducerRecord<K, V> producerRecord) {
        try {
            RecordMetadata sendSync = this.client.sendSync(producerRecord);
            updateLatestOffset(sendSync);
            incrementRecordCount(producerRecord, false);
            return sendSync;
        } catch (Exception e) {
            incrementRecordCount(producerRecord, true);
            throw e;
        }
    }

    @Override // no.nav.common.kafka.producer.KafkaProducerClient
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        return send(producerRecord, null);
    }

    @Override // no.nav.common.kafka.producer.KafkaProducerClient
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
        return this.client.send(producerRecord, (recordMetadata, exc) -> {
            incrementRecordCount(producerRecord, exc != null);
            if (recordMetadata != null) {
                updateLatestOffset(recordMetadata);
            }
            if (callback != null) {
                callback.onCompletion(recordMetadata, exc);
            }
        });
    }

    @Override // no.nav.common.kafka.producer.KafkaProducerClient
    public Producer<K, V> getProducer() {
        return this.client.getProducer();
    }

    private void updateLatestOffset(RecordMetadata recordMetadata) {
        String str = recordMetadata.topic() + "-" + recordMetadata.partition();
        this.currentOffsetMap.put(str, Long.valueOf(recordMetadata.hasOffset() ? recordMetadata.offset() : 0L));
        this.currentOffsetGaugeMap.computeIfAbsent(str, str2 -> {
            return Gauge.builder(KAFKA_PRODUCER_CURRENT_OFFSET_GAUGE, () -> {
                Long l = this.currentOffsetMap.get(str);
                return Long.valueOf(l != null ? l.longValue() : 0L);
            }).tag("topic", recordMetadata.topic()).tag("partition", String.valueOf(recordMetadata.partition())).register(this.meterRegistry);
        });
    }

    private void incrementRecordCount(ProducerRecord<K, V> producerRecord, boolean z) {
        this.statusCounterMap.computeIfAbsent(producerRecord.topic() + "-" + z, str -> {
            return Counter.builder(KAFKA_PRODUCER_STATUS_COUNTER).tag("topic", producerRecord.topic()).tag("status", z ? "failed" : "ok").register(this.meterRegistry);
        }).increment();
    }
}
