package org.codehaus.activemq.ra;

import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.ServerSessionPool;
import javax.jms.Topic;
import javax.jms.XASession;
import javax.resource.ResourceException;
import javax.resource.spi.work.ExecutionContext;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.transaction.xa.XAResource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnectionConsumer;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ActiveMQTopic;

/* loaded from: input_file:activemq-ra-1.1-G1M3.jar:org/codehaus/activemq/ra/ActiveMQPollingEndpointWorker.class */
public class ActiveMQPollingEndpointWorker extends ActiveMQBaseEndpointWorker implements Work {
    private static final Log log;
    private static final int MAX_WORKERS = 10;
    private SynchronizedBoolean started;
    private SynchronizedBoolean stopping;
    private Latch stopLatch;
    private ActiveMQConnectionConsumer consumer;
    private CircularQueue workers;
    static WorkListener debugingWorkListener;
    static Class class$org$codehaus$activemq$ra$ActiveMQPollingEndpointWorker;

    public ActiveMQPollingEndpointWorker(ActiveMQResourceAdapter activeMQResourceAdapter, ActiveMQEndpointActivationKey activeMQEndpointActivationKey) throws ResourceException {
        super(activeMQResourceAdapter, activeMQEndpointActivationKey);
        this.started = new SynchronizedBoolean(false);
        this.stopping = new SynchronizedBoolean(false);
        this.stopLatch = new Latch();
    }

    @Override // org.codehaus.activemq.ra.ActiveMQBaseEndpointWorker
    public void start() throws WorkException, ResourceException {
        Queue activeMQTopic;
        ActiveMQActivationSpec activationSpec = this.endpointActivationKey.getActivationSpec();
        try {
            try {
                this.workers = new CircularQueue(10, this.stopping);
                for (int i = 0; i < this.workers.size(); i++) {
                    XASession createSession = createSession();
                    XAResource xAResource = null;
                    if (createSession instanceof XASession) {
                        if (!this.transacted) {
                            throw new ResourceException("You cannot use an XA Connection with a non transacted endpoint.");
                        }
                        xAResource = createSession.getXAResource();
                    }
                    this.workers.returnObject(new InboundEndpointWork(createSession, this.endpointFactory.createEndpoint(xAResource), this.workers));
                }
                if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
                    activeMQTopic = new ActiveMQQueue(activationSpec.getDestination());
                } else {
                    if (!"javax.jms.Topic".equals(activationSpec.getDestinationType())) {
                        throw new ResourceException(new StringBuffer().append("Unknown destination type: ").append(activationSpec.getDestinationType()).toString());
                    }
                    activeMQTopic = new ActiveMQTopic(activationSpec.getDestination());
                }
                if (emptyToNull(activationSpec.getSubscriptionName()) != null) {
                    this.consumer = (ActiveMQConnectionConsumer) getPhysicalConnection().createDurableConnectionConsumer((Topic) activeMQTopic, activationSpec.getSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), (ServerSessionPool) null, 0);
                } else {
                    this.consumer = (ActiveMQConnectionConsumer) getPhysicalConnection().createConnectionConsumer(activeMQTopic, emptyToNull(activationSpec.getMessageSelector()), (ServerSessionPool) null, 0);
                }
                log.debug("Started");
                this.workManager.scheduleWork(this, Long.MAX_VALUE, (ExecutionContext) null, debugingWorkListener);
                if (1 == 0) {
                    safeClose(this.consumer);
                }
            } catch (JMSException e) {
                throw new ResourceException("Could not start the endpoint.", e);
            }
        } catch (Throwable th) {
            if (0 == 0) {
                safeClose(this.consumer);
            }
            throw th;
        }
    }

    private String emptyToNull(String str) {
        if ("".equals(str)) {
            return null;
        }
        return str;
    }

    @Override // org.codehaus.activemq.ra.ActiveMQBaseEndpointWorker
    public void stop() throws InterruptedException {
        this.stopping.set(true);
        this.workers.notifyWaiting();
        if (this.started.compareTo(true) == 0) {
            this.stopLatch.acquire();
        }
        safeClose(this.consumer);
    }

    public void release() {
    }

    public void run() {
        this.started.set(true);
        while (!this.stopping.get()) {
            try {
                try {
                    ActiveMQMessage receive = this.consumer.receive(500L);
                    if (receive != null) {
                        InboundEndpointWork inboundEndpointWork = (InboundEndpointWork) this.workers.get();
                        if (inboundEndpointWork == null) {
                            break;
                        }
                        inboundEndpointWork.setMessage(receive);
                        this.workManager.scheduleWork(inboundEndpointWork, Long.MAX_VALUE, (ExecutionContext) null, debugingWorkListener);
                    }
                } catch (Throwable th) {
                    log.info("dispatcher: ", th);
                    this.stopLatch.release();
                    return;
                }
            } catch (Throwable th2) {
                this.stopLatch.release();
                throw th2;
            }
        }
        this.workers.drain();
        this.stopLatch.release();
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$codehaus$activemq$ra$ActiveMQPollingEndpointWorker == null) {
            cls = class$("org.codehaus.activemq.ra.ActiveMQPollingEndpointWorker");
            class$org$codehaus$activemq$ra$ActiveMQPollingEndpointWorker = cls;
        } else {
            cls = class$org$codehaus$activemq$ra$ActiveMQPollingEndpointWorker;
        }
        log = LogFactory.getLog(cls);
        debugingWorkListener = new WorkListener() { // from class: org.codehaus.activemq.ra.ActiveMQPollingEndpointWorker.1
            public void workAccepted(WorkEvent workEvent) {
            }

            public void workRejected(WorkEvent workEvent) {
                ActiveMQPollingEndpointWorker.log.warn(new StringBuffer().append("Work rejected: ").append(workEvent).toString(), workEvent.getException());
            }

            public void workStarted(WorkEvent workEvent) {
            }

            public void workCompleted(WorkEvent workEvent) {
            }
        };
    }
}
