package group.insyde.statefun.tsukuyomi.core.capture;

import io.undertow.Undertow;
import io.undertow.UndertowOptions;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import java.net.ServerSocket;
import java.util.Collections;
import java.util.function.Consumer;
import org.apache.flink.statefun.sdk.java.StatefulFunctions;
import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
import org.apache.flink.statefun.sdk.java.slice.Slice;
import org.apache.flink.statefun.sdk.java.slice.Slices;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:group/insyde/statefun/tsukuyomi/core/capture/UndertowStatefunServer.class */
public class UndertowStatefunServer implements StatefunServer {
    private final int port;
    private Undertow server;
    private UndertowHttpHandler handler;

    /* loaded from: input_file:group/insyde/statefun/tsukuyomi/core/capture/UndertowStatefunServer$UndertowHttpHandler.class */
    private static final class UndertowHttpHandler implements HttpHandler {
        private static final Logger log = LoggerFactory.getLogger(UndertowHttpHandler.class);
        private final RequestReplyHandler handler;
        private Consumer<Throwable> uncaughtExceptionHandler;

        public void handleRequest(HttpServerExchange httpServerExchange) {
            httpServerExchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
        }

        private void onRequestBody(HttpServerExchange httpServerExchange, byte[] bArr) {
            httpServerExchange.dispatch();
            this.handler.handle(Slices.wrap(bArr)).whenComplete((slice, th) -> {
                onComplete(httpServerExchange, slice, th);
            });
        }

        private void onComplete(HttpServerExchange httpServerExchange, Slice slice, Throwable th) {
            if (th == null) {
                httpServerExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
                httpServerExchange.getResponseSender().send(slice.asReadOnlyByteBuffer());
            } else {
                log.error("Function invocation failed", th);
                httpServerExchange.getResponseHeaders().put(Headers.STATUS, 500L);
                httpServerExchange.endExchange();
                this.uncaughtExceptionHandler.accept(th);
            }
        }

        public UndertowHttpHandler(RequestReplyHandler requestReplyHandler) {
            this.handler = requestReplyHandler;
        }

        public void setUncaughtExceptionHandler(Consumer<Throwable> consumer) {
            this.uncaughtExceptionHandler = consumer;
        }
    }

    public static UndertowStatefunServer useAvailablePort() {
        ServerSocket serverSocket = new ServerSocket(0);
        try {
            int localPort = serverSocket.getLocalPort();
            serverSocket.close();
            return new UndertowStatefunServer(localPort);
        } finally {
            if (Collections.singletonList(serverSocket).get(0) != null) {
                serverSocket.close();
            }
        }
    }

    @Override // group.insyde.statefun.tsukuyomi.core.capture.StatefunServer
    public void start(StatefulFunctions statefulFunctions) {
        this.handler = new UndertowHttpHandler(statefulFunctions.requestReplyHandler());
        this.server = Undertow.builder().addHttpListener(this.port, "0.0.0.0").setHandler(this.handler).setServerOption(UndertowOptions.ENABLE_HTTP2, true).build();
        this.server.start();
    }

    @Override // group.insyde.statefun.tsukuyomi.core.capture.StatefunServer
    public void stop() {
        if (this.server != null) {
            this.server.stop();
        }
    }

    @Override // group.insyde.statefun.tsukuyomi.core.capture.StatefunServer
    public void setUncaughtExceptionHandler(Consumer<Throwable> consumer) {
        if (this.handler == null) {
            throw new StatefunServerNotStartedException("UncaughtExceptionHandler cannot be set until the server is not started");
        }
        this.handler.setUncaughtExceptionHandler(consumer);
    }

    private UndertowStatefunServer(int i) {
        this.port = i;
    }

    @Override // group.insyde.statefun.tsukuyomi.core.capture.StatefunServer
    public int getPort() {
        return this.port;
    }
}
