package pl.allegro.tech.hermes.frontend.server;

import io.undertow.server.ExchangeCompletionListener;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;

/* loaded from: input_file:pl/allegro/tech/hermes/frontend/server/HermesShutdownHandler.class */
public class HermesShutdownHandler implements HttpHandler {
    private static final Logger logger = LoggerFactory.getLogger(HermesShutdownHandler.class);
    private static final int MILLIS = 1000;
    private static final int MAX_INFLIGHT_RETRIES = 20;
    private static final int TOLERANCE_BYTES = 5;
    private final HttpHandler next;
    private final HermesMetrics metrics;
    private final ExchangeCompletionListener completionListener = new GracefulExchangeCompletionListener();
    private final AtomicInteger inflightRequests = new AtomicInteger();
    private volatile boolean shutdown = false;

    /* loaded from: input_file:pl/allegro/tech/hermes/frontend/server/HermesShutdownHandler$GracefulExchangeCompletionListener.class */
    private final class GracefulExchangeCompletionListener implements ExchangeCompletionListener {
        private GracefulExchangeCompletionListener() {
        }

        public void exchangeEvent(HttpServerExchange httpServerExchange, ExchangeCompletionListener.NextListener nextListener) {
            HermesShutdownHandler.this.inflightRequests.decrementAndGet();
            nextListener.proceed();
        }
    }

    public HermesShutdownHandler(HttpHandler httpHandler, HermesMetrics hermesMetrics) {
        this.next = httpHandler;
        this.metrics = hermesMetrics;
        hermesMetrics.registerProducerInflightRequest(() -> {
            return Integer.valueOf(this.inflightRequests.get());
        });
    }

    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        if (this.shutdown) {
            httpServerExchange.setStatusCode(503);
            httpServerExchange.endExchange();
        } else {
            httpServerExchange.addExchangeCompleteListener(this.completionListener);
            this.inflightRequests.incrementAndGet();
            this.next.handleRequest(httpServerExchange);
        }
    }

    public void handleShutdown() throws InterruptedException {
        this.shutdown = true;
        logger.info("Waiting for inflight requests to complete");
        awaitRequestsComplete();
        logger.info("Awaiting buffer flush");
        awaitBufferFlush();
        logger.info("Shutdown complete");
    }

    private void awaitRequestsComplete() throws InterruptedException {
        int i = MAX_INFLIGHT_RETRIES;
        while (this.inflightRequests.get() > 0 && i > 0) {
            logger.info("Inflight requests: {}, timing out in {} ms", Integer.valueOf(this.inflightRequests.get()), Integer.valueOf(i * MILLIS));
            i--;
            Thread.sleep(1000L);
        }
    }

    private void awaitBufferFlush() throws InterruptedException {
        while (!isBufferEmpty()) {
            Thread.sleep(1000L);
        }
    }

    private boolean isBufferEmpty() {
        long bufferTotalBytes = (long) (this.metrics.getBufferTotalBytes() - this.metrics.getBufferAvailablesBytes());
        logger.info("Buffer flush: {} bytes still in use", Long.valueOf(bufferTotalBytes));
        return bufferTotalBytes < 5;
    }
}
