package pl.allegro.tech.hermes.frontend.publishing;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.Clock;
import java.util.Enumeration;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import javax.inject.Inject;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringUtils;
import pl.allegro.tech.hermes.api.ErrorCode;
import pl.allegro.tech.hermes.api.ErrorDescription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.domain.topic.schema.CouldNotLoadSchemaException;
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.publishing.callbacks.AsyncContextExecutionCallback;
import pl.allegro.tech.hermes.frontend.publishing.callbacks.BrokerListenersPublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.callbacks.HttpPublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.callbacks.MessageStatePublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.callbacks.MetricsPublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
import pl.allegro.tech.hermes.frontend.publishing.message.MessageState;
import pl.allegro.tech.hermes.frontend.publishing.metadata.HeadersPropagator;
import pl.allegro.tech.hermes.frontend.publishing.metadata.MetadataAddingMessageConverter;
import pl.allegro.tech.hermes.frontend.validator.MessageValidators;
import pl.allegro.tech.hermes.tracker.frontend.Trackers;

/* loaded from: input_file:pl/allegro/tech/hermes/frontend/publishing/PublishingServlet.class */
public class PublishingServlet extends HttpServlet {
    private final HermesMetrics hermesMetrics;
    private final ErrorSender errorSender;
    private final Trackers trackers;
    private final TopicsCache topicsCache;
    private final MessageValidators messageValidators;
    private final Clock clock;
    private final MessagePublisher messagePublisher;
    private final MessageContentTypeEnforcer contentTypeEnforcer;
    private final MetadataAddingMessageConverter metadataAddingMessageConverter;
    private final HeadersPropagator headersPropagator;
    private final BrokerListeners listeners;
    private final Integer defaultAsyncTimeout;
    private final Integer longAsyncTimeout;
    private final Integer chunkSize;

    @Inject
    public PublishingServlet(TopicsCache topicsCache, HermesMetrics hermesMetrics, ObjectMapper objectMapper, ConfigFactory configFactory, Trackers trackers, MessageValidators messageValidators, Clock clock, MessagePublisher messagePublisher, BrokerListeners brokerListeners, MessageContentTypeEnforcer messageContentTypeEnforcer, MetadataAddingMessageConverter metadataAddingMessageConverter, HeadersPropagator headersPropagator) {
        this.topicsCache = topicsCache;
        this.messageValidators = messageValidators;
        this.clock = clock;
        this.messagePublisher = messagePublisher;
        this.contentTypeEnforcer = messageContentTypeEnforcer;
        this.metadataAddingMessageConverter = metadataAddingMessageConverter;
        this.headersPropagator = headersPropagator;
        this.errorSender = new ErrorSender(objectMapper);
        this.hermesMetrics = hermesMetrics;
        this.trackers = trackers;
        this.listeners = brokerListeners;
        this.defaultAsyncTimeout = Integer.valueOf(configFactory.getIntProperty(Configs.FRONTEND_IDLE_TIMEOUT));
        this.longAsyncTimeout = Integer.valueOf(configFactory.getIntProperty(Configs.FRONTEND_LONG_IDLE_TIMEOUT));
        this.chunkSize = Integer.valueOf(configFactory.getIntProperty(Configs.FRONTEND_REQUEST_CHUNK_SIZE));
    }

    protected void doPost(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws ServletException, IOException {
        TopicName parseTopicName = parseTopicName(httpServletRequest);
        String uuid = UUID.randomUUID().toString();
        Optional<Topic> topic = this.topicsCache.getTopic(parseTopicName);
        if (topic.isPresent()) {
            handlePublishAsynchronously(httpServletRequest, httpServletResponse, topic.get(), uuid);
        } else {
            this.errorSender.sendErrorResponse(new ErrorDescription(String.format("Topic %s not exists in group %s", parseTopicName.getName(), parseTopicName.getGroupName()), ErrorCode.TOPIC_NOT_EXISTS), httpServletResponse, uuid);
        }
    }

    private void handlePublishAsynchronously(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Topic topic, String str) throws IOException {
        MessageState messageState = new MessageState();
        AsyncContext startAsync = httpServletRequest.startAsync();
        HttpResponder httpResponder = new HttpResponder(this.trackers, str, httpServletResponse, startAsync, topic, this.errorSender, messageState, httpServletRequest.getRemoteHost());
        startAsync.addListener(new TimeoutAsyncListener(httpResponder, messageState));
        startAsync.addListener(new MetricsAsyncListener(this.hermesMetrics, topic.getName(), topic.getAck()));
        startAsync.setTimeout((topic.isReplicationConfirmRequired() ? this.longAsyncTimeout : this.defaultAsyncTimeout).intValue());
        new MessageReader(httpServletRequest, this.chunkSize, topic.getName(), this.hermesMetrics, messageState, bArr -> {
            startAsync.start(() -> {
                try {
                    Message enforce = this.contentTypeEnforcer.enforce(httpServletRequest.getContentType(), new Message(str, bArr, this.clock.millis()), topic);
                    this.messageValidators.check(topic, enforce.getData());
                    Message addMetadata = this.metadataAddingMessageConverter.addMetadata(enforce, topic, this.headersPropagator.extract(toHeadersMap(httpServletRequest)));
                    startAsync.addListener(new BrokerTimeoutAsyncListener(httpResponder, addMetadata, topic, messageState, this.listeners));
                    this.messagePublisher.publish(addMetadata, topic, messageState, this.listeners, new AsyncContextExecutionCallback(startAsync, new MessageStatePublishingCallback(messageState), new HttpPublishingCallback(httpResponder), new MetricsPublishingCallback(this.hermesMetrics, topic), new BrokerListenersPublishingCallback(this.listeners)));
                } catch (CouldNotLoadSchemaException e) {
                    httpResponder.internalError(e, "Could not load schema for published message");
                } catch (RuntimeException e2) {
                    httpResponder.badRequest(e2);
                }
            });
        }, illegalStateException -> {
            httpResponder.badRequest(illegalStateException, "Validation error");
        }, th -> {
            httpResponder.internalError(th, "Error while reading request");
        });
    }

    private Map<String, String> toHeadersMap(HttpServletRequest httpServletRequest) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Enumeration headerNames = httpServletRequest.getHeaderNames();
        while (headerNames.hasMoreElements()) {
            String str = (String) headerNames.nextElement();
            builder.put(str, httpServletRequest.getHeader(str));
        }
        return builder.build();
    }

    private TopicName parseTopicName(HttpServletRequest httpServletRequest) {
        return TopicName.fromQualifiedName(StringUtils.substringAfterLast(StringUtils.strip(httpServletRequest.getRequestURI(), "/"), "/"));
    }
}
