package pl.allegro.tech.hermes.frontend.publishing.handlers;

import io.undertow.io.Receiver;
import io.undertow.server.DefaultResponseListener;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import java.io.ByteArrayOutputStream;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.ErrorCode;
import pl.allegro.tech.hermes.api.ErrorDescription;
import pl.allegro.tech.hermes.common.metric.timer.StartedTimersPair;
import pl.allegro.tech.hermes.frontend.publishing.handlers.ContentLengthChecker;
import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter;
import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageErrorProcessor;
import pl.allegro.tech.hermes.frontend.publishing.message.MessageState;

/* loaded from: input_file:pl/allegro/tech/hermes/frontend/publishing/handlers/MessageReadHandler.class */
class MessageReadHandler implements HttpHandler {
    private static final Logger logger = LoggerFactory.getLogger(MessageReadHandler.class);
    private final HttpHandler next;
    private final HttpHandler timeoutHandler;
    private final MessageErrorProcessor messageErrorProcessor;
    private final ContentLengthChecker contentLengthChecker;
    private final Duration defaultAsyncTimeout;
    private final Duration longAsyncTimeout;
    private final ThroughputLimiter throughputLimiter;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:pl/allegro/tech/hermes/frontend/publishing/handlers/MessageReadHandler$DefaultResponseSimulator.class */
    public static final class DefaultResponseSimulator implements DefaultResponseListener {
        private static final boolean RESPONSE_SIMULATED = true;
        private final AtomicBoolean responseNotSimulatedOnlyOnce = new AtomicBoolean();

        DefaultResponseSimulator() {
        }

        public boolean handleDefaultResponse(HttpServerExchange httpServerExchange) {
            if (!((AttachmentContent) httpServerExchange.getAttachment(AttachmentContent.KEY)).isResponseReady()) {
                return true;
            }
            if (httpServerExchange.getStatusCode() == 200) {
                try {
                    httpServerExchange.setStatusCode(500);
                } catch (RuntimeException e) {
                    MessageReadHandler.logger.error("Exception has been thrown during an exchange status modification", e);
                }
            }
            return !this.responseNotSimulatedOnlyOnce.compareAndSet(false, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageReadHandler(HttpHandler httpHandler, HttpHandler httpHandler2, MessageErrorProcessor messageErrorProcessor, ThroughputLimiter throughputLimiter, boolean z, Duration duration, Duration duration2) {
        this.next = httpHandler;
        this.timeoutHandler = httpHandler2;
        this.messageErrorProcessor = messageErrorProcessor;
        this.contentLengthChecker = new ContentLengthChecker(z);
        this.defaultAsyncTimeout = duration;
        this.longAsyncTimeout = duration2;
        this.throughputLimiter = throughputLimiter;
    }

    public void handleRequest(HttpServerExchange httpServerExchange) {
        AttachmentContent attachmentContent = (AttachmentContent) httpServerExchange.getAttachment(AttachmentContent.KEY);
        Duration duration = attachmentContent.getTopic().isReplicationConfirmRequired() ? this.longAsyncTimeout : this.defaultAsyncTimeout;
        attachmentContent.setTimeoutHolder(new TimeoutHolder((int) duration.toMillis(), httpServerExchange.getIoThread().executeAfter(() -> {
            runTimeoutHandler(httpServerExchange, attachmentContent);
        }, duration.toMillis(), TimeUnit.MILLISECONDS)));
        ThroughputLimiter.QuotaInsight checkQuota = this.throughputLimiter.checkQuota(attachmentContent.getCachedTopic().getTopicName(), attachmentContent.getCachedTopic().getThroughput());
        if (checkQuota.hasQuota()) {
            readMessage(httpServerExchange, attachmentContent);
        } else {
            respondWithQuotaViolation(httpServerExchange, attachmentContent, checkQuota.getReason());
        }
    }

    private void runTimeoutHandler(HttpServerExchange httpServerExchange, AttachmentContent attachmentContent) {
        try {
            this.timeoutHandler.handleRequest(httpServerExchange);
        } catch (Exception e) {
            this.messageErrorProcessor.sendAndLog(httpServerExchange, attachmentContent.getTopic(), attachmentContent.getMessageId(), ErrorDescription.error("Error while handling timeout task", ErrorCode.INTERNAL_ERROR), e);
        }
    }

    private void readMessage(HttpServerExchange httpServerExchange, AttachmentContent attachmentContent) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        MessageState messageState = attachmentContent.getMessageState();
        StartedTimersPair startRequestReadTimers = attachmentContent.getCachedTopic().startRequestReadTimers();
        Receiver requestReceiver = httpServerExchange.getRequestReceiver();
        attachmentContent.getTimeoutHolder().onTimeout(r4 -> {
            startRequestReadTimers.close();
            requestReceiver.pause();
        });
        if (messageState.setReading()) {
            requestReceiver.receivePartialBytes(partialMessageRead(messageState, byteArrayOutputStream, startRequestReadTimers, attachmentContent), readingError(messageState, startRequestReadTimers, attachmentContent));
        } else {
            startRequestReadTimers.close();
            this.messageErrorProcessor.sendAndLog(httpServerExchange, attachmentContent.getTopic(), attachmentContent.getMessageId(), ErrorDescription.error("Probably context switching problem as timeout elapsed before message reading was started", ErrorCode.INTERNAL_ERROR));
        }
    }

    private Receiver.PartialBytesCallback partialMessageRead(MessageState messageState, ByteArrayOutputStream byteArrayOutputStream, StartedTimersPair startedTimersPair, AttachmentContent attachmentContent) {
        return (httpServerExchange, bArr, z) -> {
            if (messageState.isReadingTimeout()) {
                endWithoutDefaultResponse(httpServerExchange);
                return;
            }
            byteArrayOutputStream.write(bArr, 0, bArr.length);
            if (z) {
                if (!messageState.setFullyRead()) {
                    endWithoutDefaultResponse(httpServerExchange);
                } else {
                    startedTimersPair.close();
                    messageRead(httpServerExchange, byteArrayOutputStream.toByteArray(), attachmentContent);
                }
            }
        };
    }

    private Receiver.ErrorCallback readingError(MessageState messageState, StartedTimersPair startedTimersPair, AttachmentContent attachmentContent) {
        return (httpServerExchange, iOException) -> {
            if (!messageState.setReadingError()) {
                this.messageErrorProcessor.log(httpServerExchange, "Error while reading message after timeout execution. " + ExceptionUtils.getRootCauseMessage(iOException), iOException);
                return;
            }
            startedTimersPair.close();
            attachmentContent.removeTimeout();
            this.messageErrorProcessor.sendAndLog(httpServerExchange, attachmentContent.getTopic(), attachmentContent.getMessageId(), ErrorDescription.error("Error while reading message. " + ExceptionUtils.getRootCauseMessage(iOException), ErrorCode.INTERNAL_ERROR), iOException);
        };
    }

    private void messageRead(HttpServerExchange httpServerExchange, byte[] bArr, AttachmentContent attachmentContent) {
        try {
            this.contentLengthChecker.check(httpServerExchange, bArr.length, attachmentContent);
            attachmentContent.getCachedTopic().reportMessageContentSize(bArr.length);
            ThroughputLimiter.QuotaInsight checkQuota = this.throughputLimiter.checkQuota(attachmentContent.getCachedTopic().getTopicName(), attachmentContent.getCachedTopic().getThroughput());
            if (checkQuota.hasQuota()) {
                finalizeMessageRead(httpServerExchange, bArr, attachmentContent);
            } else {
                respondWithQuotaViolation(httpServerExchange, attachmentContent, checkQuota.getReason());
            }
        } catch (ContentLengthChecker.ContentTooLargeException | ContentLengthChecker.InvalidContentLengthException e) {
            attachmentContent.removeTimeout();
            this.messageErrorProcessor.sendAndLog(httpServerExchange, attachmentContent.getTopic(), attachmentContent.getMessageId(), ErrorDescription.error(e.getMessage(), ErrorCode.VALIDATION_ERROR));
        } catch (Exception e2) {
            attachmentContent.removeTimeout();
            this.messageErrorProcessor.sendAndLog(httpServerExchange, attachmentContent.getTopic(), attachmentContent.getMessageId(), e2);
        }
    }

    private void finalizeMessageRead(HttpServerExchange httpServerExchange, byte[] bArr, AttachmentContent attachmentContent) throws Exception {
        attachmentContent.setMessageContent(bArr);
        endWithoutDefaultResponse(httpServerExchange);
        if (httpServerExchange.isInIoThread()) {
            dispatchToWorker(httpServerExchange, attachmentContent);
        } else {
            this.next.handleRequest(httpServerExchange);
        }
    }

    private void respondWithQuotaViolation(HttpServerExchange httpServerExchange, AttachmentContent attachmentContent, String str) {
        attachmentContent.removeTimeout();
        this.messageErrorProcessor.sendAndLog(httpServerExchange, attachmentContent.getTopic(), attachmentContent.getMessageId(), ErrorDescription.error(str, ErrorCode.THROUGHPUT_QUOTA_VIOLATION));
    }

    private void dispatchToWorker(HttpServerExchange httpServerExchange, AttachmentContent attachmentContent) {
        httpServerExchange.getConnection().getWorker().execute(() -> {
            try {
                this.next.handleRequest(httpServerExchange);
            } catch (Exception e) {
                attachmentContent.removeTimeout();
                this.messageErrorProcessor.sendAndLog(httpServerExchange, attachmentContent.getTopic(), attachmentContent.getMessageId(), ErrorDescription.error("Error while executing next handler after read handler", ErrorCode.INTERNAL_ERROR), e);
            }
        });
    }

    private void endWithoutDefaultResponse(HttpServerExchange httpServerExchange) {
        httpServerExchange.addDefaultResponseListener(new DefaultResponseSimulator());
    }
}
