package pl.allegro.tech.hermes.frontend.producer.kafka;

import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;

@Singleton
/* loaded from: input_file:pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer.class */
public class KafkaBrokerMessageProducer implements BrokerMessageProducer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerMessageProducer.class);
    private final Producers producers;
    private final HermesMetrics metrics;

    /* loaded from: input_file:pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer$SendCallback.class */
    private class SendCallback implements Callback {
        private final Message message;
        private final Topic topic;
        private final PublishingCallback callback;

        public SendCallback(Message message, Topic topic, PublishingCallback publishingCallback) {
            this.message = message;
            this.topic = topic;
            this.callback = publishingCallback;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                this.callback.onUnpublished(this.message, this.topic, exc);
            } else {
                this.callback.onPublished(this.message, this.topic);
                KafkaBrokerMessageProducer.this.producers.maybeRegisterNodeMetricsGauges(KafkaBrokerMessageProducer.this.metrics);
            }
        }
    }

    @Inject
    public KafkaBrokerMessageProducer(Producers producers, HermesMetrics hermesMetrics) {
        this.producers = producers;
        this.metrics = hermesMetrics;
        producers.registerGauges(hermesMetrics);
    }

    @Override // pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer
    public void send(Message message, CachedTopic cachedTopic, PublishingCallback publishingCallback) {
        try {
            this.producers.get(cachedTopic.getTopic()).send(new ProducerRecord(cachedTopic.getKafkaTopics().getPrimary().name().asString(), message.getData()), new SendCallback(message, cachedTopic.getTopic(), publishingCallback));
        } catch (Exception e) {
            publishingCallback.onUnpublished(message, cachedTopic.getTopic(), e);
        }
    }

    @Override // pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer
    public boolean isTopicAvailable(CachedTopic cachedTopic) {
        String asString = cachedTopic.getKafkaTopics().getPrimary().name().asString();
        try {
            if (this.producers.get(cachedTopic.getTopic()).partitionsFor(asString).size() > 0) {
                return true;
            }
            logger.warn("No information about partitions for topic {}", asString);
            return false;
        } catch (Exception e) {
            logger.warn("Could not read information about partitions for topic {}. {}", asString, e.getMessage());
            return false;
        }
    }
}
