package reactor.core.publisher;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.flow.MultiProducer;
import reactor.core.flow.Producer;
import reactor.core.flow.Receiver;
import reactor.core.queue.RingBuffer;
import reactor.core.queue.RingBufferReceiver;
import reactor.core.queue.Slot;
import reactor.core.state.Backpressurable;
import reactor.core.state.Cancellable;
import reactor.core.state.Completable;
import reactor.core.state.Introspectable;
import reactor.core.state.Requestable;
import reactor.core.util.BackpressureUtils;
import reactor.core.util.EmptySubscription;
import reactor.core.util.Exceptions;
import reactor.core.util.ExecutorUtils;
import reactor.core.util.Sequence;
import reactor.core.util.WaitStrategy;

/* loaded from: input_file:lib/reactor-core-2.5.0.M3.jar:reactor/core/publisher/TopicProcessor.class */
/**
 * Processor that stores incoming signals in a {@link RingBuffer} of {@link Slot}s and
 * dispatches them to its subscribers.
 *
 * <p>Each subscriber accepted by {@link #subscribe(Subscriber)} while the processor is
 * alive is served by its own {@code TopicSubscriberLoop}, submitted to the processor's
 * executor, which tracks its position with a dedicated {@link Sequence} registered as a
 * gating sequence of the ring buffer.
 *
 * @param <E> the type of the values flowing through the processor
 */
public final class TopicProcessor<E> extends EventLoopProcessor<E, E> implements Backpressurable, MultiProducer {
    /** Barrier created from {@link #ringBuffer}; subscriber loops wait on it and it is signalled on error/completion. */
    final RingBufferReceiver barrier;
    /** Backing buffer; single- or multi-producer depending on the factory used ({@code create} vs {@code share}). */
    final RingBuffer<Slot<E>> ringBuffer;
    /**
     * Sequence created at {@code -1}; the request task sets it to the buffer cursor and registers it as a
     * gating sequence. It is also the starting point handed to {@code coldSource}.
     */
    final Sequence minimum;
    /** Wait strategy ({@code WaitStrategy.liteBlocking()}) signalled whenever a subscriber loop progresses or exits. */
    final WaitStrategy readWait;

    /* loaded from: input_file:lib/reactor-core-2.5.0.M3.jar:reactor/core/publisher/TopicProcessor$TopicSubscriberLoop.class */
    /**
     * Event loop that serves exactly one subscriber of a {@link TopicProcessor}.
     *
     * <p>The loop waits on the processor's barrier for published slots, delivers them to the
     * subscriber while honouring the demand accumulated through {@link #request(long)}, and
     * advances its own {@link Sequence} so the ring buffer knows how far this subscriber has read.
     * It also acts as the {@link Subscription} handed to the subscriber.
     *
     * @param <T> the type of the delivered values
     */
    private static final class TopicSubscriberLoop<T> implements Runnable, Producer, Backpressurable, Completable, Receiver, Cancellable, Introspectable, Requestable, Subscription {
        private final TopicProcessor<T> processor;
        /** Outstanding demand: increased by {@link #request(long)}, decreased by one before each {@code onNext}. */
        private final Sequence pendingRequest;
        private final Subscriber<? super T> subscriber;
        /** {@code true} while {@link #run()} is active; cleared by {@link #halt()} and when the loop exits. */
        private final AtomicBoolean running = new AtomicBoolean(false);
        /** Read position of this subscriber; starts at {@code -1} and is repositioned by the processor on subscribe. */
        private final Sequence sequence = RingBuffer.wrap(-1, this);
        /** Check handed to the blocking waits: raises an alert once the loop is halted or the processor terminated. */
        private final Runnable waiter = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                if (!TopicSubscriberLoop.this.running.get() || TopicSubscriberLoop.this.processor.isTerminated()) {
                    throw Exceptions.AlertException.INSTANCE;
                }
            }
        };

        /**
         * @param topicProcessor the processor whose ring buffer is consumed
         * @param sequence       the sequence used to track pending downstream demand
         * @param subscriber     the subscriber receiving the signals
         */
        public TopicSubscriberLoop(TopicProcessor<T> topicProcessor, Sequence sequence, Subscriber<? super T> subscriber) {
            this.processor = topicProcessor;
            this.pendingRequest = sequence;
            this.subscriber = subscriber;
        }

        /** @return the sequence tracking how far this subscriber has read */
        public Sequence getSequence() {
            return this.sequence;
        }

        /** Marks the loop as stopped and alerts the processor barrier so a blocked wait wakes up. */
        public void halt() {
            this.running.set(false);
            this.processor.barrier.alert();
        }

        /**
         * Runs the delivery loop until the subscriber cancels, the processor terminates or
         * delivery is otherwise stopped.
         *
         * <p>Resources are released in the single {@code finally} block only, so the gating
         * sequence is removed and the subscriber count decremented exactly once per run,
         * whichever exit path is taken.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                if (!this.running.compareAndSet(false, true)) {
                    EmptySubscription.error(this.subscriber, new IllegalStateException("Thread is already running"));
                    return;
                }
                if (!this.processor.startSubscriber(this.subscriber, this)) {
                    return;
                }
                if (!RingBuffer.waitRequestOrTerminalEvent(this.pendingRequest, this.processor.barrier, this.running, this.sequence, this.waiter)) {
                    if (!this.running.get()) {
                        return;
                    }
                    // NOTE(review): 1 and 2 are presumably the SHUTDOWN / FORCED_SHUTDOWN states of
                    // EventLoopProcessor - confirm against the parent class.
                    if (this.processor.terminated == 1) {
                        // Terminated before anything was ever published: only the terminal signal is due.
                        if (this.processor.ringBuffer.getAsLong() == -1) {
                            if (this.processor.error != null) {
                                this.subscriber.onError(this.processor.error);
                            } else {
                                this.subscriber.onComplete();
                            }
                            return;
                        }
                    } else if (this.processor.terminated == 2) {
                        return;
                    }
                }

                long nextSequence = this.sequence.getAsLong() + 1;
                final boolean unbounded = this.pendingRequest.getAsLong() == Long.MAX_VALUE;
                while (true) {
                    try {
                        final long availableSequence = this.processor.barrier.waitFor(nextSequence, this.waiter);
                        while (nextSequence <= availableSequence) {
                            Slot<T> slot = this.processor.ringBuffer.get(nextSequence);
                            // Bounded demand: spin (with a short park) until one request unit can be consumed.
                            while (!unbounded && BackpressureUtils.getAndSub(this.pendingRequest, 1L) == 0) {
                                if (!this.running.get() || this.processor.isTerminated()) {
                                    throw Exceptions.AlertException.INSTANCE;
                                }
                                LockSupport.parkNanos(1L);
                            }
                            this.subscriber.onNext(slot.value);
                            nextSequence++;
                        }
                        this.sequence.set(availableSequence);
                        if (EmptySubscription.INSTANCE != this.processor.upstreamSubscription) {
                            this.processor.readWait.signalAllWhenBlocking();
                        }
                    } catch (InterruptedException e) {
                        // NOTE(review): the loop keeps going after restoring the interrupt flag;
                        // verify that waitFor does not fail again immediately and spin.
                        Thread.currentThread().interrupt();
                    } catch (Exceptions.AlertException | Exceptions.CancelException e2) {
                        if (!this.running.get()) {
                            break;
                        }
                        if (this.processor.terminated == 1) {
                            if (this.processor.error != null) {
                                this.subscriber.onError(this.processor.error);
                                break;
                            }
                            // Complete only once everything published has been delivered.
                            if (nextSequence > this.processor.ringBuffer.getAsLong()) {
                                this.subscriber.onComplete();
                                break;
                            }
                            LockSupport.parkNanos(1L);
                        } else if (this.processor.terminated == 2) {
                            break;
                        }
                        this.processor.barrier.clearAlert();
                    } catch (Throwable th) {
                        Exceptions.throwIfFatal(th);
                        this.subscriber.onError(th);
                        // Skip the slot that failed and carry on with the following one.
                        this.sequence.set(nextSequence);
                        nextSequence++;
                    }
                }
            } finally {
                // Single release point for every exit path above.
                this.processor.ringBuffer.removeGatingSequence(this.sequence);
                this.processor.decrementSubscribers();
                this.running.set(false);
                this.processor.readWait.signalAllWhenBlocking();
            }
        }

        /** @return {@code true} when the loop is not running */
        @Override // reactor.core.state.Cancellable
        public boolean isCancelled() {
            return !this.running.get();
        }

        /** @return {@code true} once the read sequence has moved away from its initial {@code -1} */
        @Override // reactor.core.state.Completable
        public boolean isStarted() {
            return this.sequence.getAsLong() != -1;
        }

        /** @return {@code true} when the loop is not running */
        @Override // reactor.core.state.Completable
        public boolean isTerminated() {
            return !this.running.get();
        }

        /** @return the demand currently recorded for this subscriber */
        @Override // reactor.core.state.Requestable
        public long requestedFromDownstream() {
            return this.pendingRequest.getAsLong();
        }

        /** @return the distance between the ring buffer cursor and this subscriber's read sequence */
        @Override // reactor.core.state.Backpressurable
        public long getPending() {
            return this.processor.ringBuffer.getCursor() - this.sequence.getAsLong();
        }

        /** @return the capacity reported by the owning processor */
        @Override // reactor.core.state.Backpressurable
        public long getCapacity() {
            return this.processor.getCapacity();
        }

        /** @return the subscriber served by this loop */
        @Override // reactor.core.flow.Producer
        public Object downstream() {
            return this.subscriber;
        }

        /** @return the owning processor */
        @Override // reactor.core.flow.Receiver
        public Object upstream() {
            return this.processor;
        }

        /**
         * Adds {@code j} to the pending demand when the request is valid and the loop is running.
         *
         * @param j the number of additional elements requested
         */
        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            if (BackpressureUtils.checkRequest(j, this.subscriber) && this.running.get()) {
                BackpressureUtils.getAndAddCap(this.pendingRequest, j);
            }
        }

        /** Cancels the subscription by halting the loop. */
        @Override // org.reactivestreams.Subscription
        public void cancel() {
            halt();
        }

        /** @return the constant {@code 2}; NOTE(review): meaning is defined by {@code Introspectable} - confirm */
        @Override // reactor.core.state.Introspectable
        public int getMode() {
            return 2;
        }

        /** @return the processor name suffixed with {@code "#loop"} */
        @Override // reactor.core.state.Introspectable
        public String getName() {
            return this.processor.getName() + "#loop";
        }
    }

    /*
     * create(...) factories: all of them end up in the private constructor with its fifth
     * argument set to false, i.e. the ring buffer is built with RingBuffer.createSingleProducer.
     * Unless stated otherwise the buffer size defaults to 256, the boolean flag to true and the
     * wait strategy to null (the constructor then falls back to its own default strategy).
     * NOTE(review): the boolean flag is forwarded to the EventLoopProcessor constructor;
     * presumably it is the auto-cancel switch - confirm against the parent class.
     */

    /** Creates a processor named after the class, with all defaults. */
    public static <E> TopicProcessor<E> create() {
        return create(TopicProcessor.class.getSimpleName(), 256, (WaitStrategy) null, true);
    }

    /** Creates a processor with the given name and all other defaults. */
    public static <E> TopicProcessor<E> create(String str) {
        return create(str, 256, true);
    }

    /** Creates a processor named after the class with the given flag {@code z}. */
    public static <E> TopicProcessor<E> create(boolean z) {
        return create(TopicProcessor.class.getSimpleName(), 256, (WaitStrategy) null, z);
    }

    /** Creates a processor running its subscriber loops on the given executor. */
    public static <E> TopicProcessor<E> create(ExecutorService executorService) {
        return create(executorService, 256, (WaitStrategy) null, true);
    }

    /** Creates a processor on the given executor with the given flag {@code z}. */
    public static <E> TopicProcessor<E> create(ExecutorService executorService, boolean z) {
        return create(executorService, 256, (WaitStrategy) null, z);
    }

    /** Creates a named processor with buffer size {@code i} (must be a power of two). */
    public static <E> TopicProcessor<E> create(String str, int i) {
        return create(str, i, (WaitStrategy) null, true);
    }

    /** Creates a named processor with buffer size {@code i} and flag {@code z}. */
    public static <E> TopicProcessor<E> create(String str, int i, boolean z) {
        return create(str, i, (WaitStrategy) null, z);
    }

    /** Creates a processor on the given executor with buffer size {@code i}, using {@code WaitStrategy.liteBlocking()}. */
    public static <E> TopicProcessor<E> create(ExecutorService executorService, int i) {
        return create(executorService, i, WaitStrategy.liteBlocking(), true);
    }

    /** Creates a processor on the given executor with buffer size {@code i} and flag {@code z}, using {@code WaitStrategy.liteBlocking()}. */
    public static <E> TopicProcessor<E> create(ExecutorService executorService, int i, boolean z) {
        return create(executorService, i, WaitStrategy.liteBlocking(), z);
    }

    /** Creates a named processor with buffer size {@code i} and the given wait strategy. */
    public static <E> TopicProcessor<E> create(String str, int i, WaitStrategy waitStrategy) {
        // Typed null selects the Supplier overload without resorting to a raw type.
        return create(str, i, waitStrategy, (Supplier<E>) null);
    }

    /**
     * Creates a named processor whose slots are pre-filled with values from {@code supplier}.
     *
     * @param supplier source of the initial slot values; may be {@code null}
     */
    public static <E> TopicProcessor<E> create(String str, int i, WaitStrategy waitStrategy, Supplier<E> supplier) {
        return new TopicProcessor<>(str, null, i, waitStrategy, false, true, supplier);
    }

    /** Creates a named processor with buffer size {@code i}, the given wait strategy and flag {@code z}. */
    public static <E> TopicProcessor<E> create(String str, int i, WaitStrategy waitStrategy, boolean z) {
        return new TopicProcessor<>(str, null, i, waitStrategy, false, z, null);
    }

    /** Creates a processor on the given executor with buffer size {@code i} and the given wait strategy. */
    public static <E> TopicProcessor<E> create(ExecutorService executorService, int i, WaitStrategy waitStrategy) {
        return create(executorService, i, waitStrategy, true);
    }

    /** Creates a processor on the given executor with buffer size {@code i}, the given wait strategy and flag {@code z}. */
    public static <E> TopicProcessor<E> create(ExecutorService executorService, int i, WaitStrategy waitStrategy, boolean z) {
        return new TopicProcessor<>(null, executorService, i, waitStrategy, false, z, null);
    }

    /*
     * share(...) factories: every overload resolves to one of the three terminal factories at
     * the bottom of this group, which invoke the private constructor with its fifth argument set
     * to true (the ring buffer is then built with RingBuffer.createMultiProducer).
     * Defaults: buffer size 256, flag true, wait strategy null.
     */

    /** Shared processor named after the class, with all defaults. */
    public static <E> TopicProcessor<E> share() {
        return share(true);
    }

    /** Shared processor named after the class with the given flag {@code z}. */
    public static <E> TopicProcessor<E> share(boolean z) {
        final String defaultName = TopicProcessor.class.getSimpleName();
        return share(defaultName, 256, z);
    }

    /** Shared processor running on the given executor, with all other defaults. */
    public static <E> TopicProcessor<E> share(ExecutorService executorService) {
        return share(executorService, true);
    }

    /** Shared processor running on the given executor with the given flag {@code z}. */
    public static <E> TopicProcessor<E> share(ExecutorService executorService, boolean z) {
        return share(executorService, 256, z);
    }

    /** Shared named processor with buffer size {@code i}. */
    public static <E> TopicProcessor<E> share(String str, int i) {
        return share(str, i, true);
    }

    /** Shared named processor with buffer size {@code i} and flag {@code z}. */
    public static <E> TopicProcessor<E> share(String str, int i, boolean z) {
        final WaitStrategy defaultStrategy = null;
        return share(str, i, defaultStrategy, z);
    }

    /** Shared processor on the given executor with buffer size {@code i}. */
    public static <E> TopicProcessor<E> share(ExecutorService executorService, int i) {
        return share(executorService, i, true);
    }

    /** Shared processor on the given executor with buffer size {@code i} and flag {@code z}. */
    public static <E> TopicProcessor<E> share(ExecutorService executorService, int i, boolean z) {
        final WaitStrategy defaultStrategy = null;
        return share(executorService, i, defaultStrategy, z);
    }

    /** Shared named processor with buffer size {@code i} and the given wait strategy. */
    public static <E> TopicProcessor<E> share(String str, int i, WaitStrategy waitStrategy) {
        return share(str, i, waitStrategy, true);
    }

    /** Shared named processor whose slots are pre-filled from {@code supplier}, default wait strategy. */
    public static <E> TopicProcessor<E> share(String str, int i, Supplier<E> supplier) {
        return share(str, i, (WaitStrategy) null, supplier);
    }

    /** Terminal factory: named, explicit wait strategy, slots pre-filled from {@code supplier}. */
    public static <E> TopicProcessor<E> share(String str, int i, WaitStrategy waitStrategy, Supplier<E> supplier) {
        return new TopicProcessor<>(str, null, i, waitStrategy, true, true, supplier);
    }

    /** Terminal factory: named, explicit wait strategy and flag {@code z}. */
    public static <E> TopicProcessor<E> share(String str, int i, WaitStrategy waitStrategy, boolean z) {
        return new TopicProcessor<>(str, null, i, waitStrategy, true, z, null);
    }

    /** Shared processor on the given executor with buffer size {@code i} and the given wait strategy. */
    public static <E> TopicProcessor<E> share(ExecutorService executorService, int i, WaitStrategy waitStrategy) {
        return share(executorService, i, waitStrategy, true);
    }

    /** Terminal factory: executor-backed, explicit wait strategy and flag {@code z}. */
    public static <E> TopicProcessor<E> share(ExecutorService executorService, int i, WaitStrategy waitStrategy, boolean z) {
        return new TopicProcessor<>(null, executorService, i, waitStrategy, true, z, null);
    }

    /**
     * Builds the processor and its ring buffer.
     *
     * @param str             processor name, forwarded to the parent constructor
     * @param executorService executor forwarded to the parent constructor; the name-based factories pass {@code null}
     * @param i               ring buffer size; must be a power of two
     * @param waitStrategy    wait strategy of the ring buffer; {@code null} selects
     *                        {@code WaitStrategy.phasedOffLiteLock(200, 100, MILLISECONDS)}
     * @param z               {@code true} for a multi-producer ring buffer, {@code false} for single-producer
     * @param z2              flag forwarded to the parent constructor (presumably auto-cancel - confirm)
     * @param supplier        optional source of initial slot values; may be {@code null}
     * @throws IllegalArgumentException if {@code i} is not a power of two
     */
    private TopicProcessor(String str, ExecutorService executorService, int i, WaitStrategy waitStrategy, boolean z, boolean z2, Supplier<E> supplier) {
        super(str, executorService, z2);
        this.readWait = WaitStrategy.liteBlocking();
        if (!RingBuffer.isPowerOfTwo(i)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + i);
        }
        // Allocates one slot per ring buffer entry, optionally pre-filled by the caller's supplier.
        Supplier<Slot<E>> slotFactory = () -> {
            Slot<E> slot = new Slot<>();
            if (supplier != null) {
                slot.value = supplier.get();
            }
            return slot;
        };
        // Lets a producer blocked on the buffer bail out once the processor is dead and nobody listens.
        Runnable spinObserver = () -> {
            if (!alive() && SUBSCRIBER_COUNT.get(this) == 0) {
                throw Exceptions.AlertException.INSTANCE;
            }
        };
        WaitStrategy effectiveStrategy = waitStrategy == null ? WaitStrategy.phasedOffLiteLock(200L, 100L, TimeUnit.MILLISECONDS) : waitStrategy;
        if (z) {
            this.ringBuffer = RingBuffer.createMultiProducer(slotFactory, i, effectiveStrategy, spinObserver);
        } else {
            this.ringBuffer = RingBuffer.createSingleProducer(slotFactory, i, effectiveStrategy, spinObserver);
        }
        this.minimum = RingBuffer.newSequence(-1L);
        this.barrier = this.ringBuffer.newBarrier();
    }

    /**
     * Subscribes {@code subscriber} to this processor.
     *
     * <p>When the processor is no longer alive the subscriber only receives the buffered content
     * through {@code coldSource}. Otherwise a dedicated {@code TopicSubscriberLoop} is created,
     * its read sequence is registered as a gating sequence of the ring buffer and the loop is
     * handed to the executor. If the executor refuses the loop, the registration is rolled back
     * and the subscriber is either failed or, for a rejection on a dead processor, served from
     * the cold source.
     *
     * @param subscriber the subscriber to attach
     */
    @Override // reactor.core.publisher.FluxProcessor, org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super E> subscriber) {
        super.subscribe(subscriber);
        if (!alive()) {
            coldSource(this.ringBuffer, null, this.error, this.minimum).subscribe(subscriber);
            return;
        }
        TopicSubscriberLoop<E> loop = new TopicSubscriberLoop<>(this, RingBuffer.newSequence(0L), subscriber);
        // NOTE(review): incrementSubscribers() presumably returns true for the first subscriber,
        // which then starts from the retained minimum instead of the current cursor - confirm.
        long startSequence = incrementSubscribers() ? this.minimum.getAsLong() : this.ringBuffer.getCursor();
        loop.sequence.set(startSequence);
        this.ringBuffer.addGatingSequence(loop.sequence);
        try {
            this.executor.execute(loop);
        } catch (Throwable th) {
            // The loop never started, so undo the registration performed above.
            this.ringBuffer.removeGatingSequence(loop.getSequence());
            decrementSubscribers();
            if (alive() || !(th instanceof RejectedExecutionException)) {
                EmptySubscription.error(subscriber, th);
            } else {
                coldSource(this.ringBuffer, th, this.error, this.minimum).subscribe(subscriber);
            }
        }
    }

    /**
     * Returns a {@link Flux} over the buffered content, built by {@code coldSource} from the ring
     * buffer, the {@code minimum} sequence and the processor's current {@code error}.
     */
    @Override // reactor.core.publisher.EventLoopProcessor
    public Flux<E> drain() {
        return coldSource(this.ringBuffer, null, this.error, this.minimum);
    }

    /**
     * Runs the parent's {@code onNext} handling, then hands the value to
     * {@code RingBuffer.onNext} together with this processor's ring buffer.
     *
     * @param e the incoming value
     */
    @Override // org.reactivestreams.Subscriber
    public void onNext(E e) {
        super.onNext(e);
        RingBuffer.onNext(e, this.ringBuffer);
    }

    /**
     * Error hook: signals the read wait strategy and the barrier so waiting threads wake up.
     * The throwable itself is not used here.
     */
    @Override // reactor.core.publisher.EventLoopProcessor
    protected void doError(Throwable th) {
        this.readWait.signalAllWhenBlocking();
        this.barrier.signal();
    }

    /** Completion hook: signals the read wait strategy and the barrier so waiting threads wake up. */
    @Override // reactor.core.publisher.EventLoopProcessor
    protected void doComplete() {
        this.readWait.signalAllWhenBlocking();
        this.barrier.signal();
    }

    /**
     * Builds a {@link Flux} over the content of {@code ringBuffer} starting from the position held
     * by {@code sequence}, optionally followed by an error.
     *
     * @param ringBuffer the buffer to read from
     * @param th         optional primary failure; when present together with {@code th2},
     *                   {@code th2} is attached to it as a suppressed exception
     * @param th2        optional terminal error; when {@code null} no error is appended at all
     * @param sequence   sequence whose current value is passed to {@code RingBuffer.nonBlockingBoundedQueue}
     * @return the buffered values, followed by an error signal when {@code th2} is non-null
     */
    public static <E> Flux<E> coldSource(RingBuffer<Slot<E>> ringBuffer, Throwable th, Throwable th2, Sequence sequence) {
        final Flux<E> buffered = fromIterable(RingBuffer.nonBlockingBoundedQueue(ringBuffer, sequence.getAsLong()));
        if (th2 == null) {
            return buffered;
        }
        final Throwable failure;
        if (th != null) {
            th.addSuppressed(th2);
            failure = th;
        } else {
            failure = th2;
        }
        return concat(buffered, Flux.error(failure));
    }

    /** Always {@code false} for this processor. */
    @Override // reactor.core.publisher.EventLoopProcessor
    public boolean isWork() {
        return false;
    }

    /** @return the pending count reported by the ring buffer */
    @Override // reactor.core.state.Backpressurable
    public long getPending() {
        return this.ringBuffer.getPending();
    }

    /**
     * Starts the thread that requests data from the upstream {@code subscription}.
     *
     * <p>The {@code minimum} sequence is positioned on the current cursor and registered as a
     * gating sequence before a new thread named {@code <name>[request-task]} is started with the
     * task built by {@code RingBuffer.createRequestTask}.
     *
     * @param subscription the upstream subscription to request from
     */
    @Override // reactor.core.publisher.EventLoopProcessor
    protected void requestTask(Subscription subscription) {
        this.minimum.set(this.ringBuffer.getCursor());
        this.ringBuffer.addGatingSequence(this.minimum);
        ThreadFactory threadFactory = ExecutorUtils.newNamedFactory(this.name + "[request-task]", null, null, false);
        // Stops the request task once the processor is no longer alive: a cancel exception after
        // cancellation, an alert exception otherwise.
        Runnable stopCheck = () -> {
            if (alive()) {
                return;
            }
            if (!this.cancelled) {
                throw Exceptions.AlertException.INSTANCE;
            }
            throw Exceptions.CancelException.INSTANCE;
        };
        threadFactory.newThread(RingBuffer.createRequestTask(
                subscription,
                stopCheck,
                // Bound method reference: the task writes its progress into the minimum sequence.
                this.minimum::set,
                // Without subscribers the minimum itself is the slowest reader.
                () -> SUBSCRIBER_COUNT.get(this) == 0 ? this.minimum.getAsLong() : this.ringBuffer.getMinimumGatingSequence(this.minimum),
                this.readWait,
                this,
                (int) this.ringBuffer.getCapacity())).start();
    }

    /**
     * Runs the parent's cancel handling for {@code subscription}, then signals the read wait
     * strategy so that any thread blocked on it wakes up.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.core.publisher.EventLoopProcessor, reactor.core.publisher.FluxProcessor
    public void cancel(Subscription subscription) {
        super.cancel(subscription);
        this.readWait.signalAllWhenBlocking();
    }

    /** @return a description made of the barrier and the ring buffer's remaining capacity */
    @Override
    public String toString() {
        return "TopicProcessor{barrier=" + this.barrier + ", remaining=" + this.ringBuffer.remainingCapacity() + '}';
    }

    /** @return the remaining capacity of the ring buffer */
    @Override // reactor.core.publisher.EventLoopProcessor
    public long getAvailableCapacity() {
        return this.ringBuffer.remainingCapacity();
    }

    /** @return the capacity of the ring buffer */
    @Override // reactor.core.publisher.FluxProcessor, reactor.core.state.Backpressurable
    public long getCapacity() {
        return this.ringBuffer.getCapacity();
    }

    /**
     * @return {@code true} when the parent reports the processor as started, or when the ring
     *         buffer's sequence value is no longer {@code -1}
     */
    @Override // reactor.core.publisher.EventLoopProcessor, reactor.core.state.Completable
    public boolean isStarted() {
        if (super.isStarted()) {
            return true;
        }
        return this.ringBuffer.getAsLong() != -1;
    }

    /** @return an iterator over the sequence receivers currently registered on the ring buffer */
    @Override // reactor.core.flow.MultiProducer
    public Iterator<?> downstreams() {
        return Arrays.asList(this.ringBuffer.getSequenceReceivers()).iterator();
    }

    /**
     * @return the number of sequence receivers registered on the ring buffer, minus one when the
     *         processor is started
     */
    @Override // reactor.core.flow.MultiProducer
    public long downstreamCount() {
        long receivers = this.ringBuffer.getSequenceReceivers().length;
        if (isStarted()) {
            receivers--;
        }
        return receivers;
    }
}
