package org.mule.rx.support;

import org.apache.commons.lang.Validate;
import org.mule.api.MuleException;
import org.mule.api.callback.SourceCallback;
import org.mule.api.source.MessageSource;
import org.mule.security.oauth.processor.AbstractListeningMessageProcessor;
import rx.functions.Action1;

/* loaded from: input_file:org/mule/rx/support/SourceCallbackMessageSourceAdapter.class */
/**
 * Adapts a {@code SourceCallback} consumer to a startable/stoppable message source.
 *
 * <p>On {@link #start()} the configured {@link Action1} is invoked on a dedicated thread and is
 * handed this adapter as the {@link SourceCallback}. On {@link #stop()} that thread is
 * interrupted.
 *
 * <p>{@code start()} and {@code stop()} are serialized on a private lock, so they may be called
 * from different threads.
 */
public class SourceCallbackMessageSourceAdapter extends AbstractListeningMessageProcessor implements MessageSource {
    private final Action1<SourceCallback> sourceCallbackConsumer;

    /** Guards the check-then-act sequences on {@link #subscriberThread} in start() and stop(). */
    private final Object lifecycleLock = new Object();

    /** Thread currently running the consumer, or {@code null} when none has been started or after stop(). */
    private volatile Thread subscriberThread;

    /**
     * Creates the adapter.
     *
     * @param action1 consumer that will receive this adapter as its {@link SourceCallback};
     *                must not be {@code null}
     * @param obj     object passed to {@code WorkManagerCache.guessConnectorName} to derive the
     *                name handed to the superclass constructor
     */
    public SourceCallbackMessageSourceAdapter(Action1<SourceCallback> action1, Object obj) {
        super(WorkManagerCache.guessConnectorName(obj));
        Validate.notNull(action1, "sourceCallbackConsumer can't be null");
        this.sourceCallbackConsumer = action1;
    }

    /**
     * Starts a thread named {@code "SourceCallbackSubscriber-" + flowName} that calls the
     * consumer with this adapter.
     *
     * <p>If a previously started subscriber thread is still alive this call does nothing, so a
     * repeated {@code start()} cannot orphan a running thread that {@link #stop()} would no
     * longer be able to reach.
     *
     * @throws MuleException declared for the lifecycle contract; not thrown by this implementation
     */
    public void start() throws MuleException {
        synchronized (this.lifecycleLock) {
            Thread running = this.subscriberThread;
            if (running != null && running.isAlive()) {
                // Already subscribed: replacing the reference would leak the running thread.
                return;
            }
            Thread worker = new Thread(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    SourceCallbackMessageSourceAdapter.this.sourceCallbackConsumer.call(SourceCallbackMessageSourceAdapter.this);
                }
            }, "SourceCallbackSubscriber-" + getFlowConstruct().getName());
            this.subscriberThread = worker;
            worker.start();
        }
    }

    /**
     * Interrupts the subscriber thread, if any, and forgets it.
     *
     * <p>This only requests interruption; it does not wait for the thread to finish.
     * NOTE(review): the consumer presumably has to react to interruption for the thread to
     * actually terminate - confirm against the consumers in use.
     *
     * @throws MuleException declared for the lifecycle contract; not thrown by this implementation
     */
    public void stop() throws MuleException {
        synchronized (this.lifecycleLock) {
            // Work on a single snapshot so the null check and interrupt() see the same value.
            Thread running = this.subscriberThread;
            if (running != null) {
                running.interrupt();
                this.subscriberThread = null;
            }
        }
    }
}
