/*
 * Decompiled with CFR 0.152.
 */
package net.timewalker.ffmq4.listeners;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import net.timewalker.ffmq4.FFMQException;
import net.timewalker.ffmq4.common.message.AbstractMessage;
import net.timewalker.ffmq4.listeners.AbstractClientListener;
import net.timewalker.ffmq4.listeners.ClientProcessorMBean;
import net.timewalker.ffmq4.listeners.utils.RemoteNotificationProxy;
import net.timewalker.ffmq4.local.FFMQEngine;
import net.timewalker.ffmq4.local.connection.LocalConnection;
import net.timewalker.ffmq4.local.destination.notification.NotificationProxy;
import net.timewalker.ffmq4.local.session.LocalDurableTopicSubscriber;
import net.timewalker.ffmq4.local.session.LocalMessageConsumer;
import net.timewalker.ffmq4.local.session.LocalQueueBrowser;
import net.timewalker.ffmq4.local.session.LocalQueueBrowserEnumeration;
import net.timewalker.ffmq4.local.session.LocalSession;
import net.timewalker.ffmq4.transport.PacketTransport;
import net.timewalker.ffmq4.transport.PacketTransportException;
import net.timewalker.ffmq4.transport.PacketTransportListener;
import net.timewalker.ffmq4.transport.packet.AbstractPacket;
import net.timewalker.ffmq4.transport.packet.AbstractQueryPacket;
import net.timewalker.ffmq4.transport.packet.AbstractResponsePacket;
import net.timewalker.ffmq4.transport.packet.query.AbstractConsumerQuery;
import net.timewalker.ffmq4.transport.packet.query.AbstractQueueBrowserEnumerationQuery;
import net.timewalker.ffmq4.transport.packet.query.AbstractQueueBrowserQuery;
import net.timewalker.ffmq4.transport.packet.query.AbstractSessionQuery;
import net.timewalker.ffmq4.transport.packet.query.AcknowledgeQuery;
import net.timewalker.ffmq4.transport.packet.query.CloseBrowserEnumerationQuery;
import net.timewalker.ffmq4.transport.packet.query.CloseBrowserQuery;
import net.timewalker.ffmq4.transport.packet.query.CloseConsumerQuery;
import net.timewalker.ffmq4.transport.packet.query.CloseSessionQuery;
import net.timewalker.ffmq4.transport.packet.query.CommitQuery;
import net.timewalker.ffmq4.transport.packet.query.CreateBrowserQuery;
import net.timewalker.ffmq4.transport.packet.query.CreateConsumerQuery;
import net.timewalker.ffmq4.transport.packet.query.CreateDurableSubscriberQuery;
import net.timewalker.ffmq4.transport.packet.query.CreateSessionQuery;
import net.timewalker.ffmq4.transport.packet.query.CreateTemporaryQueueQuery;
import net.timewalker.ffmq4.transport.packet.query.CreateTemporaryTopicQuery;
import net.timewalker.ffmq4.transport.packet.query.DeleteTemporaryQueueQuery;
import net.timewalker.ffmq4.transport.packet.query.DeleteTemporaryTopicQuery;
import net.timewalker.ffmq4.transport.packet.query.GetQuery;
import net.timewalker.ffmq4.transport.packet.query.OpenConnectionQuery;
import net.timewalker.ffmq4.transport.packet.query.PrefetchQuery;
import net.timewalker.ffmq4.transport.packet.query.PutQuery;
import net.timewalker.ffmq4.transport.packet.query.QueueBrowserFetchElementQuery;
import net.timewalker.ffmq4.transport.packet.query.QueueBrowserGetEnumerationQuery;
import net.timewalker.ffmq4.transport.packet.query.RecoverQuery;
import net.timewalker.ffmq4.transport.packet.query.RollbackMessageQuery;
import net.timewalker.ffmq4.transport.packet.query.RollbackQuery;
import net.timewalker.ffmq4.transport.packet.query.SetClientIDQuery;
import net.timewalker.ffmq4.transport.packet.query.UnsubscribeQuery;
import net.timewalker.ffmq4.transport.packet.response.AcknowledgeResponse;
import net.timewalker.ffmq4.transport.packet.response.CloseBrowserEnumerationResponse;
import net.timewalker.ffmq4.transport.packet.response.CloseBrowserResponse;
import net.timewalker.ffmq4.transport.packet.response.CloseConsumerResponse;
import net.timewalker.ffmq4.transport.packet.response.CloseSessionResponse;
import net.timewalker.ffmq4.transport.packet.response.CommitResponse;
import net.timewalker.ffmq4.transport.packet.response.CreateBrowserResponse;
import net.timewalker.ffmq4.transport.packet.response.CreateConsumerResponse;
import net.timewalker.ffmq4.transport.packet.response.CreateSessionResponse;
import net.timewalker.ffmq4.transport.packet.response.CreateTemporaryQueueResponse;
import net.timewalker.ffmq4.transport.packet.response.CreateTemporaryTopicResponse;
import net.timewalker.ffmq4.transport.packet.response.DeleteTemporaryQueueResponse;
import net.timewalker.ffmq4.transport.packet.response.DeleteTemporaryTopicResponse;
import net.timewalker.ffmq4.transport.packet.response.ErrorResponse;
import net.timewalker.ffmq4.transport.packet.response.GetResponse;
import net.timewalker.ffmq4.transport.packet.response.OpenConnectionResponse;
import net.timewalker.ffmq4.transport.packet.response.PingResponse;
import net.timewalker.ffmq4.transport.packet.response.PrefetchResponse;
import net.timewalker.ffmq4.transport.packet.response.PutResponse;
import net.timewalker.ffmq4.transport.packet.response.QueueBrowserFetchElementResponse;
import net.timewalker.ffmq4.transport.packet.response.QueueBrowserGetEnumerationResponse;
import net.timewalker.ffmq4.transport.packet.response.RecoverResponse;
import net.timewalker.ffmq4.transport.packet.response.RollbackMessageResponse;
import net.timewalker.ffmq4.transport.packet.response.RollbackResponse;
import net.timewalker.ffmq4.transport.packet.response.SetClientIDResponse;
import net.timewalker.ffmq4.transport.packet.response.StartConnectionResponse;
import net.timewalker.ffmq4.transport.packet.response.StopConnectionResponse;
import net.timewalker.ffmq4.transport.packet.response.UnsubscribeResponse;
import net.timewalker.ffmq4.utils.watchdog.ActiveObject;
import net.timewalker.ffmq4.utils.watchdog.ActivityWatchdog;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public final class ClientProcessor
implements PacketTransportListener,
ActiveObject,
ClientProcessorMBean {
    private static final Log log = LogFactory.getLog(ClientProcessor.class);
    private String id;
    private AbstractClientListener parentListener;
    private FFMQEngine engine;
    protected PacketTransport transport;
    private int authTimeout;
    private LocalConnection localConnection;
    private boolean traceEnabled;
    private long lastActivity;
    private boolean hasCreatedASession;

    public ClientProcessor(String id, AbstractClientListener parentListener, FFMQEngine engine, PacketTransport transport) {
        this.id = id;
        this.parentListener = parentListener;
        this.engine = engine;
        this.transport = transport;
        this.transport.setListener((PacketTransportListener)this);
        this.traceEnabled = log.isTraceEnabled();
        this.authTimeout = engine.getSetup().getSettings().getIntProperty("listener.auth.timeout", 5);
        this.lastActivity = System.currentTimeMillis();
    }

    @Override
    public String getClientID() {
        return this.id;
    }

    @Override
    public String getPeerDescription() {
        return this.transport.getRemotePeerID();
    }

    @Override
    public boolean isAuthenticated() {
        return this.localConnection != null;
    }

    @Override
    public int getSessionsCount() {
        return this.localConnection != null ? this.localConnection.getSessionsCount() : 0;
    }

    @Override
    public int getProducersCount() {
        return this.localConnection != null ? this.localConnection.getProducersCount() : 0;
    }

    @Override
    public int getConsumersCount() {
        return this.localConnection != null ? this.localConnection.getConsumersCount() : 0;
    }

    @Override
    public String getEntitiesDescription() {
        if (this.localConnection == null) {
            return "Not authenticated";
        }
        StringBuilder sb = new StringBuilder(100);
        this.localConnection.getEntitiesDescription(sb);
        return sb.toString();
    }

    public void start() throws PacketTransportException {
        ActivityWatchdog.getInstance().register((ActiveObject)this);
        this.transport.start();
    }

    public void stop() {
        this.transport.close();
        ActivityWatchdog.getInstance().unregister((ActiveObject)this);
    }

    public long getLastActivity() {
        return this.lastActivity;
    }

    @Override
    public Date getConnectionDate() {
        return new Date(this.lastActivity);
    }

    public long getTimeoutDelay() {
        return (long)this.authTimeout * 1000L;
    }

    public boolean onActivityTimeout() throws Exception {
        if (!this.transport.isClosed()) {
            log.warn((Object)("#" + this.id + " Timeout waiting for client activity (" + this.authTimeout + "s), dropping client."));
            this.stop();
        }
        return true;
    }

    public boolean packetReceived(AbstractPacket packet) {
        AbstractQueryPacket query = (AbstractQueryPacket)packet;
        AbstractResponsePacket response = null;
        try {
            try {
                response = this.process(query);
            }
            catch (JMSException e) {
                log.debug((Object)("#" + this.id + " process() failed with " + e.toString()));
                response = new ErrorResponse(e);
            }
        }
        catch (Exception e) {
            log.error((Object)("#" + this.id + " Cannot process command"), (Throwable)e);
        }
        if (response != null && query.isResponseExpected()) {
            response.setEndpointId(query.getEndpointId());
            try {
                if (this.traceEnabled) {
                    log.trace((Object)("#" + this.id + " Sending " + response));
                }
                this.transport.send((AbstractPacket)response);
            }
            catch (Exception e) {
                log.warn((Object)("#" + this.id + " Cannot send response to client : " + e.toString()));
                this.transport.close();
            }
        }
        return this.localConnection != null;
    }

    public void packetSent(AbstractPacket packet) {
        if (this.traceEnabled) {
            log.trace((Object)("#" + this.id + " Sent " + packet));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void transportClosed(boolean closedByRemotePeer, boolean mayBlock) {
        this.parentListener.unregisterClient(this);
        try {
            if (this.localConnection != null) {
                this.localConnection.close();
            }
        }
        catch (Exception e) {
            log.error((Object)("#" + this.id + " Could not close local connection"), (Throwable)e);
        }
        finally {
            this.localConnection = null;
        }
    }

    protected AbstractResponsePacket process(AbstractQueryPacket query) throws JMSException {
        switch (query.getType()) {
            case 25: {
                return this.processGet((GetQuery)query);
            }
            case 29: {
                return this.processPut((PutQuery)query);
            }
            case 7: {
                return this.processCommit((CommitQuery)query);
            }
            case 3: {
                return this.processAcknowledge((AcknowledgeQuery)query);
            }
            case 33: {
                return this.processRollback((RollbackQuery)query);
            }
            case 31: {
                return this.processRecover((RecoverQuery)query);
            }
            case 15: {
                return this.processCreateSession((CreateSessionQuery)query);
            }
            case 5: {
                return this.processCloseSession((CloseSessionQuery)query);
            }
            case 9: {
                return this.processCreateConsumer((CreateConsumerQuery)query);
            }
            case 11: {
                return this.processCreateDurableSubscriber((CreateDurableSubscriberQuery)query);
            }
            case 41: {
                return this.processCreateBrowser((CreateBrowserQuery)query);
            }
            case 43: {
                return this.processQueueBrowserGetEnumeration((QueueBrowserGetEnumerationQuery)query);
            }
            case 45: {
                return this.processQueueBrowserFetchElement((QueueBrowserFetchElementQuery)query);
            }
            case 51: {
                return this.processCloseConsumer((CloseConsumerQuery)query);
            }
            case 47: {
                return this.processCloseBrowser((CloseBrowserQuery)query);
            }
            case 49: {
                return this.processCloseBrowserEnumeration((CloseBrowserEnumerationQuery)query);
            }
            case 17: {
                return this.processCreateTemporaryQueue((CreateTemporaryQueueQuery)query);
            }
            case 19: {
                return this.processCreateTemporaryTopic((CreateTemporaryTopicQuery)query);
            }
            case 21: {
                return this.processDeleteTemporaryQueue((DeleteTemporaryQueueQuery)query);
            }
            case 23: {
                return this.processDeleteTemporaryTopic((DeleteTemporaryTopicQuery)query);
            }
            case 27: {
                return this.processOpenConnection((OpenConnectionQuery)query);
            }
            case 37: {
                return this.processStartConnection();
            }
            case 39: {
                return this.processStopConnection();
            }
            case 35: {
                return this.processSetClientID((SetClientIDQuery)query);
            }
            case 55: {
                return this.processUnsubscribe((UnsubscribeQuery)query);
            }
            case 57: {
                return this.processPrefetch((PrefetchQuery)query);
            }
            case 59: {
                return this.processPing();
            }
            case 61: {
                return this.processRollbackMessage((RollbackMessageQuery)query);
            }
        }
        throw new IllegalStateException("Unkown query type id : " + query.getType());
    }

    private LocalConnection getLocalConnection() throws JMSException {
        if (this.localConnection == null) {
            throw new FFMQException("Connection not established", "NETWORK_ERROR");
        }
        return this.localConnection;
    }

    private CreateSessionResponse processCreateSession(CreateSessionQuery query) throws JMSException {
        LocalSession localSession = (LocalSession)this.getLocalConnection().createSession(query.getSessionId(), query.isTransacted(), 2);
        if (!this.hasCreatedASession) {
            this.hasCreatedASession = true;
            ActivityWatchdog.getInstance().unregister((ActiveObject)this);
        }
        localSession.setNotificationProxy((NotificationProxy)new RemoteNotificationProxy(localSession.getId(), this.transport));
        return new CreateSessionResponse();
    }

    private LocalSession lookupSession(AbstractSessionQuery query) throws JMSException {
        LocalSession localSession = (LocalSession)this.getLocalConnection().lookupRegisteredSession(query.getSessionId());
        if (localSession == null) {
            throw new FFMQException("Invalid session id : " + query.getSessionId(), "NETWORK_ERROR");
        }
        return localSession;
    }

    private LocalMessageConsumer lookupConsumer(AbstractConsumerQuery query) throws JMSException {
        LocalSession localSession = this.lookupSession((AbstractSessionQuery)query);
        LocalMessageConsumer consumer = (LocalMessageConsumer)localSession.lookupRegisteredConsumer(query.getConsumerId());
        if (consumer == null) {
            throw new FFMQException("Invalid consumer id : " + query.getConsumerId(), "NETWORK_ERROR");
        }
        return consumer;
    }

    private LocalQueueBrowser lookupBrowser(AbstractQueueBrowserQuery query) throws JMSException {
        LocalSession localSession = this.lookupSession((AbstractSessionQuery)query);
        LocalQueueBrowser browser = (LocalQueueBrowser)localSession.lookupRegisteredBrowser(query.getBrowserId());
        if (browser == null) {
            throw new FFMQException("Invalid browser id : " + query.getBrowserId(), "NETWORK_ERROR");
        }
        return browser;
    }

    private LocalQueueBrowserEnumeration lookupBrowserEnumeration(AbstractQueueBrowserEnumerationQuery query) throws JMSException {
        LocalQueueBrowser browser = this.lookupBrowser((AbstractQueueBrowserQuery)query);
        LocalQueueBrowserEnumeration browserEnum = (LocalQueueBrowserEnumeration)browser.lookupRegisteredEnumeration(query.getEnumId());
        if (browserEnum == null) {
            throw new FFMQException("Invalid browser enumeration id : " + query.getEnumId(), "NETWORK_ERROR");
        }
        return browserEnum;
    }

    private CloseSessionResponse processCloseSession(CloseSessionQuery query) throws JMSException {
        LocalSession localSession = this.lookupSession((AbstractSessionQuery)query);
        localSession.close();
        return new CloseSessionResponse();
    }

    private CommitResponse processCommit(CommitQuery query) throws JMSException {
        LocalSession localSession = this.lookupSession((AbstractSessionQuery)query);
        List deliveredMessageIDs = query.getDeliveredMessageIDs();
        localSession.commit(deliveredMessageIDs != null && !deliveredMessageIDs.isEmpty(), deliveredMessageIDs);
        return new CommitResponse();
    }

    private RollbackResponse processRollback(RollbackQuery query) throws JMSException {
        LocalSession localSession = this.lookupSession((AbstractSessionQuery)query);
        List deliveredMessageIDs = query.getDeliveredMessageIDs();
        localSession.rollback(deliveredMessageIDs != null && !deliveredMessageIDs.isEmpty(), deliveredMessageIDs);
        return new RollbackResponse();
    }

    private GetResponse processGet(GetQuery query) throws JMSException {
        LocalMessageConsumer consumer = this.lookupConsumer((AbstractConsumerQuery)query);
        AbstractMessage msg = consumer.receiveFromDestination(0L, false);
        GetResponse response = new GetResponse();
        response.setMessage(msg);
        return response;
    }

    private AbstractResponsePacket processPrefetch(PrefetchQuery query) throws JMSException {
        LocalMessageConsumer consumer = this.lookupConsumer((AbstractConsumerQuery)query);
        consumer.prefetchMore();
        return new PrefetchResponse();
    }

    private PutResponse processPut(PutQuery query) throws JMSException {
        LocalSession session = this.lookupSession((AbstractSessionQuery)query);
        AbstractMessage msg = query.getMessage();
        session.dispatch(msg);
        return new PutResponse();
    }

    private AcknowledgeResponse processAcknowledge(AcknowledgeQuery query) throws JMSException {
        LocalSession session = this.lookupSession((AbstractSessionQuery)query);
        session.acknowledge(query.getDeliveredMessageIDs());
        return new AcknowledgeResponse();
    }

    private RecoverResponse processRecover(RecoverQuery query) throws JMSException {
        LocalSession session = this.lookupSession((AbstractSessionQuery)query);
        session.recover(query.getDeliveredMessageIDs());
        return new RecoverResponse();
    }

    private CreateBrowserResponse processCreateBrowser(CreateBrowserQuery query) throws JMSException {
        LocalSession session = this.lookupSession((AbstractSessionQuery)query);
        session.createBrowser(query.getBrowserId(), query.getQueue(), query.getMessageSelector());
        return new CreateBrowserResponse();
    }

    private QueueBrowserGetEnumerationResponse processQueueBrowserGetEnumeration(QueueBrowserGetEnumerationQuery query) throws JMSException {
        LocalQueueBrowser browser = this.lookupBrowser((AbstractQueueBrowserQuery)query);
        LocalQueueBrowserEnumeration browserEnum = (LocalQueueBrowserEnumeration)browser.getEnumeration();
        QueueBrowserGetEnumerationResponse response = new QueueBrowserGetEnumerationResponse();
        response.setEnumId(browserEnum.getId());
        return response;
    }

    private QueueBrowserFetchElementResponse processQueueBrowserFetchElement(QueueBrowserFetchElementQuery query) throws JMSException {
        LocalQueueBrowserEnumeration browserEnum = this.lookupBrowserEnumeration((AbstractQueueBrowserEnumerationQuery)query);
        QueueBrowserFetchElementResponse response = new QueueBrowserFetchElementResponse();
        if (browserEnum.hasMoreElements()) {
            response.setMessage(browserEnum.nextElement());
        } else {
            response.setMessage(null);
        }
        return response;
    }

    private CloseConsumerResponse processCloseConsumer(CloseConsumerQuery query) throws JMSException {
        LocalMessageConsumer consumer = this.lookupConsumer((AbstractConsumerQuery)query);
        consumer.close();
        List undeliveredMessageIDs = query.getUndeliveredMessageIDs();
        if (undeliveredMessageIDs != null && !undeliveredMessageIDs.isEmpty()) {
            ((LocalSession)consumer.getSession()).rollbackUndelivered(undeliveredMessageIDs);
        }
        return new CloseConsumerResponse();
    }

    private RollbackMessageResponse processRollbackMessage(RollbackMessageQuery query) throws JMSException {
        LocalConnection localConnection = this.getLocalConnection();
        LocalSession localSession = (LocalSession)localConnection.lookupRegisteredSession(query.getSessionId());
        if (localSession != null) {
            ArrayList<String> undeliveredMessageIDs = new ArrayList<String>();
            undeliveredMessageIDs.add(query.getMessageId());
            localSession.rollbackUndelivered(undeliveredMessageIDs);
            LocalMessageConsumer consumer = (LocalMessageConsumer)localSession.lookupRegisteredConsumer(query.getConsumerId());
            if (consumer != null) {
                consumer.restorePrefetchCapacity(1);
            }
        }
        return new RollbackMessageResponse();
    }

    private CloseBrowserResponse processCloseBrowser(CloseBrowserQuery query) throws JMSException {
        LocalQueueBrowser browser = this.lookupBrowser((AbstractQueueBrowserQuery)query);
        browser.close();
        return new CloseBrowserResponse();
    }

    private CloseBrowserEnumerationResponse processCloseBrowserEnumeration(CloseBrowserEnumerationQuery query) throws JMSException {
        LocalQueueBrowserEnumeration browserEnum = this.lookupBrowserEnumeration((AbstractQueueBrowserEnumerationQuery)query);
        browserEnum.close();
        return new CloseBrowserEnumerationResponse();
    }

    private CreateConsumerResponse processCreateConsumer(CreateConsumerQuery query) throws JMSException {
        LocalSession session = this.lookupSession((AbstractSessionQuery)query);
        LocalMessageConsumer consumer = (LocalMessageConsumer)session.createConsumer(query.getConsumerId(), query.getDestination(), query.getMessageSelector(), query.isNoLocal());
        if (query.getDestination() instanceof Queue) {
            consumer.prefetchMore();
        }
        CreateConsumerResponse response = new CreateConsumerResponse();
        response.setPrefetchSize(consumer.getPrefetchSize());
        return response;
    }

    private CreateTemporaryQueueResponse processCreateTemporaryQueue(CreateTemporaryQueueQuery query) throws JMSException {
        LocalSession session = this.lookupSession((AbstractSessionQuery)query);
        TemporaryQueue queue = session.createTemporaryQueue();
        CreateTemporaryQueueResponse response = new CreateTemporaryQueueResponse();
        response.setQueueName(queue.getQueueName());
        return response;
    }

    private CreateTemporaryTopicResponse processCreateTemporaryTopic(CreateTemporaryTopicQuery query) throws JMSException {
        LocalSession session = this.lookupSession((AbstractSessionQuery)query);
        TemporaryTopic topic = session.createTemporaryTopic();
        CreateTemporaryTopicResponse response = new CreateTemporaryTopicResponse();
        response.setTopicName(topic.getTopicName());
        return response;
    }

    private DeleteTemporaryQueueResponse processDeleteTemporaryQueue(DeleteTemporaryQueueQuery query) throws JMSException {
        this.getLocalConnection().deleteTemporaryQueue(query.getQueueName());
        return new DeleteTemporaryQueueResponse();
    }

    private DeleteTemporaryTopicResponse processDeleteTemporaryTopic(DeleteTemporaryTopicQuery query) throws JMSException {
        this.getLocalConnection().deleteTemporaryTopic(query.getTopicName());
        return new DeleteTemporaryTopicResponse();
    }

    private OpenConnectionResponse processOpenConnection(OpenConnectionQuery query) throws JMSException {
        if (this.localConnection != null) {
            throw new FFMQException("Connection already established", "NETWORK_ERROR");
        }
        this.localConnection = (LocalConnection)this.engine.openConnection(query.getUserName(), query.getPassword(), query.getClientID());
        OpenConnectionResponse response = new OpenConnectionResponse();
        response.setProtocolVersion(9);
        return response;
    }

    private StartConnectionResponse processStartConnection() throws JMSException {
        this.getLocalConnection().start();
        return new StartConnectionResponse();
    }

    private StopConnectionResponse processStopConnection() throws JMSException {
        this.getLocalConnection().stop();
        return new StopConnectionResponse();
    }

    private SetClientIDResponse processSetClientID(SetClientIDQuery query) throws JMSException {
        this.getLocalConnection().setClientID(query.getClientID());
        return new SetClientIDResponse();
    }

    private CreateConsumerResponse processCreateDurableSubscriber(CreateDurableSubscriberQuery query) throws JMSException {
        LocalSession session = this.lookupSession((AbstractSessionQuery)query);
        LocalDurableTopicSubscriber subscriber = (LocalDurableTopicSubscriber)session.createDurableSubscriber(query.getConsumerId(), query.getTopic(), query.getName(), query.getMessageSelector(), query.isNoLocal());
        subscriber.prefetchMore();
        CreateConsumerResponse response = new CreateConsumerResponse();
        response.setPrefetchSize(subscriber.getPrefetchSize());
        return response;
    }

    private UnsubscribeResponse processUnsubscribe(UnsubscribeQuery query) throws JMSException {
        LocalSession session = this.lookupSession((AbstractSessionQuery)query);
        session.unsubscribe(query.getSubscriptionName());
        return new UnsubscribeResponse();
    }

    private PingResponse processPing() throws JMSException {
        this.getLocalConnection();
        return new PingResponse();
    }
}

