package pl.allegro.tech.hermes.frontend.publishing.handlers.end;

import io.undertow.server.HttpServerExchange;
import io.undertow.util.HttpString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.http.MessageMetadataHeaders;
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
import pl.allegro.tech.hermes.frontend.publishing.handlers.AttachmentContent;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
import pl.allegro.tech.hermes.tracker.frontend.Trackers;

/* loaded from: input_file:pl/allegro/tech/hermes/frontend/publishing/handlers/end/MessageEndProcessor.class */
public class MessageEndProcessor {
    private static final Logger logger = LoggerFactory.getLogger(MessageEndProcessor.class);
    private static final HttpString messageIdHeader = new HttpString(MessageMetadataHeaders.MESSAGE_ID.getName());
    private final Trackers trackers;
    private final BrokerListeners brokerListeners;
    private final TrackingHeadersExtractor trackingHeadersExtractor;

    public MessageEndProcessor(Trackers trackers, BrokerListeners brokerListeners, TrackingHeadersExtractor trackingHeadersExtractor) {
        this.trackers = trackers;
        this.brokerListeners = brokerListeners;
        this.trackingHeadersExtractor = trackingHeadersExtractor;
    }

    public void sent(HttpServerExchange httpServerExchange, AttachmentContent attachmentContent) {
        this.trackers.get(attachmentContent.getTopic()).logPublished(attachmentContent.getMessageId(), attachmentContent.getTopic().getName(), RemoteHostReader.readHostAndPort(httpServerExchange), this.trackingHeadersExtractor.extractHeadersToLog(httpServerExchange.getRequestHeaders()));
        sendResponse(httpServerExchange, attachmentContent, 201);
        attachmentContent.getCachedTopic().incrementPublished();
    }

    public void delayedSent(HttpServerExchange httpServerExchange, CachedTopic cachedTopic, Message message) {
        this.trackers.get(cachedTopic.getTopic()).logPublished(message.getId(), cachedTopic.getTopic().getName(), RemoteHostReader.readHostAndPort(httpServerExchange), this.trackingHeadersExtractor.extractHeadersToLog(httpServerExchange.getRequestHeaders()));
        this.brokerListeners.onAcknowledge(message, cachedTopic.getTopic());
        cachedTopic.incrementPublished();
    }

    public void bufferedButDelayedProcessing(HttpServerExchange httpServerExchange, AttachmentContent attachmentContent) {
        bufferedButDelayed(httpServerExchange, attachmentContent);
        attachmentContent.getCachedTopic().markDelayedProcessing();
    }

    public void bufferedButDelayed(HttpServerExchange httpServerExchange, AttachmentContent attachmentContent) {
        Topic topic = attachmentContent.getTopic();
        this.brokerListeners.onTimeout(attachmentContent.getMessage(), topic);
        this.trackers.get(topic).logInflight(attachmentContent.getMessageId(), topic.getName(), RemoteHostReader.readHostAndPort(httpServerExchange), this.trackingHeadersExtractor.extractHeadersToLog(httpServerExchange.getRequestHeaders()));
        handleRaceConditionBetweenAckAndTimeout(attachmentContent, topic);
        sendResponse(httpServerExchange, attachmentContent, 202);
    }

    private void handleRaceConditionBetweenAckAndTimeout(AttachmentContent attachmentContent, Topic topic) {
        if (attachmentContent.getMessageState().isDelayedSentToKafka()) {
            this.brokerListeners.onAcknowledge(attachmentContent.getMessage(), topic);
        }
    }

    private void sendResponse(HttpServerExchange httpServerExchange, AttachmentContent attachmentContent, int i) {
        if (httpServerExchange.isResponseStarted()) {
            logger.warn("The response has already been started. Status code set on exchange: {}; Expected status code: {};Topic: {}; Message id: {}; Remote host {}", new Object[]{Integer.valueOf(httpServerExchange.getStatusCode()), Integer.valueOf(i), attachmentContent.getCachedTopic().getQualifiedName(), attachmentContent.getMessageId(), RemoteHostReader.readHostAndPort(httpServerExchange)});
        } else {
            httpServerExchange.setStatusCode(i);
            httpServerExchange.getResponseHeaders().add(messageIdHeader, attachmentContent.getMessageId());
        }
        attachmentContent.markResponseAsReady();
        try {
            httpServerExchange.endExchange();
        } catch (RuntimeException e) {
            logger.error("Exception while ending exchange. Status code set on exchange: {}; Expected status code: {};Topic: {}; Message id: {}; Remote host {}", new Object[]{Integer.valueOf(httpServerExchange.getStatusCode()), Integer.valueOf(i), attachmentContent.getCachedTopic().getQualifiedName(), attachmentContent.getMessageId(), RemoteHostReader.readHostAndPort(httpServerExchange), e});
        }
    }
}
