package reactor.core.publisher;

import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.flow.Producer;
import reactor.core.flow.Receiver;
import reactor.core.state.Backpressurable;
import reactor.core.state.Completable;
import reactor.core.subscriber.BaseSubscriber;
import reactor.core.subscriber.SignalEmitter;
import reactor.core.util.BackpressureUtils;
import reactor.core.util.EmptySubscription;
import reactor.core.util.Exceptions;

/* loaded from: input_file:lib/reactor-core-2.5.0.M3.jar:reactor/core/publisher/FluxProcessor.class */
public abstract class FluxProcessor<IN, OUT> extends Flux<OUT> implements Processor<IN, OUT>, Backpressurable, Receiver, Completable, BaseSubscriber<IN> {
    Subscription upstreamSubscription;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/reactor-core-2.5.0.M3.jar:reactor/core/publisher/FluxProcessor$DelegateProcessor.class */
    public static final class DelegateProcessor<IN, OUT> extends FluxProcessor<IN, OUT> implements Producer, Backpressurable {
        private final Publisher<OUT> downstream;
        private final Subscriber<IN> upstream;

        public DelegateProcessor(Publisher<OUT> publisher, Subscriber<IN> subscriber) {
            this.downstream = (Publisher) Objects.requireNonNull(publisher, "Downstream must not be null");
            this.upstream = (Subscriber) Objects.requireNonNull(subscriber, "Upstream must not be null");
        }

        @Override // reactor.core.flow.Producer
        public Subscriber<? super IN> downstream() {
            return this.upstream;
        }

        @Override // reactor.core.publisher.FluxProcessor, reactor.core.state.Backpressurable
        public long getCapacity() {
            if (Backpressurable.class.isAssignableFrom(this.upstream.getClass())) {
                return ((Backpressurable) this.upstream).getCapacity();
            }
            return Long.MAX_VALUE;
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            this.upstream.onComplete();
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.upstream.onError(th);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(IN in) {
            this.upstream.onNext(in);
        }

        @Override // reactor.core.publisher.FluxProcessor, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            this.upstream.onSubscribe(subscription);
        }

        @Override // reactor.core.publisher.FluxProcessor, org.reactivestreams.Publisher
        public void subscribe(Subscriber<? super OUT> subscriber) {
            if (subscriber == null) {
                throw Exceptions.argumentIsNullException();
            }
            this.downstream.subscribe(subscriber);
        }

        @Override // reactor.core.publisher.FluxProcessor, reactor.core.flow.Receiver
        public /* bridge */ /* synthetic */ Object upstream() {
            return super.upstream();
        }

        @Override // reactor.core.publisher.FluxProcessor, reactor.core.subscriber.BaseSubscriber
        public /* bridge */ /* synthetic */ BaseSubscriber connect() {
            return super.connect();
        }
    }

    public static <IN, OUT, E extends Subscriber<IN>> FluxProcessor<IN, OUT> blackbox(E e, Function<E, ? extends Publisher<OUT>> function) {
        return create(e, function.apply(e));
    }

    public static <IN, OUT> FluxProcessor<IN, OUT> blackbox(Function<Flux<IN>, ? extends Publisher<OUT>> function) {
        FluxPassthrough fluxPassthrough = new FluxPassthrough();
        return create(fluxPassthrough, (Publisher) function.apply(fluxPassthrough));
    }

    public static <IN> FluxProcessor<IN, IN> blocking() {
        FluxPassthrough fluxPassthrough = new FluxPassthrough();
        return create(SignalEmitter.blocking(fluxPassthrough), fluxPassthrough);
    }

    public static <IN, OUT> FluxProcessor<IN, OUT> create(Subscriber<IN> subscriber, Publisher<OUT> publisher) {
        return new DelegateProcessor(publisher, subscriber);
    }

    public FluxProcessor<IN, OUT> connect() {
        onSubscribe(EmptySubscription.INSTANCE);
        return this;
    }

    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (BackpressureUtils.validate(this.upstreamSubscription, subscription)) {
            this.upstreamSubscription = subscription;
            try {
                doOnSubscribe(subscription);
            } catch (Throwable th) {
                Exceptions.throwIfFatal(th);
                subscription.cancel();
                onError(th);
            }
        }
    }

    protected void doOnSubscribe(Subscription subscription) {
    }

    public long getCapacity() {
        return Long.MAX_VALUE;
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super OUT> subscriber) {
        if (subscriber == null) {
            throw Exceptions.argumentIsNullException();
        }
    }

    protected void cancel(Subscription subscription) {
        if (subscription != EmptySubscription.INSTANCE) {
            subscription.cancel();
        }
    }

    @Override // reactor.core.flow.Receiver
    public Subscription upstream() {
        return this.upstreamSubscription;
    }

    @Override // reactor.core.publisher.Flux, reactor.core.state.Introspectable
    public int getMode() {
        return 0;
    }
}
