package org.codehaus.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.AndFilter;
import org.codehaus.activemq.filter.DestinationMap;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.filter.NoLocalFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageContainerManager;

/* loaded from: input_file:activemq-core-1.1-G1M3.jar:org/codehaus/activemq/service/boundedvm/TransientTopicBoundedMessageManager.class */
public class TransientTopicBoundedMessageManager implements MessageContainerManager {
    private MemoryBoundedQueueManager queueManager;
    private ConcurrentHashMap containers = new ConcurrentHashMap();
    private DestinationMap destinationMap = new DestinationMap();
    private Map destinations = new ConcurrentHashMap();
    private FilterFactory filterFactory = new FilterFactoryImpl();
    private SynchronizedBoolean started = new SynchronizedBoolean(false);

    public TransientTopicBoundedMessageManager(MemoryBoundedQueueManager memoryBoundedQueueManager) {
        this.queueManager = memoryBoundedQueueManager;
    }

    @Override // org.codehaus.activemq.service.Service
    public void start() throws JMSException {
        if (this.started.commit(false, true)) {
            Iterator it = this.containers.values().iterator();
            while (it.hasNext()) {
                ((TransientTopicBoundedMessageContainer) it.next()).start();
            }
        }
    }

    @Override // org.codehaus.activemq.service.Service
    public void stop() throws JMSException {
        if (this.started.commit(true, false)) {
            Iterator it = this.containers.values().iterator();
            while (it.hasNext()) {
                ((TransientTopicBoundedMessageContainer) it.next()).stop();
            }
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public synchronized void addMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        ActiveMQDestination destination = consumerInfo.getDestination();
        if (destination.isTopic()) {
            TransientTopicBoundedMessageContainer transientTopicBoundedMessageContainer = (TransientTopicBoundedMessageContainer) this.containers.get(brokerClient);
            if (transientTopicBoundedMessageContainer == null) {
                transientTopicBoundedMessageContainer = new TransientTopicBoundedMessageContainer(brokerClient, this.queueManager.getMemoryBoundedQueue(brokerClient.toString()));
                this.containers.put(brokerClient, transientTopicBoundedMessageContainer);
                if (this.started.get()) {
                    transientTopicBoundedMessageContainer.start();
                }
            }
            transientTopicBoundedMessageContainer.addConsumer(createFilter(consumerInfo), consumerInfo);
            this.destinationMap.put(destination, transientTopicBoundedMessageContainer);
            String physicalName = destination.getPhysicalName();
            if (this.destinations.containsKey(physicalName)) {
                return;
            }
            this.destinations.put(physicalName, destination);
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public synchronized void removeMessageConsumer(BrokerClient brokerClient, ConsumerInfo consumerInfo) throws JMSException {
        TransientTopicBoundedMessageContainer transientTopicBoundedMessageContainer;
        ActiveMQDestination destination = consumerInfo.getDestination();
        if (!destination.isTopic() || (transientTopicBoundedMessageContainer = (TransientTopicBoundedMessageContainer) this.containers.get(brokerClient)) == null) {
            return;
        }
        transientTopicBoundedMessageContainer.removeConsumer(consumerInfo);
        if (transientTopicBoundedMessageContainer.isInactive()) {
            this.containers.remove(brokerClient);
            transientTopicBoundedMessageContainer.close();
            this.destinationMap.remove(destination, transientTopicBoundedMessageContainer);
        }
        if (hasConsumerFor(destination)) {
            return;
        }
        this.destinations.remove(destination.getPhysicalName());
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void deleteSubscription(String str, String str2) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void sendMessage(BrokerClient brokerClient, ActiveMQMessage activeMQMessage) throws JMSException {
        if (activeMQMessage == null || !activeMQMessage.getJMSActiveMQDestination().isTopic()) {
            return;
        }
        Iterator it = this.destinationMap.get(activeMQMessage.getJMSActiveMQDestination()).iterator();
        while (it.hasNext()) {
            ((TransientTopicBoundedMessageContainer) it.next()).targetAndDispatch(brokerClient, activeMQMessage);
        }
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void acknowledgeMessage(BrokerClient brokerClient, MessageAck messageAck) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void acknowledgeTransactedMessage(BrokerClient brokerClient, String str, MessageAck messageAck) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void redeliverMessage(BrokerClient brokerClient, MessageAck messageAck) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void poll() throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void commitTransaction(BrokerClient brokerClient, String str) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void rollbackTransaction(BrokerClient brokerClient, String str) {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public MessageContainer getContainer(String str) throws JMSException {
        return null;
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public Map getDestinations() {
        return Collections.unmodifiableMap(this.destinations);
    }

    protected Filter createFilter(ConsumerInfo consumerInfo) throws JMSException {
        Filter createFilter = this.filterFactory.createFilter(consumerInfo.getDestination(), consumerInfo.getSelector());
        if (consumerInfo.isNoLocal()) {
            createFilter = new AndFilter(createFilter, new NoLocalFilter(consumerInfo.getClientId()));
        }
        return createFilter;
    }

    protected boolean hasConsumerFor(ActiveMQDestination activeMQDestination) {
        Iterator it = this.containers.values().iterator();
        while (it.hasNext()) {
            if (((TransientTopicBoundedMessageContainer) it.next()).hasConsumerFor(activeMQDestination)) {
                return true;
            }
        }
        return false;
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void createMessageContainer(ActiveMQDestination activeMQDestination) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public void destroyMessageContainer(ActiveMQDestination activeMQDestination) throws JMSException {
    }

    @Override // org.codehaus.activemq.service.MessageContainerManager
    public Map getMessageContainerAdmins() throws JMSException {
        return Collections.EMPTY_MAP;
    }
}
