package org.activemq.broker.region;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.activemq.broker.ConnectionContext;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQQueue;
import org.activemq.command.ConsumerInfo;
import org.activemq.command.Message;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageDispatch;
import org.activemq.command.MessageId;
import org.activemq.transaction.Synchronization;

/* loaded from: input_file:activemq-core-4.0-M1.jar:org/activemq/broker/region/PrefetchSubscription.class */
/**
 * A subscription that dispatches message references to its consumer until it
 * is "full" (see {@link #isFull()}) and parks further references in
 * {@link #matched} until acknowledgements free up room again.
 *
 * <p>All state-changing entry points ({@link #add}, {@link #acknowledge} and
 * {@link #onDispatch}) are {@code synchronized} on this instance; the private
 * helpers below are only ever invoked from those methods and rely on the
 * caller already holding the monitor.
 */
public abstract class PrefetchSubscription extends AbstractSubscription {
    /** References accepted by {@link #add} while the subscription was full; drained by {@link #dispatchMatched()}. */
    protected final LinkedList matched = new LinkedList();

    /** References handed to the connection and not yet removed by an acknowledgement. */
    protected final LinkedList dispatched = new LinkedList();

    /** Destination that poison-acked messages are re-sent to. */
    protected final ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");

    /** Number of leading {@link #dispatched} entries that {@link #isFull()} does not count against the prefetch size. */
    protected int delivered = 0;

    /** Threshold for {@link #preLoadSize}; above it the subscription reports itself as full. */
    int preLoadLimit = 100 * 1024;

    /** Sum of {@code Message.getSize()} for messages between {@code dispatch} and {@code onDispatch} (presumably bytes - confirm). */
    int preLoadSize = 0;

    /** Re-entrancy guard for {@link #dispatchMatched()}. */
    boolean dispatching = false;

    /**
     * Creates the subscription; all bookkeeping starts out empty.
     *
     * @param connectionContext context of the connection that owns the consumer
     * @param consumerInfo      description of the consumer
     * @throws InvalidSelectorException propagated from the superclass constructor
     */
    public PrefetchSubscription(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws InvalidSelectorException {
        super(connectionContext, consumerInfo);
    }

    /**
     * Offers a message reference to this subscription: it is dispatched
     * immediately when there is room, otherwise appended to {@link #matched}.
     *
     * @param messageReference the reference to deliver
     * @throws Throwable whatever the dispatch raises
     */
    @Override // org.activemq.broker.region.Subscription
    public synchronized void add(MessageReference messageReference) throws Throwable {
        if (isFull()) {
            this.matched.addLast(messageReference);
        } else {
            dispatch(messageReference);
        }
    }

    /**
     * Processes a consumer acknowledgement. Standard, delivered and poison
     * acks are each handled by a dedicated helper; any other kind is rejected.
     *
     * @param connectionContext context the ack arrived on
     * @param messageAck        the acknowledgement
     * @throws JMSException if the ack kind is unknown, if a poison ack is
     *                      transacted, or if the ack's last message id is not
     *                      found among the dispatched messages
     * @throws Throwable    anything raised by the broker, the transaction or
     *                      the {@link #acknowledge(ConnectionContext, MessageAck, MessageReference)} hook
     */
    @Override // org.activemq.broker.region.Subscription
    public synchronized void acknowledge(ConnectionContext connectionContext, MessageAck messageAck) throws Throwable {
        // Sampled before any bookkeeping changes so we can tell afterwards
        // whether this ack is what made room for parked messages.
        boolean wasFull = isFull();

        if (messageAck.isStandardAck()) {
            handleStandardAck(connectionContext, messageAck, wasFull);
        } else if (messageAck.isDeliveredAck()) {
            handleDeliveredAck(messageAck, wasFull);
        } else if (messageAck.isPoisonAck()) {
            handlePoisonAck(connectionContext, messageAck, wasFull);
        } else {
            throw new JMSException("Invalid acknowledgment: " + messageAck);
        }
    }

    /**
     * Standard ack: acknowledges every dispatched message from the ack's first
     * id (or from the head when it is null) through its last id. Outside a
     * transaction the entries are removed right away; inside one, removal is
     * deferred to a single commit hook.
     */
    private void handleStandardAck(ConnectionContext connectionContext, MessageAck messageAck, boolean wasFull) throws Throwable {
        int ackedCount = 0;
        boolean inAckRange = false;
        boolean cleanupRegistered = false;

        for (Iterator it = this.dispatched.iterator(); it.hasNext();) {
            MessageReference node = (MessageReference) it.next();
            MessageId messageId = node.getMessageId();
            if (messageAck.getFirstMessageId() == null || messageAck.getFirstMessageId().equals(messageId)) {
                inAckRange = true;
            }
            if (!inAckRange) {
                continue;
            }

            if (connectionContext.isInTransaction()) {
                // One hook per ack is enough: the hook itself walks the whole
                // range. Registering it once per message made the extra hooks
                // wipe unrelated entries when the first id is null, because by
                // then the last id had already been removed.
                if (!cleanupRegistered) {
                    connectionContext.getTransaction().addSynchronization(createCommitCleanup(messageAck));
                    cleanupRegistered = true;
                }
            } else {
                it.remove();
            }

            ackedCount++;
            acknowledge(connectionContext, messageAck, node);

            if (messageAck.getLastMessageId().equals(messageId)) {
                // NOTE(review): ackedCount already includes this message, so
                // "+ 1" adjusts by one more than the number processed. Kept
                // exactly as before - confirm whether that is intended.
                if (connectionContext.isInTransaction()) {
                    this.delivered = Math.max(this.delivered, ackedCount + 1);
                } else {
                    this.delivered = Math.max(0, this.delivered - (ackedCount + 1));
                }
                resumeDispatchIfUnblocked(wasFull);
                return;
            }
        }
        throw new JMSException("Could not correlate acknowledgment with dispatched message: " + messageAck);
    }

    /**
     * Builds the commit hook for a transacted standard ack. After commit it
     * removes the acked range from {@link #dispatched} and lowers
     * {@link #delivered} accordingly, under this subscription's monitor.
     */
    private Synchronization createCommitCleanup(final MessageAck messageAck) {
        return new Synchronization() {
            @Override // org.activemq.transaction.Synchronization
            public void afterCommit() throws Throwable {
                synchronized (PrefetchSubscription.this) {
                    boolean inAckRange = false;
                    int removedCount = 0;
                    for (Iterator it = dispatched.iterator(); it.hasNext();) {
                        MessageId messageId = ((MessageReference) it.next()).getMessageId();
                        if (messageAck.getFirstMessageId() == null || messageAck.getFirstMessageId().equals(messageId)) {
                            inAckRange = true;
                        }
                        if (inAckRange) {
                            removedCount++;
                            it.remove();
                            if (messageAck.getLastMessageId().equals(messageId)) {
                                delivered = Math.max(0, delivered - (removedCount + 1));
                                return;
                            }
                        }
                    }
                }
            }
        };
    }

    /**
     * Delivered ack: nothing is removed; {@link #delivered} is raised to cover
     * every entry up to and including the one matching the ack's last id.
     */
    private void handleDeliveredAck(MessageAck messageAck, boolean wasFull) throws JMSException, IOException {
        int index = 0;
        for (Iterator it = this.dispatched.iterator(); it.hasNext(); index++) {
            MessageReference node = (MessageReference) it.next();
            if (messageAck.getLastMessageId().equals(node.getMessageId())) {
                this.delivered = Math.max(this.delivered, index + 1);
                resumeDispatchIfUnblocked(wasFull);
                return;
            }
        }
        throw new JMSException("Could not correlate acknowledgment with dispatched message: " + messageAck);
    }

    /**
     * Poison ack: every message in the acked range is re-sent to
     * {@link #dlqDestination} (when its body is still available), removed from
     * {@link #dispatched} and acknowledged. Transacted poison acks are rejected.
     */
    private void handlePoisonAck(ConnectionContext connectionContext, MessageAck messageAck, boolean wasFull) throws Throwable {
        if (messageAck.isInTransaction()) {
            throw new JMSException("Poison ack cannot be transacted: " + messageAck);
        }

        int ackedCount = 0;
        boolean inAckRange = false;
        for (Iterator it = this.dispatched.iterator(); it.hasNext();) {
            MessageReference node = (MessageReference) it.next();
            MessageId messageId = node.getMessageId();
            if (messageAck.getFirstMessageId() == null || messageAck.getFirstMessageId().equals(messageId)) {
                inAckRange = true;
            }
            if (!inAckRange) {
                continue;
            }

            // Hold a reference while the message is being re-routed.
            node.incrementReferenceCount();
            try {
                Message message = node.getMessage();
                if (message != null) {
                    sendToDeadLetterQueue(connectionContext, message);
                }
                it.remove();
                ackedCount++;
                acknowledge(connectionContext, messageAck, node);
                if (messageAck.getLastMessageId().equals(messageId)) {
                    this.delivered = Math.max(0, this.delivered - (ackedCount + 1));
                    resumeDispatchIfUnblocked(wasFull);
                    return;
                }
            } finally {
                node.decrementReferenceCount();
            }
        }
        throw new JMSException("Could not correlate acknowledgment with dispatched message: " + messageAck);
    }

    /**
     * Re-addresses {@code message} to {@link #dlqDestination} and sends it
     * through the broker with producer flow control switched off for the
     * duration of the send; the previous setting is always restored.
     */
    private void sendToDeadLetterQueue(ConnectionContext connectionContext, Message message) throws Throwable {
        message.setDestination(this.dlqDestination);
        message.setProducerId(message.getMessageId().getProducerId());
        message.setTransactionId(null);

        boolean previousFlowControl = connectionContext.isProducerFlowControl();
        try {
            connectionContext.setProducerFlowControl(false);
            connectionContext.getBroker().send(connectionContext, message);
        } finally {
            connectionContext.setProducerFlowControl(previousFlowControl);
        }
    }

    /** Drains {@link #matched} when the subscription was full before the current operation and no longer is. */
    private void resumeDispatchIfUnblocked(boolean wasFull) throws IOException {
        if (wasFull && !isFull()) {
            dispatchMatched();
        }
    }

    /**
     * Reports whether no further message should be dispatched right now.
     *
     * @return {@code true} when the dispatched entries not covered by
     *         {@link #delivered} have reached the consumer's prefetch size, or
     *         when {@link #preLoadSize} exceeds {@link #preLoadLimit}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isFull() {
        return this.dispatched.size() - this.delivered >= this.info.getPrefetchSize() || this.preLoadSize > this.preLoadLimit;
    }

    /**
     * Dispatches parked references from {@link #matched}, oldest first, until
     * the list is empty or the subscription is full again. Nested invocations
     * (for example via a synchronous dispatch calling back into
     * {@link #onDispatch}) return immediately.
     *
     * @throws IOException if a dispatch fails
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void dispatchMatched() throws IOException {
        if (this.dispatching) {
            return;
        }
        this.dispatching = true;
        try {
            Iterator it = this.matched.iterator();
            while (it.hasNext() && !isFull()) {
                MessageReference messageReference = (MessageReference) it.next();
                it.remove();
                dispatch(messageReference);
            }
        } finally {
            this.dispatching = false;
        }
    }

    /**
     * Hands one reference to the connection, synchronously or asynchronously
     * depending on the consumer info. The reference count and the preload size
     * taken here are given back by {@link #onDispatch}; when nothing is
     * dispatched they are given back immediately so they cannot leak.
     */
    private void dispatch(final MessageReference messageReference) throws IOException {
        messageReference.incrementReferenceCount();
        final Message message = messageReference.getMessage();
        if (message == null) {
            // Nothing to send and onDispatch will never run: undo the increment.
            messageReference.decrementReferenceCount();
            return;
        }

        int size = message.getSize();
        incrementPreloadSize(size);
        if (!canDispatch(messageReference)) {
            // Not dispatched, so onDispatch will never balance the books for us.
            decrementPreloadSize(size);
            messageReference.decrementReferenceCount();
            return;
        }

        MessageDispatch messageDispatch = createMessageDispatch(messageReference, message);
        this.dispatched.addLast(messageReference);
        if (this.info.isDispatchAsync()) {
            messageDispatch.setConsumer(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    onDispatch(messageReference, message);
                }
            });
            this.context.getConnection().dispatchAsync(messageDispatch);
        } else {
            this.context.getConnection().dispatchSync(messageDispatch);
            onDispatch(messageReference, message);
        }
    }

    /**
     * Completion callback for a dispatch: releases the preload size and the
     * reference taken in {@code dispatch}. When the reference has a region
     * destination, its dequeue statistic is incremented and parked messages
     * are dispatched if this completion made room for them.
     *
     * @param messageReference the reference that was dispatched
     * @param message          the message that was sent for it
     */
    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void onDispatch(MessageReference messageReference, Message message) {
        boolean wasFull = isFull();
        decrementPreloadSize(message.getSize());
        messageReference.decrementReferenceCount();

        if (messageReference.getRegionDestination() == null) {
            return;
        }
        messageReference.getRegionDestination().getDestinationStatistics().getDequeues().increment();
        if (wasFull && !isFull()) {
            try {
                dispatchMatched();
            } catch (IOException e) {
                // Reported to the connection rather than thrown: this may run
                // as an asynchronous callback with no caller to propagate to.
                this.context.getConnection().serviceException(e);
            }
        }
    }

    /** Adds {@code i} to {@link #preLoadSize} and returns the new total. */
    private int incrementPreloadSize(int i) {
        this.preLoadSize += i;
        return this.preLoadSize;
    }

    /** Subtracts {@code i} from {@link #preLoadSize} and returns the new total. */
    private int decrementPreloadSize(int i) {
        this.preLoadSize -= i;
        return this.preLoadSize;
    }

    /**
     * Builds the dispatch command for a message.
     *
     * @param messageReference supplies the region destination and redelivery counter
     * @param message          the payload to send
     * @return a dispatch addressed to this subscription's consumer
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public MessageDispatch createMessageDispatch(MessageReference messageReference, Message message) {
        MessageDispatch messageDispatch = new MessageDispatch();
        messageDispatch.setConsumerId(this.info.getConsumerId());
        messageDispatch.setDestination(messageReference.getRegionDestination().getActiveMQDestination());
        messageDispatch.setMessage(message);
        messageDispatch.setRedeliveryCounter(messageReference.getRedeliveryCounter());
        return messageDispatch;
    }

    /**
     * Decides whether the given reference may be dispatched to this
     * subscription's consumer at this moment.
     *
     * @param messageReference the candidate reference
     * @return {@code true} to dispatch it, {@code false} to drop it from this subscription
     */
    protected abstract boolean canDispatch(MessageReference messageReference);

    /**
     * Hook invoked once for every message covered by a standard or poison ack.
     * The default implementation does nothing.
     *
     * @param connectionContext context the ack arrived on
     * @param messageAck        the acknowledgement being processed
     * @param messageReference  the dispatched message being acknowledged
     * @throws IOException declared so that overriding implementations may fail with I/O errors
     */
    protected void acknowledge(ConnectionContext connectionContext, MessageAck messageAck, MessageReference messageReference) throws IOException {
    }
}
