package org.springframework.integration.endpoint;

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.context.Lifecycle;
import org.springframework.integration.channel.ChannelUtils;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
import org.springframework.integration.router.MessageRouter;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.ReactiveMessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.5.16.jar:org/springframework/integration/endpoint/ReactiveStreamsConsumer.class */
public class ReactiveStreamsConsumer extends AbstractEndpoint implements IntegrationConsumer {
    private final MessageChannel inputChannel;
    private final Publisher<Message<Object>> publisher;
    private final MessageHandler handler;

    @Nullable
    private final ReactiveMessageHandler reactiveMessageHandler;

    @Nullable
    private final Subscriber<Message<?>> subscriber;

    @Nullable
    private final Lifecycle lifecycleDelegate;

    @Nullable
    private Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> reactiveCustomizer;
    private ErrorHandler errorHandler;
    private volatile Disposable subscription;

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.5.16.jar:org/springframework/integration/endpoint/ReactiveStreamsConsumer$MessageHandlerSubscriber.class */
    private static final class MessageHandlerSubscriber implements CoreSubscriber<Message<?>>, Disposable, Lifecycle {
        private final Consumer<Message<?>> consumer;
        private Subscription subscription;
        private final MessageHandler messageHandler;

        MessageHandlerSubscriber(MessageHandler messageHandler) {
            Assert.notNull(messageHandler, "'messageHandler' must not be null");
            this.messageHandler = messageHandler;
            MessageHandler messageHandler2 = this.messageHandler;
            Objects.requireNonNull(messageHandler2);
            this.consumer = messageHandler2::handleMessage;
        }

        @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(Long.MAX_VALUE);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(Message<?> message) {
            this.consumer.accept(message);
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            dispose();
        }

        @Override // reactor.core.Disposable
        public void dispose() {
            Subscription subscription = this.subscription;
            if (subscription != null) {
                this.subscription = null;
                subscription.cancel();
            }
        }

        @Override // reactor.core.Disposable
        public boolean isDisposed() {
            return this.subscription == null;
        }

        @Override // org.springframework.context.Lifecycle
        public void start() {
            if (this.messageHandler instanceof Lifecycle) {
                ((Lifecycle) this.messageHandler).start();
            }
        }

        @Override // org.springframework.context.Lifecycle
        public void stop() {
            if (this.messageHandler instanceof Lifecycle) {
                ((Lifecycle) this.messageHandler).stop();
            }
        }

        @Override // org.springframework.context.Lifecycle
        public boolean isRunning() {
            return !(this.messageHandler instanceof Lifecycle) || ((Lifecycle) this.messageHandler).isRunning();
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.5.16.jar:org/springframework/integration/endpoint/ReactiveStreamsConsumer$SubscriberDecorator.class */
    private static final class SubscriberDecorator extends BaseSubscriber<Message<?>> {
        private final Subscriber<Message<?>> delegate;
        private final ErrorHandler errorHandler;

        SubscriberDecorator(Subscriber<Message<?>> subscriber, ErrorHandler errorHandler) {
            this.delegate = subscriber;
            this.errorHandler = errorHandler;
        }

        @Override // reactor.core.publisher.BaseSubscriber
        protected void hookOnSubscribe(Subscription subscription) {
            this.delegate.onSubscribe(subscription);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // reactor.core.publisher.BaseSubscriber
        public void hookOnNext(Message<?> message) {
            try {
                this.delegate.onNext(message);
            } catch (Exception e) {
                this.errorHandler.handleError(e);
            }
        }

        @Override // reactor.core.publisher.BaseSubscriber
        protected void hookOnComplete() {
            this.delegate.onComplete();
        }
    }

    public ReactiveStreamsConsumer(MessageChannel messageChannel, MessageHandler messageHandler) {
        this(messageChannel, (Subscriber<Message<?>>) (messageHandler instanceof Subscriber ? (Subscriber) messageHandler : new MessageHandlerSubscriber(messageHandler)));
    }

    public ReactiveStreamsConsumer(MessageChannel messageChannel, Subscriber<Message<?>> subscriber) {
        Assert.notNull(messageChannel, "'inputChannel' must not be null");
        Assert.notNull(subscriber, "'subscriber' must not be null");
        this.inputChannel = messageChannel;
        if (messageChannel instanceof NullChannel) {
            this.logger.warn("The consuming from the NullChannel does not have any effects: it doesn't forward messages sent to it. A NullChannel is the end of the flow.");
        }
        this.publisher = IntegrationReactiveUtils.messageChannelToFlux(messageChannel);
        this.subscriber = subscriber;
        this.lifecycleDelegate = subscriber instanceof Lifecycle ? (Lifecycle) subscriber : null;
        if (subscriber instanceof MessageHandlerSubscriber) {
            this.handler = ((MessageHandlerSubscriber) subscriber).messageHandler;
        } else if (subscriber instanceof MessageHandler) {
            this.handler = (MessageHandler) subscriber;
        } else {
            Subscriber<Message<?>> subscriber2 = this.subscriber;
            Objects.requireNonNull(subscriber2);
            this.handler = (v1) -> {
                r1.onNext(v1);
            };
        }
        this.reactiveMessageHandler = null;
    }

    public ReactiveStreamsConsumer(MessageChannel messageChannel, ReactiveMessageHandler reactiveMessageHandler) {
        Assert.notNull(messageChannel, "'inputChannel' must not be null");
        this.inputChannel = messageChannel;
        this.handler = new ReactiveMessageHandlerAdapter(reactiveMessageHandler);
        this.reactiveMessageHandler = reactiveMessageHandler;
        this.publisher = IntegrationReactiveUtils.messageChannelToFlux(messageChannel);
        this.subscriber = null;
        this.lifecycleDelegate = reactiveMessageHandler instanceof Lifecycle ? (Lifecycle) reactiveMessageHandler : null;
    }

    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    public void setReactiveCustomizer(@Nullable Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> function) {
        this.reactiveCustomizer = function;
    }

    @Override // org.springframework.integration.endpoint.IntegrationConsumer
    public MessageChannel getInputChannel() {
        return this.inputChannel;
    }

    @Override // org.springframework.integration.endpoint.IntegrationConsumer
    public MessageChannel getOutputChannel() {
        if (this.handler instanceof MessageProducer) {
            return ((MessageProducer) this.handler).getOutputChannel();
        }
        if (this.handler instanceof MessageRouter) {
            return ((MessageRouter) this.handler).getDefaultOutputChannel();
        }
        return null;
    }

    @Override // org.springframework.integration.endpoint.IntegrationConsumer
    public MessageHandler getHandler() {
        return this.handler;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        super.onInit();
        if (this.errorHandler == null) {
            this.errorHandler = ChannelUtils.getErrorHandler(getBeanFactory());
        }
    }

    @Override // org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStart() {
        if (this.lifecycleDelegate != null) {
            this.lifecycleDelegate.start();
        }
        Flux from = Flux.from(this.publisher);
        if (this.reactiveCustomizer != null) {
            from = from.transform(this.reactiveCustomizer);
        }
        if (this.reactiveMessageHandler != null) {
            ReactiveMessageHandler reactiveMessageHandler = this.reactiveMessageHandler;
            Objects.requireNonNull(reactiveMessageHandler);
            this.subscription = from.flatMap(reactiveMessageHandler::handleMessage).onErrorContinue((th, obj) -> {
                this.errorHandler.handleError(th);
            }).subscribe();
        } else if (this.subscriber != null) {
            this.subscription = (Disposable) from.subscribeWith(new SubscriberDecorator(this.subscriber, this.errorHandler));
        }
    }

    @Override // org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStop() {
        if (this.subscription != null) {
            this.subscription.dispose();
        }
        if (this.lifecycleDelegate != null) {
            this.lifecycleDelegate.stop();
        }
    }
}
