package org.springframework.cloud.stream.annotation.rxjava;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/* loaded from: input_file:org/springframework/cloud/stream/annotation/rxjava/SubjectMessageHandler.class */
public class SubjectMessageHandler extends AbstractMessageProducingHandler implements DisposableBean {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final RxJavaProcessor processor;
    private final Subject subject;
    private final Subscription subscription;

    public SubjectMessageHandler(RxJavaProcessor rxJavaProcessor) {
        Assert.notNull(rxJavaProcessor, "RxJava processor must not be null.");
        this.processor = rxJavaProcessor;
        this.subject = new SerializedSubject(PublishSubject.create());
        this.subscription = rxJavaProcessor.process(this.subject).subscribe(new Action1<Object>() { // from class: org.springframework.cloud.stream.annotation.rxjava.SubjectMessageHandler.1
            public void call(Object obj) {
                if (ClassUtils.isAssignable(Message.class, obj.getClass())) {
                    SubjectMessageHandler.this.getOutputChannel().send((Message) obj);
                } else {
                    SubjectMessageHandler.this.getOutputChannel().send(MessageBuilder.withPayload(obj).build());
                }
            }
        }, new Action1<Throwable>() { // from class: org.springframework.cloud.stream.annotation.rxjava.SubjectMessageHandler.2
            public void call(Throwable th) {
                SubjectMessageHandler.this.logger.error(th.getMessage(), th);
            }
        }, new Action0() { // from class: org.springframework.cloud.stream.annotation.rxjava.SubjectMessageHandler.3
            public void call() {
                SubjectMessageHandler.this.logger.error("Subscription close for [" + SubjectMessageHandler.this.subscription + "]");
            }
        });
    }

    protected void handleMessageInternal(Message<?> message) throws Exception {
        this.subject.onNext(message.getPayload());
    }

    public void destroy() throws Exception {
        this.subscription.unsubscribe();
    }
}
