package pl.allegro.tech.hermes.frontend.producer.kafka;

import java.util.List;
import javax.inject.Singleton;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;

@Singleton
/* loaded from: input_file:pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer.class */
public class KafkaBrokerMessageProducer implements BrokerMessageProducer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerMessageProducer.class);
    private final Producers producers;
    private final KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher;
    private final HermesMetrics metrics;
    private final MessageToKafkaProducerRecordConverter messageConverter;

    /* loaded from: input_file:pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer$SendCallback.class */
    private class SendCallback implements Callback {
        private final Message message;
        private final Topic topic;
        private final PublishingCallback callback;

        public SendCallback(Message message, Topic topic, PublishingCallback publishingCallback) {
            this.message = message;
            this.topic = topic;
            this.callback = publishingCallback;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                this.callback.onUnpublished(this.message, this.topic, exc);
            } else {
                this.callback.onPublished(this.message, this.topic);
                KafkaBrokerMessageProducer.this.producers.maybeRegisterNodeMetricsGauges(KafkaBrokerMessageProducer.this.metrics);
            }
        }
    }

    public KafkaBrokerMessageProducer(Producers producers, KafkaTopicMetadataFetcher kafkaTopicMetadataFetcher, HermesMetrics hermesMetrics, MessageToKafkaProducerRecordConverter messageToKafkaProducerRecordConverter) {
        this.producers = producers;
        this.kafkaTopicMetadataFetcher = kafkaTopicMetadataFetcher;
        this.metrics = hermesMetrics;
        this.messageConverter = messageToKafkaProducerRecordConverter;
        producers.registerGauges(hermesMetrics);
    }

    @Override // pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer
    public void send(Message message, CachedTopic cachedTopic, PublishingCallback publishingCallback) {
        try {
            this.producers.get(cachedTopic.getTopic()).send(this.messageConverter.convertToProducerRecord(message, cachedTopic.getKafkaTopics().getPrimary().name()), new SendCallback(message, cachedTopic.getTopic(), publishingCallback));
        } catch (Exception e) {
            publishingCallback.onUnpublished(message, cachedTopic.getTopic(), e);
        }
    }

    @Override // pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer
    public boolean isTopicAvailable(CachedTopic cachedTopic) {
        String asString = cachedTopic.getKafkaTopics().getPrimary().name().asString();
        try {
            List<PartitionInfo> partitionsFor = this.producers.get(cachedTopic.getTopic()).partitionsFor(asString);
            if (anyPartitionWithoutLeader(partitionsFor)) {
                logger.warn("Topic {} has partitions without a leader.", asString);
                return false;
            }
            if (anyUnderReplicatedPartition(partitionsFor, asString)) {
                logger.warn("Topic {} has under replicated partitions.", asString);
                return false;
            }
            if (partitionsFor.size() > 0) {
                return true;
            }
            logger.warn("No information about partitions for topic {}", asString);
            return false;
        } catch (Exception e) {
            logger.warn("Could not read information about partitions for topic {}. {}", asString, e.getMessage());
            return false;
        }
    }

    private boolean anyPartitionWithoutLeader(List<PartitionInfo> list) {
        return list.stream().anyMatch(partitionInfo -> {
            return partitionInfo.leader() == null;
        });
    }

    private boolean anyUnderReplicatedPartition(List<PartitionInfo> list, String str) throws Exception {
        int fetchMinInSyncReplicas = this.kafkaTopicMetadataFetcher.fetchMinInSyncReplicas(str);
        return list.stream().anyMatch(partitionInfo -> {
            return partitionInfo.inSyncReplicas().length < fetchMinInSyncReplicas;
        });
    }
}
