package org.activemq.transport;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.ActiveMQPrefetchPolicy;
import org.activemq.advisories.ConnectionAdvisor;
import org.activemq.advisories.ConnectionAdvisoryEvent;
import org.activemq.advisories.ConnectionAdvisoryEventListener;
import org.activemq.broker.BrokerClient;
import org.activemq.broker.BrokerContainer;
import org.activemq.broker.ConsumerInfoListener;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.BrokerInfo;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.Receipt;
import org.activemq.service.MessageContainerManager;
import org.activemq.service.Service;
import org.activemq.transport.composite.CompositeTransportChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:activemq-core-3.2.2.jar:org/activemq/transport/NetworkChannel.class */
/**
 * Represents a broker-to-broker link. The channel owns a "local" connection
 * (a {@code vm://} connection to the broker held by {@link #brokerContainer})
 * and a "remote" connection (created from {@link #uri}, or wrapped around a
 * transport channel handed to the constructor), and keeps one
 * {@link NetworkMessageBridge} per destination, stored in
 * {@link #topicConsumerMap} or {@link #queueConsumerMap} keyed by the
 * destination's physical name.
 *
 * <p>Bridges are reference counted: every started consumer reported through
 * {@link #onConsumerInfo} increments the count of the bridge for its
 * destination and every stopped consumer decrements it; a bridge whose count
 * drops to zero is stopped asynchronously on {@link #threadPool}.
 */
public class NetworkChannel implements Service, ConsumerInfoListener, ConnectionAdvisoryEventListener, TransportStatusEventListener {
    private static final Log log = LogFactory.getLog(NetworkChannel.class);

    /**
     * Channel status values that {@link #statusChanged} treats as "connected".
     * NOTE(review): presumably these mirror the CONNECTED / RECONNECTED
     * constants of TransportStatusEvent - confirm against that class.
     */
    private static final int STATUS_CONNECTED = 1;
    private static final int STATUS_RECONNECTED = 3;

    /** Poll interval, in milliseconds, while a task waits for the channel to connect. */
    private static final long CONNECT_WAIT_MILLIS = 500L;

    protected String uri;
    protected BrokerContainer brokerContainer;
    protected ActiveMQConnection localConnection;
    protected ActiveMQConnection remoteConnection;
    /** Bridges for topic destinations, keyed by physical destination name. */
    protected ConcurrentHashMap topicConsumerMap;
    /** Bridges for queue destinations, keyed by physical destination name. */
    protected ConcurrentHashMap queueConsumerMap;
    protected String remoteUserName;
    protected String remotePassword;
    protected String remoteBrokerName;
    protected String remoteClusterName;
    protected int maximumRetries;
    protected long reconnectSleepTime;
    protected PooledExecutor threadPool;
    /** True when this channel was built around an already established transport channel. */
    private boolean remote;
    private final SynchronizedBoolean started = new SynchronizedBoolean(false);
    /** Also used as the monitor that tasks wait on until the remote side is connected. */
    private final SynchronizedBoolean connected = new SynchronizedBoolean(false);
    private final SynchronizedBoolean stopped = new SynchronizedBoolean(false);
    private ConnectionAdvisor connectionAdvisor;
    private ActiveMQPrefetchPolicy localPrefetchPolicy;
    private ActiveMQPrefetchPolicy remotePrefetchPolicy;
    private boolean demandBasedForwarding;

    /**
     * Class-literal cache emitted by the original (pre-Java-5) compiler. No
     * longer used by this class; retained only so the set of package-visible
     * members is unchanged.
     */
    static Class class$org$activemq$transport$NetworkChannel;

    /**
     * Creates an unconfigured channel: no retries limit set (0), a 2 second
     * reconnect sleep time, demand based forwarding enabled and empty bridge maps.
     */
    public NetworkChannel() {
        this.maximumRetries = 0;
        this.reconnectSleepTime = 2000L;
        this.remote = false;
        this.demandBasedForwarding = true;
        this.topicConsumerMap = new ConcurrentHashMap();
        this.queueConsumerMap = new ConcurrentHashMap();
    }

    /**
     * Creates an unconfigured channel that runs its background work on the given pool.
     *
     * @param pooledExecutor executor used for start-up and bridge tear-down tasks
     */
    public NetworkChannel(PooledExecutor pooledExecutor) {
        this();
        this.threadPool = pooledExecutor;
    }

    /**
     * Creates a channel that will connect to the remote broker at {@code str}
     * when {@link #start()} is called.
     *
     * @param networkConnector connector whose thread pool is shared by this channel
     * @param brokerContainer  container of the local broker
     * @param str              URI of the remote broker
     */
    public NetworkChannel(NetworkConnector networkConnector, BrokerContainer brokerContainer, String str) {
        this(networkConnector.threadPool);
        this.brokerContainer = brokerContainer;
        this.uri = str;
    }

    /**
     * Creates a channel around an existing transport channel. The remote
     * connection is created and started immediately and a {@link BrokerInfo}
     * describing the local broker is sent over the transport.
     *
     * @param networkConnector connector whose thread pool is shared by this channel
     * @param brokerContainer  container of the local broker
     * @param transportChannel established transport to the remote broker
     * @param str              name of the remote broker
     * @param str2             cluster name of the remote broker
     * @throws JMSException if the remote connection cannot be created or started
     */
    public NetworkChannel(NetworkConnector networkConnector, BrokerContainer brokerContainer, TransportChannel transportChannel, String str, String str2) throws JMSException {
        this(networkConnector.threadPool);
        this.brokerContainer = brokerContainer;
        this.uri = "";
        this.remoteBrokerName = str;
        this.remoteClusterName = str2;

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setJ2EEcompliant(false);
        factory.setTurboBoost(true);
        this.remoteConnection = new ActiveMQConnection(factory, this.remoteUserName, this.remotePassword, transportChannel);
        this.remoteConnection.setClientID("Boondocks:" + this.remoteClusterName + ":" + str);
        this.remoteConnection.setQuickClose(true);
        this.remoteConnection.start();

        BrokerInfo localBrokerInfo = new BrokerInfo();
        localBrokerInfo.setBrokerName(brokerContainer.getBroker().getBrokerName());
        localBrokerInfo.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
        transportChannel.asyncSend(localBrokerInfo);
        this.remote = true;
    }

    /**
     * Tracks the state of the remote transport. A {@code null} event or any
     * status other than the two "connected" values marks the channel as
     * disconnected; otherwise the channel is marked connected and tasks
     * waiting for the connection are woken up.
     *
     * @param transportStatusEvent the status change, may be {@code null}
     */
    @Override // org.activemq.transport.TransportStatusEventListener
    public void statusChanged(TransportStatusEvent transportStatusEvent) {
        boolean up = false;
        if (transportStatusEvent != null) {
            int status = transportStatusEvent.getChannelStatus();
            up = status == STATUS_CONNECTED || status == STATUS_RECONNECTED;
        }
        if (up) {
            // Go through doSetConnected() so waiters are notified immediately
            // instead of only noticing on their next poll.
            doSetConnected();
        } else {
            this.connected.set(false);
        }
    }

    /** Marks the channel as connected and wakes every task waiting on {@link #connected}. */
    private void doSetConnected() {
        synchronized (this.connected) {
            this.connected.set(true);
            this.connected.notifyAll();
        }
    }

    /** @return a description containing the URI and the remote broker name */
    public String toString() {
        return "NetworkChannel{ , uri = '" + this.uri + "' , remoteBrokerName = '" + this.remoteBrokerName + "'  }";
    }

    /**
     * Starts the channel if it is not already started. Initialisation
     * (connections, listener registration, initial subscriptions) runs
     * asynchronously on {@link #threadPool}; failures there are logged.
     */
    @Override // org.activemq.service.Service
    public void start() {
        if (!this.started.commit(false, true)) {
            return;
        }
        this.stopped.set(false);
        try {
            this.threadPool.execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    Thread current = Thread.currentThread();
                    String originalName = current.getName();
                    current.setName("NetworkChannel Initiator to " + uri);
                    try {
                        initialize();
                        startSubscriptions();
                        log.info("Started NetworkChannel to " + uri);
                    } catch (JMSException e) {
                        log.error("Failed to start NetworkChannel: " + uri, e);
                    } finally {
                        // The thread belongs to the pool: always give it its name back.
                        current.setName(originalName);
                    }
                }
            });
        } catch (InterruptedException e) {
            log.warn("Failed to start - interuppted", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops the channel if it is started: stops and forgets every bridge
     * (topics and queues), then closes the remote and the local connection.
     * Both connections are closed even if closing the first one fails.
     *
     * @throws JMSException the first failure raised while closing a connection
     */
    @Override // org.activemq.service.Service
    public void stop() throws JMSException {
        if (!this.started.commit(true, false)) {
            return;
        }
        this.stopped.set(true);
        synchronized (this.connected) {
            // A stopped channel is not connected; wake waiters so they can give up.
            this.connected.set(false);
            this.connected.notifyAll();
        }
        try {
            // Bridges must be stopped BEFORE the maps are cleared, otherwise
            // there is nothing left to iterate over.
            stopBridges(this.topicConsumerMap);
            stopBridges(this.queueConsumerMap);
        } finally {
            closeConnections();
        }
    }

    /** Stops every bridge held in {@code bridges} and then empties the map. */
    private void stopBridges(Map bridges) {
        for (Iterator it = bridges.values().iterator(); it.hasNext();) {
            ((NetworkMessageBridge) it.next()).stop();
        }
        bridges.clear();
    }

    /**
     * Closes and forgets both connections. A failure on the remote connection
     * does not prevent the local one from being closed.
     *
     * @throws JMSException the first close failure, if any
     */
    private void closeConnections() throws JMSException {
        ActiveMQConnection remoteToClose = this.remoteConnection;
        ActiveMQConnection localToClose = this.localConnection;
        this.remoteConnection = null;
        this.localConnection = null;

        JMSException failure = null;
        if (remoteToClose != null) {
            try {
                remoteToClose.close();
            } catch (JMSException e) {
                failure = e;
            }
        }
        if (localToClose != null) {
            try {
                localToClose.close();
            } catch (JMSException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    log.warn("Failed to close local connection of " + this, e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Reacts to a consumer starting or stopping on the local broker by adding
     * or releasing the bridge for the consumer's destination. Consumers that
     * belong to a clustered connection, or that already visited the remote
     * broker, are ignored. If the remote side is not connected yet the work is
     * deferred to {@link #threadPool} and performed once the channel connects.
     *
     * @param brokerClient client the consumer belongs to
     * @param consumerInfo description of the consumer
     */
    @Override // org.activemq.broker.ConsumerInfoListener
    public void onConsumerInfo(final BrokerClient brokerClient, final ConsumerInfo consumerInfo) {
        if (brokerClient.isClusteredConnection()) {
            return;
        }
        if (this.connected.get()) {
            applyConsumerInfo(consumerInfo);
            return;
        }
        try {
            this.threadPool.execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    if (brokerClient.isClusteredConnection() || consumerInfo.hasVisited(remoteBrokerName)) {
                        return;
                    }
                    // Wait under the monitor, but apply the change outside of it:
                    // creating a bridge takes the channel's own lock, which
                    // initializeRemote() holds while it acquires this monitor.
                    if (awaitConnected()) {
                        applyConsumerInfo(consumerInfo);
                    }
                }
            });
        } catch (InterruptedException e) {
            log.warn("Failed to process ConsumerInfo: " + consumerInfo, e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks until the channel is connected or stopped.
     *
     * @return {@code true} if the channel is usable (connected and not
     *         stopped); {@code false} if it was stopped or the wait was interrupted
     */
    private boolean awaitConnected() {
        synchronized (this.connected) {
            while (!this.connected.get() && !this.stopped.get()) {
                try {
                    this.connected.wait(CONNECT_WAIT_MILLIS);
                } catch (InterruptedException e) {
                    log.debug("interuppted", e);
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return !this.stopped.get();
    }

    /**
     * Adds or releases the bridge for {@code consumerInfo}, unless the
     * consumer already visited the remote broker.
     */
    private void applyConsumerInfo(ConsumerInfo consumerInfo) {
        if (consumerInfo.hasVisited(this.remoteBrokerName)) {
            return;
        }
        if (consumerInfo.isStarted()) {
            addConsumerInfo(consumerInfo);
        } else {
            removeConsumerInfo(consumerInfo);
        }
    }

    /** @return the URI of the remote broker */
    public String getUri() {
        return this.uri;
    }

    /** @param str the URI of the remote broker */
    public void setUri(String str) {
        this.uri = str;
    }

    /** @return the password used for the remote connection */
    public String getRemotePassword() {
        return this.remotePassword;
    }

    /** @param str the password used for the remote connection */
    public void setRemotePassword(String str) {
        this.remotePassword = str;
    }

    /** @return the user name used for the remote connection */
    public String getRemoteUserName() {
        return this.remoteUserName;
    }

    /** @param str the user name used for the remote connection */
    public void setRemoteUserName(String str) {
        this.remoteUserName = str;
    }

    /** @return the container of the local broker */
    public BrokerContainer getBrokerContainer() {
        return this.brokerContainer;
    }

    /** @param brokerContainer the container of the local broker */
    public void setBrokerContainer(BrokerContainer brokerContainer) {
        this.brokerContainer = brokerContainer;
    }

    /** @return the retry limit applied to a composite remote transport */
    public int getMaximumRetries() {
        return this.maximumRetries;
    }

    /** @param i the retry limit applied to a composite remote transport */
    public void setMaximumRetries(int i) {
        this.maximumRetries = i;
    }

    /** @return the failure sleep time applied to a composite remote transport */
    public long getReconnectSleepTime() {
        return this.reconnectSleepTime;
    }

    /** @param j the failure sleep time applied to a composite remote transport */
    public void setReconnectSleepTime(long j) {
        this.reconnectSleepTime = j;
    }

    /** @return the name of the remote broker, {@code null} until known */
    public String getRemoteBrokerName() {
        return this.remoteBrokerName;
    }

    /** @param str the name of the remote broker */
    public void setRemoteBrokerName(String str) {
        this.remoteBrokerName = str;
    }

    /** @return the executor used for background work */
    protected PooledExecutor getThreadPool() {
        return this.threadPool;
    }

    /** @param pooledExecutor the executor used for background work */
    public void setThreadPool(PooledExecutor pooledExecutor) {
        this.threadPool = pooledExecutor;
    }

    /** Returns the local connection, creating it on first use. */
    private synchronized ActiveMQConnection getLocalConnection() throws JMSException {
        if (this.localConnection == null) {
            initializeLocal();
        }
        return this.localConnection;
    }

    /** Returns the remote connection, creating it on first use. */
    private synchronized ActiveMQConnection getRemoteConnection() throws JMSException {
        if (this.remoteConnection == null) {
            initializeRemote();
        }
        return this.remoteConnection;
    }

    /** @return the prefetch policy applied to the local connection, may be {@code null} */
    public ActiveMQPrefetchPolicy getLocalPrefetchPolicy() {
        return this.localPrefetchPolicy;
    }

    /** @param activeMQPrefetchPolicy the prefetch policy applied to the local connection */
    public void setLocalPrefetchPolicy(ActiveMQPrefetchPolicy activeMQPrefetchPolicy) {
        this.localPrefetchPolicy = activeMQPrefetchPolicy;
    }

    /** @return the prefetch policy applied to the remote connection, may be {@code null} */
    public ActiveMQPrefetchPolicy getRemotePrefetchPolicy() {
        return this.remotePrefetchPolicy;
    }

    /** @param activeMQPrefetchPolicy the prefetch policy applied to the remote connection */
    public void setRemotePrefetchPolicy(ActiveMQPrefetchPolicy activeMQPrefetchPolicy) {
        this.remotePrefetchPolicy = activeMQPrefetchPolicy;
    }

    /** @return whether bridges are only created in response to consumers */
    public boolean isDemandBasedForwarding() {
        return this.demandBasedForwarding;
    }

    /** @param z whether bridges are only created in response to consumers */
    public void setDemandBasedForwarding(boolean z) {
        this.demandBasedForwarding = z;
    }

    /**
     * Mirrors remote connection life-cycle events into the local broker
     * container: a closed connection deregisters its client id, any other
     * event registers it.
     *
     * @param connectionAdvisoryEvent the advisory received from the remote broker
     */
    @Override // org.activemq.advisories.ConnectionAdvisoryEventListener
    public void onEvent(ConnectionAdvisoryEvent connectionAdvisoryEvent) {
        if (connectionAdvisoryEvent.getInfo().isClosed()) {
            this.brokerContainer.deregisterRemoteClientID(connectionAdvisoryEvent.getInfo().getClientId());
        } else {
            this.brokerContainer.registerRemoteClientID(connectionAdvisoryEvent.getInfo().getClientId());
        }
    }

    /**
     * Registers interest in the destination of a started consumer.
     *
     * @param consumerInfo the consumer that started
     */
    public void addConsumerInfo(ConsumerInfo consumerInfo) {
        addConsumerInfo(consumerInfo.getDestination(), consumerInfo.getDestination().isTopic(), consumerInfo.isDurableTopic());
    }

    /**
     * Finds or creates the bridge for a destination and takes one reference on it.
     *
     * @param activeMQDestination destination to bridge
     * @param z                   {@code true} for a topic, {@code false} for a queue
     * @param z2                  {@code true} if the interest is durable
     */
    private void addConsumerInfo(ActiveMQDestination activeMQDestination, boolean z, boolean z2) {
        ConcurrentHashMap bridges = z ? this.topicConsumerMap : this.queueConsumerMap;
        NetworkMessageBridge bridge = (NetworkMessageBridge) bridges.get(activeMQDestination.getPhysicalName());
        if (bridge == null) {
            bridge = createBridge(bridges, activeMQDestination, z2);
        } else if (z2 && !bridge.isDurableTopic() && !this.demandBasedForwarding) {
            // The reference is released around the upgrade and re-taken below.
            bridge.decrementReferenceCount();
            upgradeBridge(bridge);
        }
        bridge.incrementReferenceCount();
    }

    /**
     * Upgrades a bridge while the remote connection is stopped, then restarts
     * the remote connection whether or not the upgrade succeeded.
     */
    private void upgradeBridge(NetworkMessageBridge networkMessageBridge) {
        try {
            this.remoteConnection.stop();
            networkMessageBridge.upgrade();
        } catch (JMSException e) {
            log.warn("Could not upgrade the NetworkMessageBridge to a durable subscription for destination: " + networkMessageBridge.getDestination(), e);
        }
        try {
            this.remoteConnection.start();
        } catch (JMSException e) {
            log.error("Failed to restart the NetworkMessageBridge", e);
        }
    }

    /**
     * Builds, registers and starts a bridge for a destination. Failures are
     * logged; the (possibly unstarted) bridge is returned in every case.
     *
     * @param map                 map the bridge is registered in
     * @param activeMQDestination destination to bridge
     * @param z                   whether the bridge is for a durable topic
     * @return the new bridge, never {@code null}
     */
    private NetworkMessageBridge createBridge(Map map, ActiveMQDestination activeMQDestination, boolean z) {
        NetworkMessageBridge bridge = new NetworkMessageBridge();
        try {
            bridge.setDestination(activeMQDestination);
            bridge.setDurableTopic(z);
            bridge.setLocalBrokerName(this.brokerContainer.getBroker().getBrokerName());
            bridge.setLocalSession(getLocalConnection().createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE));
            bridge.setRemoteSession(getRemoteConnection().createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE));
            map.put(activeMQDestination.getPhysicalName(), bridge);
            bridge.start();
            log.info("started NetworkMessageBridge for destination: " + activeMQDestination + " -- NetworkChannel: " + toString());
        } catch (JMSException e) {
            log.error("Failed to start NetworkMessageBridge for destination: " + activeMQDestination, e);
        }
        return bridge;
    }

    /**
     * Releases one reference on the bridge for a stopped consumer's
     * destination. When the last reference is released the bridge is stopped
     * and unregistered asynchronously on {@link #threadPool}.
     *
     * @param consumerInfo the consumer that stopped
     */
    public void removeConsumerInfo(final ConsumerInfo consumerInfo) {
        final String physicalName = consumerInfo.getDestination().getPhysicalName();
        // Must use the same selection rule as addConsumerInfo(), otherwise
        // queue bridges are never found again once they have been created.
        final Map bridges = consumerInfo.getDestination().isTopic() ? this.topicConsumerMap : this.queueConsumerMap;
        final NetworkMessageBridge bridge = (NetworkMessageBridge) bridges.get(physicalName);
        if (bridge == null || bridge.decrementReferenceCount() > 0) {
            return;
        }
        try {
            this.threadPool.execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    bridge.stop();
                    bridges.remove(physicalName);
                    log.info("stopped MetworkMessageBridge for destination: " + consumerInfo.getDestination());
                }
            });
        } catch (InterruptedException e) {
            log.warn("got interrupted stoping NetworkBridge", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Eagerly bridges every destination known to the local broker. Does
     * nothing when demand based forwarding is enabled or when this channel
     * wraps an externally supplied transport.
     */
    public void startSubscriptions() {
        if (this.demandBasedForwarding || this.remote) {
            return;
        }
        MessageContainerManager persistentTopics = this.brokerContainer.getBroker().getPersistentTopicContainerManager();
        if (persistentTopics != null) {
            startSubscriptions(persistentTopics.getLocalDestinations(), true, true);
        }
        MessageContainerManager transientTopics = this.brokerContainer.getBroker().getTransientTopicContainerManager();
        if (transientTopics != null) {
            startSubscriptions(transientTopics.getLocalDestinations(), true, false);
        }
        MessageContainerManager transientQueues = this.brokerContainer.getBroker().getTransientQueueContainerManager();
        if (transientQueues != null) {
            startSubscriptions(transientQueues.getLocalDestinations(), false, false);
        }
        MessageContainerManager persistentQueues = this.brokerContainer.getBroker().getPersistentQueueContainerManager();
        if (persistentQueues != null) {
            startSubscriptions(persistentQueues.getLocalDestinations(), false, false);
        }
    }

    /**
     * Takes a bridge reference for every destination in {@code map}.
     *
     * @param map destinations to bridge (values are destinations), may be {@code null}
     * @param z   {@code true} for topics, {@code false} for queues
     * @param z2  {@code true} if the interest is durable
     */
    private void startSubscriptions(Map map, boolean z, boolean z2) {
        if (map == null) {
            return;
        }
        for (Iterator it = map.values().iterator(); it.hasNext();) {
            addConsumerInfo((ActiveMQDestination) it.next(), z, z2);
        }
    }

    /**
     * Creates both connections and registers this channel as a consumer
     * listener on the local broker.
     *
     * @throws JMSException if either connection cannot be established
     */
    public void initialize() throws JMSException {
        // NOTE(review): the local connection announces remoteBrokerName, which
        // is still unset at this point for channels created from a URI - confirm
        // whether the remote side should be initialised first.
        initializeLocal();
        initializeRemote();
        this.brokerContainer.getBroker().addConsumerInfoListener(this);
    }

    /**
     * Creates and starts the remote connection if there is none yet, exchanges
     * broker information with the remote broker, starts the connection
     * advisor, and finally marks the channel as connected.
     */
    private synchronized void initializeRemote() throws JMSException {
        if (this.remoteConnection == null) {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.remoteUserName, this.remotePassword, this.uri);
            factory.setJ2EEcompliant(false);
            factory.setQuickClose(true);
            factory.setInternalConnection(true);
            this.remoteConnection = (ActiveMQConnection) factory.createConnection();

            TransportChannel transport = this.remoteConnection.getTransportChannel();
            if (transport instanceof CompositeTransportChannel) {
                CompositeTransportChannel composite = (CompositeTransportChannel) transport;
                composite.setMaximumRetries(this.maximumRetries);
                composite.setFailureSleepTime(this.reconnectSleepTime);
                composite.setIncrementTimeout(false);
            }
            transport.addTransportStatusEventListener(this);

            this.remoteConnection.setClientID(this.brokerContainer.getBroker().getBrokerName() + "_NetworkChannel");
            this.remoteConnection.start();

            BrokerInfo localBrokerInfo = new BrokerInfo();
            localBrokerInfo.setBrokerName(this.brokerContainer.getBroker().getBrokerName());
            localBrokerInfo.setClusterName(this.brokerContainer.getBroker().getBrokerClusterName());
            Receipt receipt = this.remoteConnection.syncSendRequest(localBrokerInfo);
            if (receipt != null) {
                this.remoteBrokerName = receipt.getBrokerName();
                this.remoteClusterName = receipt.getClusterName();
            }

            this.connectionAdvisor = new ConnectionAdvisor(this.remoteConnection);
            this.connectionAdvisor.addListener(this);
            this.connectionAdvisor.start();
            if (this.remotePrefetchPolicy != null) {
                this.remoteConnection.setPrefetchPolicy(this.remotePrefetchPolicy);
            }
        }
        doSetConnected();
    }

    /**
     * Creates and starts the {@code vm://} connection to the local broker if
     * there is none yet, and announces the remote broker over it. Guarded the
     * same way as {@link #initializeRemote()} so that an existing connection
     * is never replaced (and leaked) by a second call.
     */
    private synchronized void initializeLocal() throws JMSException {
        if (this.localConnection != null) {
            return;
        }
        String brokerName = this.brokerContainer.getBroker().getBrokerName();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
        factory.setTurboBoost(true);
        factory.setJ2EEcompliant(false);
        factory.setBrokerName(brokerName);
        factory.setQuickClose(true);
        factory.setInternalConnection(true);
        this.localConnection = (ActiveMQConnection) factory.createConnection();
        this.localConnection.start();

        BrokerInfo remoteBrokerInfo = new BrokerInfo();
        remoteBrokerInfo.setBrokerName(this.remoteBrokerName);
        remoteBrokerInfo.setClusterName(this.remoteClusterName);
        this.localConnection.asyncSendPacket(remoteBrokerInfo);
        if (this.localPrefetchPolicy != null) {
            this.localConnection.setPrefetchPolicy(this.localPrefetchPolicy);
        }
    }

    /**
     * Class-literal helper emitted by the original (pre-Java-5) compiler. No
     * longer used by this class; retained only so the set of package-visible
     * members is unchanged.
     *
     * @param str fully qualified class name
     * @return the loaded class
     * @throws NoClassDefFoundError if the class cannot be found
     */
    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            NoClassDefFoundError error = new NoClassDefFoundError();
            error.initCause(e);
            throw error;
        }
    }
}
