package net.oneandone.reactive.sse.servlet;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import net.oneandone.reactive.sse.ServerSentEvent;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/oneandone/reactive/sse/servlet/SseWriteableChannel.class */
public class SseWriteableChannel {
    private static final int DEFAULT_KEEP_ALIVE_PERIOD_SEC = 30;
    private final List<CompletableFuture<Boolean>> whenWritePossibles;
    private final ServletOutputStream out;
    private final Consumer<Throwable> errorConsumer;

    /* loaded from: input_file:net/oneandone/reactive/sse/servlet/SseWriteableChannel$KeepAliveEmitter.class */
    private static final class KeepAliveEmitter {
        private static final ScheduledThreadPoolExecutor EXECUTOR = newScheduledThreadPoolExecutor();
        private final SseWriteableChannel channel;
        private final Duration keepAlivePeriod;

        private static ScheduledThreadPoolExecutor newScheduledThreadPoolExecutor() {
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(0);
            scheduledThreadPoolExecutor.setKeepAliveTime(60L, TimeUnit.SECONDS);
            scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true);
            return scheduledThreadPoolExecutor;
        }

        public KeepAliveEmitter(SseWriteableChannel sseWriteableChannel, Duration duration) {
            this.channel = sseWriteableChannel;
            this.keepAlivePeriod = duration;
        }

        public void start() {
            scheduleNextKeepAliveEvent();
        }

        private void scheduleNextKeepAliveEvent() {
            this.channel.writeEventAsync(ServerSentEvent.newEvent().comment("keep alive")).thenAccept(num -> {
                EXECUTOR.schedule(() -> {
                    scheduleNextKeepAliveEvent();
                }, this.keepAlivePeriod.getSeconds(), TimeUnit.SECONDS);
            });
        }
    }

    /* loaded from: input_file:net/oneandone/reactive/sse/servlet/SseWriteableChannel$ServletWriteListener.class */
    private final class ServletWriteListener implements WriteListener {
        private ServletWriteListener() {
        }

        public void onWritePossible() throws IOException {
            synchronized (SseWriteableChannel.this.whenWritePossibles) {
                SseWriteableChannel.this.whenWritePossibles.forEach(completableFuture -> {
                    completableFuture.complete(null);
                });
                SseWriteableChannel.this.whenWritePossibles.clear();
            }
        }

        public void onError(Throwable th) {
            SseWriteableChannel.this.errorConsumer.accept(th);
        }
    }

    public SseWriteableChannel(ServletOutputStream servletOutputStream, Consumer<Throwable> consumer) {
        this(servletOutputStream, consumer, Duration.ofSeconds(30L));
    }

    public SseWriteableChannel(ServletOutputStream servletOutputStream, Consumer<Throwable> consumer, Duration duration) {
        this.whenWritePossibles = Lists.newArrayList();
        this.errorConsumer = consumer;
        this.out = servletOutputStream;
        servletOutputStream.setWriteListener(new ServletWriteListener());
        new KeepAliveEmitter(this, duration).start();
        requestWriteNotificationAsync().thenAccept(bool -> {
            flush();
        });
    }

    public CompletableFuture<Integer> writeEventAsync(ServerSentEvent serverSentEvent) {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        requestWriteNotificationAsync().thenAccept(bool -> {
            writeToWrite(serverSentEvent, completableFuture);
        });
        return completableFuture;
    }

    private void writeToWrite(ServerSentEvent serverSentEvent, CompletableFuture<Integer> completableFuture) {
        try {
            synchronized (this.out) {
                byte[] bytes = serverSentEvent.toWire().getBytes("UTF-8");
                this.out.write(bytes);
                this.out.flush();
                completableFuture.complete(Integer.valueOf(bytes.length));
            }
        } catch (IOException | RuntimeException e) {
            this.errorConsumer.accept(e);
            completableFuture.completeExceptionally(e);
            close();
        }
    }

    private CompletableFuture<Boolean> requestWriteNotificationAsync() {
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        synchronized (this.whenWritePossibles) {
            if (isWritePossible()) {
                completableFuture.complete(true);
            } else {
                this.whenWritePossibles.add(completableFuture);
            }
        }
        return completableFuture;
    }

    private boolean isWritePossible() {
        try {
            return this.out.isReady();
        } catch (IllegalStateException e) {
            return false;
        }
    }

    private void flush() {
        try {
            this.out.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void close() {
        try {
            this.out.close();
        } catch (IOException e) {
        }
    }
}
