package reactor.core.publisher;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.flow.Loopback;
import reactor.core.subscriber.MultiSubscriptionSubscriber;
import reactor.core.subscriber.Subscribers;
import reactor.core.util.DeferredSubscription;
import reactor.core.util.EmptySubscription;
import reactor.core.util.Exceptions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:lib/reactor-core-2.5.0.M3.jar:reactor/core/publisher/FluxRetryWhen.class */
public final class FluxRetryWhen<T> extends FluxSource<T, T> {
    final Function<? super Flux<Throwable>, ? extends Publisher<? extends Object>> whenSourceFactory;

    /* loaded from: input_file:lib/reactor-core-2.5.0.M3.jar:reactor/core/publisher/FluxRetryWhen$RetryWhenMainSubscriber.class */
    static final class RetryWhenMainSubscriber<T> extends MultiSubscriptionSubscriber<T, T> {
        final DeferredSubscription otherArbiter;
        final Subscriber<Throwable> signaller;
        final Publisher<? extends T> source;
        volatile int wip;
        static final AtomicIntegerFieldUpdater<RetryWhenMainSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(RetryWhenMainSubscriber.class, "wip");
        volatile boolean cancelled;
        long produced;

        public RetryWhenMainSubscriber(Subscriber<? super T> subscriber, Subscriber<Throwable> subscriber2, Publisher<? extends T> publisher) {
            super(subscriber);
            this.signaller = subscriber2;
            this.source = publisher;
            this.otherArbiter = new DeferredSubscription();
        }

        @Override // reactor.core.subscriber.MultiSubscriptionSubscriber, org.reactivestreams.Subscription
        public void cancel() {
            if (this.cancelled) {
                return;
            }
            this.cancelled = true;
            cancelWhen();
            super.cancel();
        }

        void cancelWhen() {
            this.otherArbiter.cancel();
        }

        public void setWhen(Subscription subscription) {
            this.otherArbiter.set(subscription);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(T t) {
            this.subscriber.onNext(t);
            this.produced++;
        }

        @Override // reactor.core.subscriber.MultiSubscriptionSubscriber, org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            long j = this.produced;
            if (j != 0) {
                this.produced = 0L;
                produced(j);
            }
            this.otherArbiter.request(1L);
            this.signaller.onNext(th);
        }

        @Override // reactor.core.subscriber.MultiSubscriptionSubscriber, org.reactivestreams.Subscriber
        public void onComplete() {
            this.otherArbiter.cancel();
            this.subscriber.onComplete();
        }

        void resubscribe() {
            if (WIP.getAndIncrement(this) != 0) {
                return;
            }
            while (!this.cancelled) {
                this.source.subscribe(this);
                if (WIP.decrementAndGet(this) == 0) {
                    return;
                }
            }
        }

        void whenError(Throwable th) {
            this.cancelled = true;
            super.cancel();
            this.subscriber.onError(th);
        }

        void whenComplete() {
            this.cancelled = true;
            super.cancel();
            this.subscriber.onComplete();
        }
    }

    /* loaded from: input_file:lib/reactor-core-2.5.0.M3.jar:reactor/core/publisher/FluxRetryWhen$RetryWhenOtherSubscriber.class */
    static final class RetryWhenOtherSubscriber extends Flux<Throwable> implements Subscriber<Object>, Loopback {
        RetryWhenMainSubscriber<?> main;
        final EmitterProcessor<Throwable> completionSignal = EmitterProcessor.create();

        RetryWhenOtherSubscriber() {
        }

        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            this.main.setWhen(subscription);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(Object obj) {
            this.main.resubscribe();
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.main.whenError(th);
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            this.main.whenComplete();
        }

        @Override // org.reactivestreams.Publisher
        public void subscribe(Subscriber<? super Throwable> subscriber) {
            this.completionSignal.subscribe(subscriber);
        }

        @Override // reactor.core.flow.Loopback
        public Object connectedInput() {
            return this.main;
        }

        @Override // reactor.core.flow.Loopback
        public Object connectedOutput() {
            return this.completionSignal;
        }

        @Override // reactor.core.publisher.Flux, reactor.core.state.Introspectable
        public int getMode() {
            return 3;
        }
    }

    public FluxRetryWhen(Publisher<? extends T> publisher, Function<? super Flux<Throwable>, ? extends Publisher<? extends Object>> function) {
        super(publisher);
        this.whenSourceFactory = (Function) Objects.requireNonNull(function, "whenSourceFactory");
    }

    @Override // reactor.core.publisher.FluxSource, reactor.core.state.Backpressurable
    public long getCapacity() {
        return -1L;
    }

    @Override // reactor.core.publisher.FluxSource, org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super T> subscriber) {
        RetryWhenOtherSubscriber retryWhenOtherSubscriber = new RetryWhenOtherSubscriber();
        Subscriber serialize = Subscribers.serialize(retryWhenOtherSubscriber.completionSignal);
        serialize.onSubscribe(EmptySubscription.INSTANCE);
        Subscriber serialize2 = Subscribers.serialize(subscriber);
        RetryWhenMainSubscriber<?> retryWhenMainSubscriber = new RetryWhenMainSubscriber<>(serialize2, serialize, this.source);
        retryWhenOtherSubscriber.main = retryWhenMainSubscriber;
        serialize2.onSubscribe(retryWhenMainSubscriber);
        try {
            Publisher<? extends Object> apply = this.whenSourceFactory.apply(retryWhenOtherSubscriber);
            if (apply == null) {
                subscriber.onError(new NullPointerException("The whenSourceFactory returned a null Publisher"));
                return;
            }
            apply.subscribe(retryWhenOtherSubscriber);
            if (retryWhenMainSubscriber.cancelled) {
                return;
            }
            this.source.subscribe(retryWhenMainSubscriber);
        } catch (Throwable th) {
            Exceptions.throwIfFatal(th);
            subscriber.onError(Exceptions.unwrap(th));
        }
    }
}
