package pl.allegro.tech.hermes.frontend.producer.kafka;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.ToDoubleFunction;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;

/* loaded from: input_file:pl/allegro/tech/hermes/frontend/producer/kafka/Producers.class */
/**
 * Pairs two Kafka producers and exposes their client metrics as gauges.
 *
 * <p>{@link #get(Topic)} picks the {@code ackAll} producer for topics that require replication
 * confirmation and the {@code ackLeader} producer for every other topic.
 */
public class Producers {
    private static final String PRODUCER_METRICS_GROUP = "producer-metrics";
    private static final String PRODUCER_NODE_METRICS_GROUP = "producer-node-metrics";
    private static final String REQUEST_LATENCY_MAX = "request-latency-max";
    private static final String REQUEST_LATENCY_AVG = "request-latency-avg";

    private final Producer<byte[], byte[]> ackLeader;
    private final Producer<byte[], byte[]> ackAll;
    private final boolean reportNodeMetrics;
    // Flipped to true by the first call that actually registers the per-broker gauges.
    private final AtomicBoolean nodeMetricsRegistered = new AtomicBoolean(false);

    /**
     * Creates the producer pair.
     *
     * @param producer producer stored as {@code ackLeader}; returned for topics that do not require
     *     replication confirmation
     * @param producer2 producer stored as {@code ackAll}; returned for topics that require
     *     replication confirmation
     * @param z whether {@link #maybeRegisterNodeMetricsGauges(MetricsFacade)} is allowed to register
     *     per-broker latency gauges
     */
    public Producers(Producer<byte[], byte[]> producer, Producer<byte[], byte[]> producer2, boolean z) {
        this.ackLeader = producer;
        this.ackAll = producer2;
        this.reportNodeMetrics = z;
    }

    /**
     * Selects the producer to use for the given topic.
     *
     * @param topic topic the message is going to be published to
     * @return the {@code ackAll} producer when {@code topic.isReplicationConfirmRequired()} is true,
     *     the {@code ackLeader} producer otherwise
     */
    public Producer<byte[], byte[]> get(Topic topic) {
        if (topic.isReplicationConfirmRequired()) {
            return ackAll;
        }
        return ackLeader;
    }

    /**
     * Registers, for both producers, one gauge per metric of the {@code producer-metrics} group:
     * buffer total bytes, buffer available bytes, compression rate, failed batches, metadata age and
     * maximum record queue time.
     *
     * @param metricsFacade facade the gauges are registered in
     */
    public void registerGauges(MetricsFacade metricsFacade) {
        MetricName bufferTotalBytes =
                producerMetric("buffer-total-bytes", PRODUCER_METRICS_GROUP, "buffer total bytes");
        metricsFacade.producer().registerAckAllTotalBytesGauge(ackAll, producerGauge(bufferTotalBytes));
        metricsFacade.producer().registerAckLeaderTotalBytesGauge(ackLeader, producerGauge(bufferTotalBytes));

        MetricName bufferAvailableBytes =
                producerMetric("buffer-available-bytes", PRODUCER_METRICS_GROUP, "buffer available bytes");
        metricsFacade.producer().registerAckAllAvailableBytesGauge(ackAll, producerGauge(bufferAvailableBytes));
        metricsFacade.producer().registerAckLeaderAvailableBytesGauge(ackLeader, producerGauge(bufferAvailableBytes));

        MetricName compressionRate =
                producerMetric("compression-rate-avg", PRODUCER_METRICS_GROUP, "average compression rate");
        metricsFacade.producer().registerAckAllCompressionRateGauge(ackAll, producerGauge(compressionRate));
        metricsFacade.producer().registerAckLeaderCompressionRateGauge(ackLeader, producerGauge(compressionRate));

        MetricName failedBatches =
                producerMetric("record-error-total", PRODUCER_METRICS_GROUP, "failed publishing batches");
        metricsFacade.producer().registerAckAllFailedBatchesGauge(ackAll, producerGauge(failedBatches));
        metricsFacade.producer().registerAckLeaderFailedBatchesGauge(ackLeader, producerGauge(failedBatches));

        MetricName metadataAge =
                producerMetric("metadata-age", PRODUCER_METRICS_GROUP, "age [s] of metadata");
        metricsFacade.producer().registerAckAllMetadataAgeGauge(ackAll, producerGauge(metadataAge));
        metricsFacade.producer().registerAckLeaderMetadataAgeGauge(ackLeader, producerGauge(metadataAge));

        MetricName recordQueueTimeMax = producerMetric(
                "record-queue-time-max",
                PRODUCER_METRICS_GROUP,
                "maximum time [ms] that batch spent in the send buffer");
        metricsFacade.producer().registerAckAllRecordQueueTimeMaxGauge(ackAll, producerGauge(recordQueueTimeMax));
        metricsFacade.producer().registerAckLeaderRecordQueueTimeMaxGauge(ackLeader, producerGauge(recordQueueTimeMax));
    }

    /**
     * Registers the per-broker latency gauges when node metric reporting is enabled and they have
     * not been registered by this instance yet.
     *
     * <p>The "already registered" flag is set before the registration runs, so a registration that
     * fails with an exception is not retried by later calls.
     *
     * @param metricsFacade facade the gauges are registered in
     */
    public void maybeRegisterNodeMetricsGauges(MetricsFacade metricsFacade) {
        if (!reportNodeMetrics) {
            return;
        }
        if (nodeMetricsRegistered.compareAndSet(false, true)) {
            registerLatencyPerBrokerGauge(metricsFacade);
        }
    }

    /**
     * Registers max and average request latency gauges for every broker node, for both producers.
     *
     * <p>NOTE(review): the node list is read from the {@code ackLeader} producer only and reused for
     * {@code ackAll}; presumably both producers see the same brokers - confirm.
     */
    private void registerLatencyPerBrokerGauge(MetricsFacade metricsFacade) {
        for (Node broker : ProducerBrokerNodeReader.read(ackLeader)) {
            String host = broker.host();
            metricsFacade.producer().registerAckAllMaxLatencyBrokerGauge(
                    ackAll, producerLatencyGauge(REQUEST_LATENCY_MAX, broker), host);
            metricsFacade.producer().registerAckLeaderMaxLatencyPerBrokerGauge(
                    ackLeader, producerLatencyGauge(REQUEST_LATENCY_MAX, broker), host);
            metricsFacade.producer().registerAckAllAvgLatencyPerBrokerGauge(
                    ackAll, producerLatencyGauge(REQUEST_LATENCY_AVG, broker), host);
            metricsFacade.producer().registerAckLeaderAvgLatencyPerBrokerGauge(
                    ackLeader, producerLatencyGauge(REQUEST_LATENCY_AVG, broker), host);
        }
    }

    /**
     * Reads the value of the first producer metric accepted by the predicate.
     *
     * @param producer producer whose metrics are searched
     * @param predicate selects the metric entry of interest
     * @return the metric value; {@code 0.0} when no entry matches or when the value is negative
     */
    private double findProducerMetric(
            Producer<byte[], byte[]> producer,
            Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
        double value = producer.metrics().entrySet().stream()
                .filter(predicate)
                .mapToDouble(entry -> entry.getValue().value())
                .findFirst()
                .orElse(0.0d);
        // Negative readings are reported as zero.
        return value < 0.0d ? 0.0d : value;
    }

    /**
     * Builds a gauge function reading a {@code producer-node-metrics} metric of the given name whose
     * tags contain the value {@code "node-" + node.id()}.
     */
    private ToDoubleFunction<Producer<byte[], byte[]>> producerLatencyGauge(String latencyMetricName, Node node) {
        Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> {
            MetricName candidate = entry.getKey();
            return candidate.group().equals(PRODUCER_NODE_METRICS_GROUP)
                    && candidate.name().equals(latencyMetricName)
                    && candidate.tags().containsValue("node-" + node.id());
        };
        return producer -> findProducerMetric(producer, predicate);
    }

    /**
     * Builds a gauge function reading the metric that has the same group and name as the given
     * metric name; description and tags are not compared.
     */
    private ToDoubleFunction<Producer<byte[], byte[]>> producerGauge(MetricName metricName) {
        Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> {
            MetricName candidate = entry.getKey();
            return candidate.group().equals(metricName.group())
                    && candidate.name().equals(metricName.name());
        };
        return producer -> findProducerMetric(producer, predicate);
    }

    /** Creates a tag-less {@link MetricName} from a name, group and description. */
    private static MetricName producerMetric(String name, String group, String description) {
        return new MetricName(name, group, description, Collections.emptyMap());
    }

    /**
     * Closes both producers.
     *
     * <p>The {@code ackLeader} producer is closed even when closing the {@code ackAll} producer
     * throws, so a failure of the first close does not leak the second producer.
     */
    public void close() {
        try {
            ackAll.close();
        } finally {
            ackLeader.close();
        }
    }
}
