package org.activemq.broker;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.activemq.Service;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.BrokerInfo;
import org.activemq.command.Command;
import org.activemq.command.ConnectionId;
import org.activemq.command.ConnectionInfo;
import org.activemq.command.ConsumerId;
import org.activemq.command.ConsumerInfo;
import org.activemq.command.DataArrayResponse;
import org.activemq.command.DestinationInfo;
import org.activemq.command.ExceptionResponse;
import org.activemq.command.KeepAliveInfo;
import org.activemq.command.Message;
import org.activemq.command.MessageAck;
import org.activemq.command.MessageDispatch;
import org.activemq.command.ProducerId;
import org.activemq.command.ProducerInfo;
import org.activemq.command.RemoveSubscriptionInfo;
import org.activemq.command.Response;
import org.activemq.command.SessionId;
import org.activemq.command.SessionInfo;
import org.activemq.command.ShutdownInfo;
import org.activemq.command.TransactionInfo;
import org.activemq.command.WireFormatInfo;
import org.activemq.state.CommandVisitor;
import org.activemq.state.ConsumerState;
import org.activemq.state.ProducerState;
import org.activemq.state.SessionState;
import org.activemq.thread.Task;
import org.activemq.thread.TaskRunner;
import org.activemq.thread.TaskRunnerFactory;
import org.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:activemq-core-4.0-M1.jar:org/activemq/broker/AbstractConnection.class */
public abstract class AbstractConnection implements Service, Connection, Task, CommandVisitor {
    private static final Log log;
    protected final Broker broker;
    protected final TaskRunner taskRunner;
    protected final Connector connector;
    private WireFormatInfo wireFormatInfo;
    static Class class$org$activemq$broker$AbstractConnection;
    protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
    protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
    protected boolean disposed = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:activemq-core-4.0-M1.jar:org/activemq/broker/AbstractConnection$ConnectionState.class */
    public static class ConnectionState extends org.activemq.state.ConnectionState {
        private final ConnectionContext context;

        public ConnectionState(ConnectionInfo connectionInfo, ConnectionContext connectionContext) {
            super(connectionInfo);
            this.context = connectionContext;
        }

        public ConnectionContext getContext() {
            return this.context;
        }
    }

    public AbstractConnection(Connector connector, Broker broker, TaskRunnerFactory taskRunnerFactory) {
        this.connector = connector;
        this.broker = broker;
        if (taskRunnerFactory != null) {
            this.taskRunner = taskRunnerFactory.createTaskRunner(this);
        } else {
            this.taskRunner = null;
        }
    }

    @Override // org.activemq.Service
    public void start() throws Exception {
        dispatch(this.connector.getBrokerInfo());
    }

    @Override // org.activemq.Service
    public void stop() throws Exception {
        if (this.disposed) {
            return;
        }
        this.disposed = true;
        Iterator it = new ArrayList(this.connectionStates.keySet()).iterator();
        while (it.hasNext()) {
            try {
                processRemoveConnection((ConnectionId) it.next());
            } catch (Throwable th) {
            }
        }
    }

    public void serviceTransportException(IOException iOException) {
        if (this.disposed) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(new StringBuffer().append("Transport failed: ").append(iOException).toString(), iOException);
        }
        log.debug(new StringBuffer().append("Transport failed: ").append(iOException).toString(), iOException);
        ServiceSupport.dispose(this);
    }

    @Override // org.activemq.broker.Connection
    public void serviceException(Throwable th) {
        if (this.disposed) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(new StringBuffer().append("Async error occurred: ").append(th).toString(), th);
        }
        th.printStackTrace();
    }

    @Override // org.activemq.broker.Connection
    public Response service(Command command) {
        Response response = null;
        boolean isResponseRequired = command.isResponseRequired();
        short commandId = command.getCommandId();
        try {
            response = command.visit(this);
        } catch (Throwable th) {
            if (isResponseRequired) {
                if (log.isDebugEnabled()) {
                    log.debug(new StringBuffer().append("Sync error occurred: ").append(th).toString(), th);
                }
                response = new ExceptionResponse(th);
            } else {
                serviceException(th);
            }
        }
        if (isResponseRequired) {
            if (response == null) {
                response = new Response();
            }
            response.setCorrelationId(commandId);
        }
        return response;
    }

    protected ConnectionState lookupConnectionState(ConsumerId consumerId) {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(consumerId.getParentId().getParentId());
        if (connectionState == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot lookup a consumer from a connection that had not been registered: ").append(consumerId.getParentId().getParentId()).toString());
        }
        return connectionState;
    }

    protected ConnectionState lookupConnectionState(ProducerId producerId) {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(producerId.getParentId().getParentId());
        if (connectionState == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot lookup a producer from a connection that had not been registered: ").append(producerId.getParentId().getParentId()).toString());
        }
        return connectionState;
    }

    protected ConnectionState lookupConnectionState(SessionId sessionId) {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(sessionId.getParentId());
        if (connectionState == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot lookup a session from a connection that had not been registered: ").append(sessionId.getParentId()).toString());
        }
        return connectionState;
    }

    protected ConnectionState lookupConnectionState(ConnectionId connectionId) {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(connectionId);
        if (connectionState == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot lookup a connection that had not been registered: ").append(connectionId).toString());
        }
        return connectionState;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processKeepAlive(KeepAliveInfo keepAliveInfo) throws Throwable {
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processRemoveSubscription(RemoveSubscriptionInfo removeSubscriptionInfo) throws Throwable {
        this.broker.removeSubscription(lookupConnectionState(removeSubscriptionInfo.getConnectionId()).getContext(), removeSubscriptionInfo);
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processWireFormat(WireFormatInfo wireFormatInfo) throws Throwable {
        this.wireFormatInfo = wireFormatInfo;
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processShutdown(ShutdownInfo shutdownInfo) throws Throwable {
        stop();
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processBeginTransaction(TransactionInfo transactionInfo) throws Throwable {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(transactionInfo.getConnectionId());
        ConnectionContext connectionContext = null;
        if (connectionState != null) {
            connectionContext = connectionState.getContext();
        }
        this.broker.beginTransaction(connectionContext, transactionInfo.getTransactionId());
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processEndTransaction(TransactionInfo transactionInfo) throws Throwable {
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processPrepareTransaction(TransactionInfo transactionInfo) throws Throwable {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(transactionInfo.getConnectionId());
        ConnectionContext connectionContext = null;
        if (connectionState != null) {
            connectionContext = connectionState.getContext();
        }
        this.broker.prepareTransaction(connectionContext, transactionInfo.getTransactionId());
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processCommitTransactionOnePhase(TransactionInfo transactionInfo) throws Throwable {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(transactionInfo.getConnectionId());
        ConnectionContext connectionContext = null;
        if (connectionState != null) {
            connectionContext = connectionState.getContext();
        }
        this.broker.commitTransaction(connectionContext, transactionInfo.getTransactionId(), true);
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processCommitTransactionTwoPhase(TransactionInfo transactionInfo) throws Throwable {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(transactionInfo.getConnectionId());
        ConnectionContext connectionContext = null;
        if (connectionState != null) {
            connectionContext = connectionState.getContext();
        }
        this.broker.commitTransaction(connectionContext, transactionInfo.getTransactionId(), false);
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processRollbackTransaction(TransactionInfo transactionInfo) throws Throwable {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(transactionInfo.getConnectionId());
        ConnectionContext connectionContext = null;
        if (connectionState != null) {
            connectionContext = connectionState.getContext();
        }
        this.broker.rollbackTransaction(connectionContext, transactionInfo.getTransactionId());
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processForgetTransaction(TransactionInfo transactionInfo) throws Throwable {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(transactionInfo.getConnectionId());
        ConnectionContext connectionContext = null;
        if (connectionState != null) {
            connectionContext = connectionState.getContext();
        }
        this.broker.forgetTransaction(connectionContext, transactionInfo.getTransactionId());
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processRecoverTransactions(TransactionInfo transactionInfo) throws Throwable {
        ConnectionState connectionState = (ConnectionState) this.connectionStates.get(transactionInfo.getConnectionId());
        ConnectionContext connectionContext = null;
        if (connectionState != null) {
            connectionContext = connectionState.getContext();
        }
        return new DataArrayResponse(this.broker.getPreparedTransactions(connectionContext));
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processMessage(Message message) throws Throwable {
        this.broker.send(lookupConnectionState(message.getProducerId()).getContext(), message);
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processMessageAck(MessageAck messageAck) throws Throwable {
        this.broker.acknowledge(lookupConnectionState(messageAck.getConsumerId()).getContext(), messageAck);
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processBrokerInfo(BrokerInfo brokerInfo) {
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processAddDestination(DestinationInfo destinationInfo) throws Throwable {
        ConnectionState lookupConnectionState = lookupConnectionState(destinationInfo.getConnectionId());
        this.broker.addDestination(lookupConnectionState.getContext(), destinationInfo.getDestination());
        if (!destinationInfo.getDestination().isTemporary()) {
            return null;
        }
        lookupConnectionState.addTempDestination(destinationInfo.getDestination());
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processRemoveDestination(DestinationInfo destinationInfo) throws Throwable {
        ConnectionState lookupConnectionState = lookupConnectionState(destinationInfo.getConnectionId());
        this.broker.removeDestination(lookupConnectionState.getContext(), destinationInfo.getDestination(), destinationInfo.getTimeout());
        if (!destinationInfo.getDestination().isTemporary()) {
            return null;
        }
        lookupConnectionState.removeTempDestination(destinationInfo.getDestination());
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processAddProducer(ProducerInfo producerInfo) throws Throwable {
        SessionId parentId = producerInfo.getProducerId().getParentId();
        ConnectionState lookupConnectionState = lookupConnectionState(parentId.getParentId());
        SessionState sessionState = lookupConnectionState.getSessionState(parentId);
        if (sessionState == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot add a producer to a session that had not been registered: ").append(parentId).toString());
        }
        this.broker.addProducer(lookupConnectionState.getContext(), producerInfo);
        sessionState.addProducer(producerInfo);
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processRemoveProducer(ProducerId producerId) throws Throwable {
        SessionId parentId = producerId.getParentId();
        ConnectionState lookupConnectionState = lookupConnectionState(parentId.getParentId());
        SessionState sessionState = lookupConnectionState.getSessionState(parentId);
        if (sessionState == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot remove a producer from a session that had not been registered: ").append(parentId).toString());
        }
        ProducerState removeProducer = sessionState.removeProducer(producerId);
        if (removeProducer == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot remove a producer that had not been registered: ").append(producerId).toString());
        }
        this.broker.removeProducer(lookupConnectionState.getContext(), removeProducer.getInfo());
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processAddConsumer(ConsumerInfo consumerInfo) throws Throwable {
        SessionId parentId = consumerInfo.getConsumerId().getParentId();
        ConnectionState lookupConnectionState = lookupConnectionState(parentId.getParentId());
        SessionState sessionState = lookupConnectionState.getSessionState(parentId);
        if (sessionState == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot add a consumer to a session that had not been registered: ").append(parentId).toString());
        }
        this.broker.addConsumer(lookupConnectionState.getContext(), consumerInfo);
        sessionState.addConsumer(consumerInfo);
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processRemoveConsumer(ConsumerId consumerId) throws Throwable {
        SessionId parentId = consumerId.getParentId();
        ConnectionState lookupConnectionState = lookupConnectionState(parentId.getParentId());
        SessionState sessionState = lookupConnectionState.getSessionState(parentId);
        if (sessionState == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot remove a consumer from a session that had not been registered: ").append(parentId).toString());
        }
        ConsumerState removeConsumer = sessionState.removeConsumer(consumerId);
        if (removeConsumer == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot remove a consumer that had not been registered: ").append(consumerId).toString());
        }
        this.broker.removeConsumer(lookupConnectionState.getContext(), removeConsumer.getInfo());
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processAddSession(SessionInfo sessionInfo) throws Throwable {
        ConnectionState lookupConnectionState = lookupConnectionState(sessionInfo.getSessionId().getParentId());
        this.broker.addSession(lookupConnectionState.getContext(), sessionInfo);
        lookupConnectionState.addSession(sessionInfo);
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processRemoveSession(SessionId sessionId) throws Throwable {
        ConnectionState lookupConnectionState = lookupConnectionState(sessionId.getParentId());
        SessionState sessionState = lookupConnectionState.getSessionState(sessionId);
        if (sessionState == null) {
            throw new IllegalStateException(new StringBuffer().append("Cannot remove session that had not been registered: ").append(sessionId).toString());
        }
        Iterator it = sessionState.getConsumerIds().iterator();
        while (it.hasNext()) {
            processRemoveConsumer((ConsumerId) it.next());
        }
        Iterator it2 = sessionState.getProducerIds().iterator();
        while (it2.hasNext()) {
            processRemoveProducer((ProducerId) it2.next());
        }
        lookupConnectionState.removeSession(sessionId);
        this.broker.removeSession(lookupConnectionState.getContext(), sessionState.getInfo());
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processAddConnection(ConnectionInfo connectionInfo) throws Throwable {
        ConnectionContext connectionContext = new ConnectionContext();
        connectionContext.setConnection(this);
        connectionContext.setBroker(this.broker);
        connectionContext.setConnector(this.connector);
        connectionContext.setTransactions(new ConcurrentHashMap());
        connectionContext.setClientId(connectionInfo.getClientId());
        connectionContext.setUserName(connectionInfo.getUserName());
        connectionContext.setConnectionId(connectionInfo.getConnectionId());
        connectionContext.setWireFormatInfo(this.wireFormatInfo);
        this.connectionStates.put(connectionInfo.getConnectionId(), new ConnectionState(connectionInfo, connectionContext));
        this.broker.addConnection(connectionContext, connectionInfo);
        return null;
    }

    @Override // org.activemq.state.CommandVisitor
    public Response processRemoveConnection(ConnectionId connectionId) throws Throwable {
        ConnectionState lookupConnectionState = lookupConnectionState(connectionId);
        Iterator it = lookupConnectionState.getSessionIds().iterator();
        while (it.hasNext()) {
            processRemoveSession((SessionId) it.next());
        }
        Iterator it2 = lookupConnectionState.getTempDesinations().iterator();
        while (it2.hasNext()) {
            this.broker.removeDestination(lookupConnectionState.getContext(), (ActiveMQDestination) it2.next(), 0L);
            it2.remove();
        }
        this.broker.removeConnection(lookupConnectionState.getContext(), lookupConnectionState.getInfo(), null);
        this.connectionStates.remove(connectionId);
        return null;
    }

    @Override // org.activemq.broker.Connection
    public Connector getConnector() {
        return this.connector;
    }

    @Override // org.activemq.broker.Connection
    public void dispatchSync(Command command) {
        if (!command.isMessageDispatch()) {
            dispatch(command);
            return;
        }
        Runnable runnable = (Runnable) ((MessageDispatch) command).getConsumer();
        try {
            dispatch(command);
            if (runnable != null) {
                runnable.run();
            }
        } catch (Throwable th) {
            if (runnable != null) {
                runnable.run();
            }
            throw th;
        }
    }

    @Override // org.activemq.broker.Connection
    public void dispatchAsync(Command command) {
        if (this.taskRunner == null) {
            dispatchSync(command);
            return;
        }
        this.dispatchQueue.add(command);
        try {
            this.taskRunner.wakeup();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.activemq.thread.Task
    public boolean iterate() {
        if (this.dispatchQueue.isEmpty()) {
            return false;
        }
        dispatch((Command) this.dispatchQueue.remove(0));
        return true;
    }

    @Override // org.activemq.broker.Connection
    public boolean isSlow() {
        return false;
    }

    @Override // org.activemq.broker.Connection
    public boolean isBlocked() {
        return false;
    }

    @Override // org.activemq.broker.Connection
    public boolean isConnected() {
        return !this.disposed;
    }

    @Override // org.activemq.broker.Connection
    public boolean isActive() {
        return !this.disposed;
    }

    protected abstract void dispatch(Command command);

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$activemq$broker$AbstractConnection == null) {
            cls = class$("org.activemq.broker.AbstractConnection");
            class$org$activemq$broker$AbstractConnection = cls;
        } else {
            cls = class$org$activemq$broker$AbstractConnection;
        }
        log = LogFactory.getLog(cls);
    }
}
