package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import java.util.ArrayList;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerConnector;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueList;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.RedeliveryPolicy;
import org.codehaus.activemq.service.SubscriberEntry;
import org.codehaus.activemq.service.Subscription;

/* loaded from: input_file:activemq-core-1.1-G1M3.jar:org/codehaus/activemq/service/impl/SubscriptionImpl.class */
/**
 * Default {@link Subscription} implementation: keeps an ordered list of
 * {@link MessagePointer}s for one consumer and hands the referenced messages
 * to the {@link Dispatcher} on request.
 *
 * <p>Every method that reads or modifies the pointer list is
 * {@code synchronized} on this instance.
 */
public class SubscriptionImpl implements Subscription {
    private static final Log log = LogFactory.getLog(SubscriptionImpl.class);

    // Consumer attributes, copied from the ConsumerInfo in setActiveConsumer().
    private String clientId;
    private String subscriberName;
    private ActiveMQDestination destination;
    private String selector;
    private int prefetchLimit;
    private boolean noLocal;
    private boolean active;
    private int consumerNumber;
    private String consumerId;
    private boolean browser;

    /** Notified through {@code wakeup(this)} whenever the pointer list changes. */
    protected Dispatcher dispatch;
    /** Broker name taken from the active client's connector; may remain null. */
    protected String brokerName;
    /** Cluster name taken from the active client's connector; may remain null. */
    protected String clusterName;
    /** Identity of the last message passed to addMessage(), or set explicitly. */
    private MessageIdentity lastMessageIdentity;
    Filter filter;
    /**
     * Incremented for each message returned by getMessagesWithPrefetch() and for
     * each redelivery request; decremented in doMessageConsume().
     */
    protected SynchronizedInt unconsumedMessagesDispatched = new SynchronizedInt(0);
    /** Ordered pointers to the messages queued for this subscription. */
    QueueList messagePtrs = new DefaultQueueList();
    /** Never set to true in this class, so the prefetch path is off by default. */
    private boolean usePrefetch = false;
    /** Lazily created by getSubscriptionEntry(). */
    private SubscriberEntry subscriberEntry;
    private ConsumerInfo activeConsumer;
    private BrokerClient activeClient;
    private RedeliveryPolicy redeliveryPolicy;
    /**
     * Legacy compiler-generated cache slot for the class literal. No longer
     * written by this class; retained only so the member set is unchanged.
     */
    static Class class$org$codehaus$activemq$service$impl$SubscriptionImpl;

    /**
     * Creates a subscription and binds it to the given client and consumer.
     *
     * @param dispatcher       dispatcher to wake when messages become available
     * @param brokerClient     client owning the consumer, may be null
     * @param consumerInfo     consumer description, may be null
     * @param filter           filter used by {@link #isTarget} and {@link #isWildcard}
     * @param redeliveryPolicy redelivery policy exposed through the getter
     */
    public SubscriptionImpl(Dispatcher dispatcher, BrokerClient brokerClient, ConsumerInfo consumerInfo, Filter filter, RedeliveryPolicy redeliveryPolicy) {
        this.dispatch = dispatcher;
        this.filter = filter;
        this.redeliveryPolicy = redeliveryPolicy;
        setActiveConsumer(brokerClient, consumerInfo);
    }

    /**
     * Records the active client and consumer. When {@code consumerInfo} is
     * non-null its attributes are copied into this subscription; when the
     * client exposes a connector with broker info, the broker and cluster
     * names are copied as well (otherwise they keep their previous values).
     *
     * @param brokerClient the client now serving this subscription, may be null
     * @param consumerInfo the consumer description, may be null
     */
    @Override
    public void setActiveConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) {
        if (consumerInfo != null) {
            clientId = consumerInfo.getClientId();
            subscriberName = consumerInfo.getConsumerName();
            noLocal = consumerInfo.isNoLocal();
            destination = consumerInfo.getDestination();
            selector = consumerInfo.getSelector();
            prefetchLimit = consumerInfo.getPrefetchNumber();
            consumerNumber = consumerInfo.getConsumerNo();
            consumerId = consumerInfo.getConsumerId();
            browser = consumerInfo.isBrowser();
        }
        activeClient = brokerClient;
        activeConsumer = consumerInfo;

        if (brokerClient == null) {
            return;
        }
        BrokerConnector connector = brokerClient.getBrokerConnector();
        if (connector == null) {
            return;
        }
        BrokerInfo info = connector.getBrokerInfo();
        if (info == null) {
            return;
        }
        brokerName = info.getBrokerName();
        clusterName = info.getClusterName();
    }

    /**
     * @return a description containing the identity hash code, consumer id,
     *         client id, subscriber name and destination
     */
    public String toString() {
        return "SubscriptionImpl(" + super.hashCode() + ")[" + consumerId + "]" + clientId + ": " + subscriberName + " : " + destination;
    }

    /**
     * Calls {@code clear()} on every queued pointer and then empties the list.
     *
     * @throws JMSException if the pointer list or a pointer fails
     */
    @Override
    public synchronized void clear() throws JMSException {
        for (QueueListEntry entry = messagePtrs.getFirstEntry(); entry != null; entry = messagePtrs.getNextEntry(entry)) {
            ((MessagePointer) entry.getElement()).clear();
        }
        messagePtrs.clear();
    }

    /**
     * Resets the leading run of dispatched pointers and flags each of them as
     * redelivered. The scan stops at the first pointer that has not been
     * dispatched.
     *
     * @throws JMSException if the pointer list or a pointer fails
     */
    @Override
    public synchronized void reset() throws JMSException {
        for (QueueListEntry entry = messagePtrs.getFirstEntry(); entry != null; entry = messagePtrs.getNextEntry(entry)) {
            MessagePointer pointer = (MessagePointer) entry.getElement();
            if (!pointer.isDispatched()) {
                break;
            }
            pointer.reset();
            pointer.setRedelivered(true);
        }
    }

    /** @return the client id of the consumer */
    @Override
    public String getClientId() {
        return clientId;
    }

    /** @param clientId the client id to record */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /** @return the filter used to match messages */
    public Filter getFilter() {
        return filter;
    }

    /** @param filter the filter used to match messages */
    public void setFilter(Filter filter) {
        this.filter = filter;
    }

    /** @return whatever the filter reports from {@code isWildcard()} */
    @Override
    public boolean isWildcard() {
        return filter.isWildcard();
    }

    /** @return always {@code null} in this implementation */
    @Override
    public String getPersistentKey() {
        return null;
    }

    /**
     * @param consumerInfo the consumer to compare against
     * @return true when this is a durable topic subscription whose client id
     *         and subscriber name both equal those of {@code consumerInfo}
     * @throws JMSException declared by the interface
     */
    @Override
    public boolean isSameDurableSubscription(ConsumerInfo consumerInfo) throws JMSException {
        if (!isDurableTopic()) {
            return false;
        }
        return equal(clientId, consumerInfo.getClientId())
                && equal(subscriberName, consumerInfo.getConsumerName());
    }

    /** @return the noLocal flag */
    public boolean isNoLocal() {
        return noLocal;
    }

    /** @param noLocal the noLocal flag */
    public void setNoLocal(boolean noLocal) {
        this.noLocal = noLocal;
    }

    /** @return the subscriber (consumer) name */
    @Override
    public String getSubscriberName() {
        return subscriberName;
    }

    /** @param subscriberName the subscriber (consumer) name */
    public void setSubscriberName(String subscriberName) {
        this.subscriberName = subscriberName;
    }

    /** @return the redelivery policy */
    public RedeliveryPolicy getRedeliveryPolicy() {
        return redeliveryPolicy;
    }

    /** @param redeliveryPolicy the redelivery policy */
    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
        this.redeliveryPolicy = redeliveryPolicy;
    }

    /**
     * Decides whether a message should be delivered to this subscription.
     *
     * @param activeMQMessage the candidate message, may be null
     * @return false for a null message, for a message excluded by the cluster
     *         check below, for a filter mismatch, or for a noLocal subscription
     *         whose client produced the message; true otherwise
     * @throws JMSException if the filter fails
     */
    @Override
    public boolean isTarget(ActiveMQMessage activeMQMessage) throws JMSException {
        if (activeMQMessage == null) {
            return false;
        }
        // Excluded when the client is a clustered connection and the message
        // reports this cluster, but not this broker, as its entry point.
        boolean excludedByCluster = activeClient != null
                && brokerName != null
                && clusterName != null
                && activeClient.isClusteredConnection()
                && activeMQMessage.isEntryCluster(clusterName)
                && !activeMQMessage.isEntryBroker(brokerName);
        if (excludedByCluster) {
            return false;
        }
        boolean matched = filter.matches(activeMQMessage);
        if (noLocal && matched && clientIDsEqual(activeMQMessage)) {
            return false;
        }
        return matched;
    }

    /**
     * Appends a pointer for the message, wakes the dispatcher and remembers the
     * message identity as the last one seen.
     *
     * @param messageContainer container holding the message
     * @param activeMQMessage  the message to queue
     * @throws JMSException if the pointer list fails
     */
    @Override
    public synchronized void addMessage(MessageContainer messageContainer, ActiveMQMessage activeMQMessage) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug("Adding to subscription: " + this + " message: " + activeMQMessage);
        }
        messagePtrs.add(new MessagePointer(messageContainer, activeMQMessage.getJMSMessageIdentity()));
        dispatch.wakeup(this);
        lastMessageIdentity = activeMQMessage.getJMSMessageIdentity();
    }

    /**
     * Handles an acknowledgement by removing queued entries up to and including
     * the acknowledged one (see {@link #doMessageConsume}).
     *
     * @param messageAck the acknowledgement
     * @throws JMSException if the pointer list or a pointer fails
     */
    @Override
    public synchronized void messageConsumed(MessageAck messageAck) throws JMSException {
        doMessageConsume(messageAck, true);
    }

    /**
     * Handles the pre-commit pass of a transacted acknowledgement; entries are
     * left in place (see {@link #doMessageConsume}).
     *
     * @param messageAck the acknowledgement
     * @throws JMSException if the pointer list or a pointer fails
     */
    @Override
    public synchronized void onAcknowledgeTransactedMessageBeforeCommit(MessageAck messageAck) throws JMSException {
        doMessageConsume(messageAck, false);
    }

    /**
     * Marks the queued pointer whose message id equals the ack's message id for
     * redelivery: the unconsumed counter is incremented, the pointer is reset
     * and flagged redelivered, and the dispatcher is woken. Does nothing when no
     * pointer matches.
     *
     * @param messageContainer not used by this implementation
     * @param messageAck       identifies the message to redeliver
     * @throws JMSException if the pointer list or a pointer fails
     */
    @Override
    public synchronized void redeliverMessage(MessageContainer messageContainer, MessageAck messageAck) throws JMSException {
        QueueListEntry match = null;
        for (QueueListEntry entry = messagePtrs.getFirstEntry(); entry != null; entry = messagePtrs.getNextEntry(entry)) {
            MessagePointer candidate = (MessagePointer) entry.getElement();
            if (candidate.getMessageIdentity().getMessageID().equals(messageAck.getMessageID())) {
                match = entry;
                break;
            }
        }
        if (match == null) {
            return;
        }
        MessagePointer pointer = (MessagePointer) match.getElement();
        if (pointer == null) {
            return;
        }
        unconsumedMessagesDispatched.increment();
        pointer.reset();
        pointer.setRedelivered(true);
        dispatch.wakeup(this);
    }

    /**
     * Collects every queued message that has not been dispatched yet and marks
     * its pointer as dispatched. Delegates to {@link #getMessagesWithPrefetch()}
     * when prefetch is enabled. Entries whose message can no longer be loaded
     * from the container are removed from the list.
     *
     * @return the messages to dispatch, possibly empty, never null
     * @throws JMSException if the pointer list or a container fails
     */
    @Override
    public synchronized ActiveMQMessage[] getMessagesToDispatch() throws JMSException {
        if (usePrefetch) {
            return getMessagesWithPrefetch();
        }
        ArrayList<ActiveMQMessage> batch = new ArrayList<ActiveMQMessage>();
        QueueListEntry entry = messagePtrs.getFirstEntry();
        while (entry != null) {
            MessagePointer pointer = (MessagePointer) entry.getElement();
            if (pointer.isDispatched()) {
                entry = messagePtrs.getNextEntry(entry);
                continue;
            }
            ActiveMQMessage message = pointer.getContainer().getMessage(pointer.getMessageIdentity());
            if (message == null) {
                // Drop the stale entry itself and carry on with its successor.
                entry = discardUnavailableEntry(entry);
                continue;
            }
            if (pointer.isRedelivered()) {
                message.setJMSRedelivered(true);
            }
            pointer.setDispatched(true);
            batch.add(message);
            entry = messagePtrs.getNextEntry(entry);
        }
        return batch.toArray(new ActiveMQMessage[batch.size()]);
    }

    /**
     * @return the subscriber entry for this subscription, created on first use
     */
    @Override
    public synchronized SubscriberEntry getSubscriptionEntry() {
        if (subscriberEntry == null) {
            subscriberEntry = createSubscriptionEntry();
        }
        return subscriberEntry;
    }

    /**
     * Builds a subscriber entry from the client id, subscriber name, physical
     * destination name and selector.
     *
     * @return a new entry
     */
    protected SubscriberEntry createSubscriptionEntry() {
        SubscriberEntry entry = new SubscriberEntry();
        entry.setClientID(clientId);
        entry.setConsumerName(subscriberName);
        entry.setDestination(destination.getPhysicalName());
        entry.setSelector(selector);
        return entry;
    }

    /**
     * Like {@link #getMessagesToDispatch()} but returns at most
     * {@code prefetchLimit - unconsumedMessagesDispatched} messages and
     * increments the unconsumed counter for each message returned.
     *
     * @return the messages to dispatch, possibly empty, never null
     * @throws JMSException if the pointer list or a container fails
     */
    protected synchronized ActiveMQMessage[] getMessagesWithPrefetch() throws JMSException {
        ArrayList<ActiveMQMessage> batch = new ArrayList<ActiveMQMessage>();
        int budget = prefetchLimit - unconsumedMessagesDispatched.get();
        int taken = 0;
        QueueListEntry entry = messagePtrs.getFirstEntry();
        while (entry != null && taken < budget) {
            MessagePointer pointer = (MessagePointer) entry.getElement();
            if (pointer.isDispatched()) {
                entry = messagePtrs.getNextEntry(entry);
                continue;
            }
            ActiveMQMessage message = pointer.getContainer().getMessage(pointer.getMessageIdentity());
            if (message == null) {
                // Resume from the successor rather than the predecessor, so a
                // stale entry at the head of the list cannot cut the scan short.
                entry = discardUnavailableEntry(entry);
                continue;
            }
            if (pointer.isRedelivered()) {
                message.setJMSRedelivered(true);
            }
            pointer.setDispatched(true);
            batch.add(message);
            unconsumedMessagesDispatched.increment();
            taken++;
            entry = messagePtrs.getNextEntry(entry);
        }
        return batch.toArray(new ActiveMQMessage[batch.size()]);
    }

    /**
     * Removes an entry whose message the container could not supply.
     *
     * @param stale the entry to remove
     * @return the entry that followed {@code stale}, or null if it was the last
     * @throws JMSException if the pointer list fails
     */
    private QueueListEntry discardUnavailableEntry(QueueListEntry stale) throws JMSException {
        MessagePointer pointer = (MessagePointer) stale.getElement();
        log.info("Message probably expired: " + pointer.getMessageIdentity());
        // Look up the successor before unlinking the entry.
        QueueListEntry successor = messagePtrs.getNextEntry(stale);
        messagePtrs.remove(stale);
        return successor;
    }

    /**
     * @return true when prefetch is enabled and the number of queued pointers
     *         minus the unconsumed-dispatched counter is at least the prefetch limit
     * @throws JMSException if the pointer list fails
     */
    @Override
    public synchronized boolean isAtPrefetchLimit() throws JMSException {
        if (!usePrefetch) {
            return false;
        }
        return messagePtrs.size() - unconsumedMessagesDispatched.get() >= prefetchLimit;
    }

    /**
     * @return true when the subscription is active and has at least one queued pointer
     * @throws JMSException if the pointer list fails
     */
    @Override
    public synchronized boolean isReadyToDispatch() throws JMSException {
        return active && messagePtrs.size() > 0;
    }

    /** @return the destination of the consumer */
    @Override
    public ActiveMQDestination getDestination() {
        return destination;
    }

    /** @return the selector of the consumer */
    @Override
    public String getSelector() {
        return selector;
    }

    /** @return whether the subscription is active */
    @Override
    public synchronized boolean isActive() {
        return active;
    }

    /**
     * Sets the active flag; deactivating also calls {@link #reset()}.
     *
     * @param active the new state
     * @throws JMSException if the reset fails
     */
    @Override
    public synchronized void setActive(boolean active) throws JMSException {
        this.active = active;
        if (!active) {
            reset();
        }
    }

    /** @return the consumer number */
    @Override
    public int getConsumerNumber() {
        return consumerNumber;
    }

    /** @return the consumer id */
    @Override
    public String getConsumerId() {
        return consumerId;
    }

    /**
     * @return true when the destination is a topic and the subscriber name is
     *         neither null nor empty
     * @throws JMSException declared by the interface
     */
    @Override
    public boolean isDurableTopic() throws JMSException {
        if (!destination.isTopic()) {
            return false;
        }
        return subscriberName != null && subscriberName.length() > 0;
    }

    /**
     * @return whether the consumer is a browser
     * @throws JMSException declared by the interface
     */
    @Override
    public boolean isBrowser() throws JMSException {
        return browser;
    }

    /**
     * @return the identity of the last message added or explicitly set
     * @throws JMSException declared by the interface
     */
    @Override
    public MessageIdentity getLastMessageIdentity() throws JMSException {
        return lastMessageIdentity;
    }

    /**
     * @param messageIdentity the identity to record as the last message
     * @throws JMSException declared by the interface
     */
    @Override
    public void setLastMessageIdentifier(MessageIdentity messageIdentity) throws JMSException {
        lastMessageIdentity = messageIdentity;
    }

    /**
     * Walks the queued pointers from the head up to and including the one whose
     * identity equals the ack's identity, then wakes the dispatcher.
     *
     * <p>When {@code remove} is true every visited entry is removed from the
     * list (and deleted from its container if the ack reports the message as
     * read and this is not a browser); for non-transacted acks the unconsumed
     * counter is decremented once per visited entry. When {@code remove} is
     * false and the ack is transacted, the counter is decremented once, on the
     * matching entry. If no entry matches, every entry is visited.
     *
     * @param messageAck the acknowledgement being processed
     * @param remove     whether visited entries are removed from the list
     * @throws JMSException if the pointer list or a pointer fails
     */
    protected synchronized void doMessageConsume(MessageAck messageAck, boolean remove) throws JMSException {
        boolean found = false;
        QueueListEntry entry = messagePtrs.getFirstEntry();
        while (entry != null) {
            MessagePointer pointer = (MessagePointer) entry.getElement();
            // Look up the successor first: the entry may be unlinked below.
            QueueListEntry successor = messagePtrs.getNextEntry(entry);
            if (remove) {
                messagePtrs.remove(entry);
                if (messageAck.isMessageRead() && !browser) {
                    pointer.delete(messageAck);
                }
                if (!messageAck.isPartOfTransaction()) {
                    unconsumedMessagesDispatched.decrement();
                }
            }
            if (pointer.getMessageIdentity().equals(messageAck.getMessageIdentity())) {
                if (!remove && messageAck.isPartOfTransaction()) {
                    unconsumedMessagesDispatched.decrement();
                }
                found = true;
                // Stop at the acknowledged message; later entries are untouched.
                break;
            }
            entry = successor;
        }
        if (!found && log.isDebugEnabled()) {
            log.debug("Did not find a matching message for identity: " + messageAck.getMessageIdentity());
        }
        dispatch.wakeup(this);
    }

    /**
     * @param activeMQMessage the message to inspect
     * @return true when the message's producer id equals this subscription's
     *         client id, or when both the message's JMS client id and this
     *         client id are non-null and equal
     */
    protected boolean clientIDsEqual(ActiveMQMessage activeMQMessage) {
        String messageClientId = activeMQMessage.getJMSClientID();
        String producerId = activeMQMessage.getProducerID();
        String ownClientId = clientId;
        if (producerId != null && producerId.equals(ownClientId)) {
            return true;
        }
        return messageClientId != null && ownClientId != null && messageClientId.equals(ownClientId);
    }

    /**
     * Null-safe equality.
     *
     * @param left  first value, may be null
     * @param right second value, may be null
     * @return true when both are the same reference (including both null) or
     *         both are non-null and {@code left.equals(right)}
     */
    protected static final boolean equal(Object left, Object right) {
        if (left == right) {
            return true;
        }
        if (left == null || right == null) {
            return false;
        }
        return left.equals(right);
    }

    /**
     * Legacy compiler-generated helper for class literals. No longer used by
     * this class; retained only so the member set is unchanged.
     *
     * @param className fully qualified class name
     * @return the loaded class
     * @throws NoClassDefFoundError if the class cannot be found; the original
     *         {@link ClassNotFoundException} is attached as the cause
     */
    static Class class$(String className) {
        try {
            return Class.forName(className);
        } catch (ClassNotFoundException e) {
            // initCause returns Throwable, so attach the cause first and throw
            // the error through its own type.
            NoClassDefFoundError error = new NoClassDefFoundError(e.getMessage());
            error.initCause(e);
            throw error;
        }
    }
}
