package net.oneandone.reactive.sse.servlet;

import com.google.common.collect.Queues;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletInputStream;
import net.oneandone.reactive.sse.ServerSentEvent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSsePublisher.class */
class ServletSsePublisher implements Publisher<ServerSentEvent> {
    private boolean subscribed = false;
    private final ServletInputStream inputStream;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSsePublisher$SEEEventReaderSubscription.class */
    public static class SEEEventReaderSubscription implements Subscription {
        private final SubscriberNotifier subscriberNotifier;
        private final SseReadableChannel channel;

        /* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSsePublisher$SEEEventReaderSubscription$OnComplete.class */
        private class OnComplete extends SubscriberNotifier.TerminatingNotification {
            private OnComplete() {
                super();
            }

            @Override // net.oneandone.reactive.sse.servlet.ServletSsePublisher.SEEEventReaderSubscription.SubscriberNotifier.Notification
            public void signalTo(Subscriber<? super ServerSentEvent> subscriber) {
                subscriber.onComplete();
            }
        }

        /* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSsePublisher$SEEEventReaderSubscription$OnError.class */
        private class OnError extends SubscriberNotifier.TerminatingNotification {
            private final Throwable error;

            public OnError(Throwable th) {
                super();
                this.error = th;
            }

            @Override // net.oneandone.reactive.sse.servlet.ServletSsePublisher.SEEEventReaderSubscription.SubscriberNotifier.Notification
            public void signalTo(Subscriber<? super ServerSentEvent> subscriber) {
                subscriber.onError(this.error);
            }
        }

        /* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSsePublisher$SEEEventReaderSubscription$OnNext.class */
        private class OnNext extends SubscriberNotifier.Notification {
            private final ServerSentEvent event;

            public OnNext(ServerSentEvent serverSentEvent) {
                super();
                this.event = serverSentEvent;
            }

            @Override // net.oneandone.reactive.sse.servlet.ServletSsePublisher.SEEEventReaderSubscription.SubscriberNotifier.Notification
            public void signalTo(Subscriber<? super ServerSentEvent> subscriber) {
                subscriber.onNext(this.event);
            }
        }

        /* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSsePublisher$SEEEventReaderSubscription$OnSubscribe.class */
        private class OnSubscribe extends SubscriberNotifier.Notification {
            private OnSubscribe() {
                super();
            }

            @Override // net.oneandone.reactive.sse.servlet.ServletSsePublisher.SEEEventReaderSubscription.SubscriberNotifier.Notification
            public void signalTo(Subscriber<? super ServerSentEvent> subscriber) {
                subscriber.onSubscribe(SEEEventReaderSubscription.this);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSsePublisher$SEEEventReaderSubscription$SubscriberNotifier.class */
        public static final class SubscriberNotifier implements Runnable {
            private final ConcurrentLinkedQueue<Notification> notifications = Queues.newConcurrentLinkedQueue();
            private final AtomicBoolean isOpen = new AtomicBoolean(true);
            private final Subscriber<? super ServerSentEvent> subscriber;

            /* JADX INFO: Access modifiers changed from: private */
            /* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSsePublisher$SEEEventReaderSubscription$SubscriberNotifier$Notification.class */
            public static abstract class Notification {
                private Notification() {
                }

                abstract void signalTo(Subscriber<? super ServerSentEvent> subscriber);

                boolean isTerminating() {
                    return false;
                }
            }

            /* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSsePublisher$SEEEventReaderSubscription$SubscriberNotifier$TerminatingNotification.class */
            private static abstract class TerminatingNotification extends Notification {
                private TerminatingNotification() {
                    super();
                }

                @Override // net.oneandone.reactive.sse.servlet.ServletSsePublisher.SEEEventReaderSubscription.SubscriberNotifier.Notification
                boolean isTerminating() {
                    return true;
                }
            }

            public SubscriberNotifier(Subscriber<? super ServerSentEvent> subscriber) {
                this.subscriber = subscriber;
            }

            private void close() {
                this.isOpen.set(false);
                this.notifications.clear();
            }

            public void emitNotification(Notification notification) {
                if (this.isOpen.get() && this.notifications.offer(notification)) {
                    tryScheduleToExecute();
                }
            }

            private final void tryScheduleToExecute() {
                try {
                    ForkJoinPool.commonPool().execute(this);
                } catch (Throwable th) {
                    close();
                    this.subscriber.onError(th);
                }
            }

            /* JADX WARN: Finally extract failed */
            @Override // java.lang.Runnable
            public final void run() {
                if (this.isOpen.get()) {
                    synchronized (this.subscriber) {
                        try {
                            Notification poll = this.notifications.poll();
                            if (poll != null) {
                                if (poll.isTerminating()) {
                                    close();
                                }
                                poll.signalTo(this.subscriber);
                            }
                            if (!this.notifications.isEmpty()) {
                                tryScheduleToExecute();
                            }
                        } catch (Throwable th) {
                            if (!this.notifications.isEmpty()) {
                                tryScheduleToExecute();
                            }
                            throw th;
                        }
                    }
                }
            }
        }

        public SEEEventReaderSubscription(ServletInputStream servletInputStream, Subscriber<? super ServerSentEvent> subscriber) {
            this.subscriberNotifier = new SubscriberNotifier(subscriber);
            this.channel = new SseReadableChannel(servletInputStream, serverSentEvent -> {
                emitNotification(new OnNext(serverSentEvent));
            }, th -> {
                emitNotification(new OnError(th));
            }, r7 -> {
                emitNotification(new OnComplete());
            });
            this.subscriberNotifier.emitNotification(new OnSubscribe());
        }

        private void emitNotification(SubscriberNotifier.Notification notification) {
            this.subscriberNotifier.emitNotification(notification);
        }

        public void cancel() {
            this.channel.close();
        }

        public void request(long j) {
            if (j <= 0) {
                this.subscriberNotifier.emitNotification(new OnError(new IllegalArgumentException("Non-negative number of elements must be requested: https://github.com/reactive-streams/reactive-streams#3.9")));
            } else {
                this.channel.request(j);
            }
        }
    }

    public ServletSsePublisher(ServletInputStream servletInputStream) {
        this.inputStream = servletInputStream;
    }

    public void subscribe(Subscriber<? super ServerSentEvent> subscriber) {
        synchronized (this) {
            if (this.subscribed) {
                subscriber.onError(new IllegalStateException("subscription already exists. Multi-subscribe is not supported"));
            } else {
                this.subscribed = true;
                subscriber.onSubscribe(new SEEEventReaderSubscription(this.inputStream, subscriber));
            }
        }
    }
}
