package pl.allegro.tech.hermes.frontend.publishing.handlers;

import com.codahale.metrics.Timer;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.xnio.XnioWorker;
import pl.allegro.tech.hermes.api.ErrorCode;
import pl.allegro.tech.hermes.api.ErrorDescription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageEndProcessor;
import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageErrorProcessor;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
import pl.allegro.tech.hermes.frontend.publishing.message.MessageState;

/* loaded from: input_file:pl/allegro/tech/hermes/frontend/publishing/handlers/PublishingHandler.class */
class PublishingHandler implements HttpHandler {
    private final BrokerMessageProducer brokerMessageProducer;
    private final MessageErrorProcessor messageErrorProcessor;
    private final MessageEndProcessor messageEndProcessor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PublishingHandler(BrokerMessageProducer brokerMessageProducer, MessageErrorProcessor messageErrorProcessor, MessageEndProcessor messageEndProcessor) {
        this.brokerMessageProducer = brokerMessageProducer;
        this.messageErrorProcessor = messageErrorProcessor;
        this.messageEndProcessor = messageEndProcessor;
    }

    public void handleRequest(HttpServerExchange httpServerExchange) {
        httpServerExchange.dispatch(() -> {
            try {
                handle(httpServerExchange);
            } catch (RuntimeException e) {
                this.messageErrorProcessor.sendAndLog(httpServerExchange, "Exception while publishing message to a broker.", e);
            }
        });
    }

    private void handle(final HttpServerExchange httpServerExchange) {
        final AttachmentContent attachmentContent = (AttachmentContent) httpServerExchange.getAttachment(AttachmentContent.KEY);
        final MessageState messageState = attachmentContent.getMessageState();
        messageState.setSendingToKafkaProducerQueue();
        final Timer.Context startBrokerLatencyTimer = attachmentContent.getCachedTopic().startBrokerLatencyTimer();
        this.brokerMessageProducer.send(attachmentContent.getMessage(), attachmentContent.getCachedTopic(), new PublishingCallback() { // from class: pl.allegro.tech.hermes.frontend.publishing.handlers.PublishingHandler.1
            @Override // pl.allegro.tech.hermes.frontend.publishing.PublishingCallback
            public void onPublished(Message message, Topic topic) {
                XnioWorker worker = httpServerExchange.getConnection().getWorker();
                Timer.Context context = startBrokerLatencyTimer;
                MessageState messageState2 = messageState;
                AttachmentContent attachmentContent2 = attachmentContent;
                HttpServerExchange httpServerExchange2 = httpServerExchange;
                worker.execute(() -> {
                    context.close();
                    if (messageState2.setSentToKafka()) {
                        attachmentContent2.removeTimeout();
                        PublishingHandler.this.messageEndProcessor.sent(httpServerExchange2, attachmentContent2);
                    } else if (messageState2.setDelayedSentToKafka()) {
                        PublishingHandler.this.messageEndProcessor.delayedSent(httpServerExchange2, attachmentContent2.getCachedTopic(), message);
                    }
                });
            }

            @Override // pl.allegro.tech.hermes.frontend.publishing.PublishingCallback
            public void onUnpublished(Message message, Topic topic, Exception exc) {
                messageState.setErrorInSendingToKafka();
                startBrokerLatencyTimer.close();
                attachmentContent.removeTimeout();
                PublishingHandler.this.handleNotPublishedMessage(httpServerExchange, topic, attachmentContent.getMessageId(), exc);
            }
        });
        if (messageState.setSendingToKafka() && messageState.setDelayedProcessing()) {
            this.messageEndProcessor.bufferedButDelayedProcessing(httpServerExchange, attachmentContent);
        }
    }

    private void handleNotPublishedMessage(HttpServerExchange httpServerExchange, Topic topic, String str, Exception exc) {
        this.messageErrorProcessor.sendAndLog(httpServerExchange, topic, str, ErrorDescription.error("Message not published. " + ExceptionUtils.getRootCauseMessage(exc), ErrorCode.INTERNAL_ERROR));
    }
}
