package net.spy.memcached;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import net.spy.memcached.auth.AuthThread;
import net.spy.memcached.compat.SpyThread;
import net.spy.memcached.compat.log.Logger;
import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.metrics.MetricCollector;
import net.spy.memcached.metrics.MetricType;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.NoopOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.TapOperation;
import net.spy.memcached.ops.VBucketAware;
import net.spy.memcached.protocol.binary.BinaryOperationFactory;
import net.spy.memcached.protocol.binary.MultiGetOperationImpl;
import net.spy.memcached.protocol.binary.TapAckOperationImpl;
import net.spy.memcached.util.StringUtils;
import org.springframework.util.SystemPropertyUtils;

/* loaded from: input_file:net/spy/memcached/MemcachedConnection.class */
public class MemcachedConnection extends SpyThread {
    // Thresholds for consecutive empty selects; handleEmptySelects() compares its counter against these values.
    private static final int DOUBLE_CHECK_EMPTY = 256;
    private static final int EXCESSIVE_EMPTY = 16777216;
    // Defaults for the "net.spy.wakeupDelay" and "net.spy.retryQueueSize" system properties read by the constructor.
    private static final int DEFAULT_WAKEUP_DELAY = 1000;
    private static final int DEFAULT_RETRY_QUEUE_SIZE = -1;
    // Clone-count limit after which redistributeOperation() cancels an operation instead of retrying it.
    private static final int MAX_CLONE_COUNT = 100;
    // Names under which counters, meters and histograms are registered with the MetricCollector.
    private static final String RECON_QUEUE_METRIC = "[MEM] Reconnecting Nodes (ReconnectQueue)";
    private static final String SHUTD_QUEUE_METRIC = "[MEM] Shutting Down Nodes (NodesToShutdown)";
    private static final String OVERALL_REQUEST_METRIC = "[MEM] Request Rate: All";
    private static final String OVERALL_AVG_BYTES_WRITE_METRIC = "[MEM] Average Bytes written to OS per write";
    private static final String OVERALL_AVG_BYTES_READ_METRIC = "[MEM] Average Bytes read from OS per read";
    // NOTE(review): the unit in this name looks like a mis-decoded micro sign; confirm against the
    // original class constant before changing it, because the string is the runtime metric key.
    private static final String OVERALL_AVG_TIME_ON_WIRE_METRIC = "[MEM] Average Time on wire for operations (Âµs)";
    private static final String OVERALL_RESPONSE_METRIC = "[MEM] Response Rate: All (Failure + Success + Retry)";
    private static final String OVERALL_RESPONSE_RETRY_METRIC = "[MEM] Response Rate: Retry";
    private static final String OVERALL_RESPONSE_FAIL_METRIC = "[MEM] Response Rate: Failure";
    private static final String OVERALL_RESPONSE_SUCC_METRIC = "[MEM] Response Rate: Success";
    // Taken from ConnectionFactory.shouldOptimize(); handed to MemcachedNode.fillWriteBuffer().
    private final boolean shouldOptimize;
    // NIO selector opened in the constructor; every node channel is registered with it.
    protected Selector selector;
    // Built by ConnectionFactory.createLocator() from the nodes returned by createConnections().
    protected final NodeLocator locator;
    // Decides in queueReconnect() whether a failing node's queued operations are redistributed or cancelled.
    protected final FailureMode failureMode;
    // Upper bound, in milliseconds, for the reconnect back-off computed in queueReconnect().
    private final long maxDelay;
    // Buffer size passed to ConnectionFactory.createMemcachedNode().
    private final int bufSize;
    private final ConnectionFactory connectionFactory;
    // Nodes waiting to be processed by handleInputQueue().
    protected final ConcurrentLinkedQueue<MemcachedNode> addedQueue;
    // Nodes awaiting reconnection, keyed by the wall-clock time (ms) at which the attempt is due.
    private final SortedMap<Long, MemcachedNode> reconnectQueue;
    private final OperationFactory opFact;
    // A node whose continuous timeout count exceeds this value is treated as a lost connection.
    private final int timeoutExceptionThreshold;
    // Operations collected for retry; drained and redistributed by handleOperationalTasks().
    private final List<Operation> retryOps;
    // Nodes to be closed by handleShutdownQueue().
    protected final ConcurrentLinkedQueue<MemcachedNode> nodesToShutdown;
    // True only when the system property "net.spy.verifyAliveOnConnect" equals "true"; enables the noop probe in finishConnect().
    private final boolean verifyAliveOnConnect;
    // Passed to the OperationFuture created in finishConnect().
    private final ExecutorService listenerExecutorService;
    protected final MetricCollector metrics;
    protected final MetricType metricType;
    // Default select() timeout in milliseconds used by handleIO().
    private final int wakeupDelay;
    // Read from "net.spy.retryQueueSize"; its use is not visible in this part of the file.
    private final int retryQueueSize;
    // Compiler-generated assertion switch; the guarded "throw new AssertionError" blocks are decompiled assert statements.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Once true, handleIO() and queueReconnect() return without doing any work.
    protected volatile boolean shutDown = false;
    // Number of consecutive selects that reported no ready keys; reset when keys are selected.
    private int emptySelects = 0;
    protected volatile boolean running = true;
    // Observers notified from connected() and lostConnection().
    private final Collection<ConnectionObserver> connObservers = new ConcurrentLinkedQueue();

    /**
     * Creates the connection, opens a non-blocking channel to every given address and
     * starts this object as the IO thread.
     *
     * <p>The metric collector is resolved <em>before</em> the initial connections are
     * created: {@link #createConnections(Collection)} calls {@code queueReconnect} when a
     * connect attempt raises a {@link java.net.SocketException}, and {@code queueReconnect}
     * increments a counter on {@code metrics}. With the previous ordering that path hit a
     * {@code NullPointerException} instead of scheduling the reconnect.
     *
     * @param i buffer size handed to every created node
     * @param connectionFactory factory supplying nodes, locator, metrics and tuning values
     * @param list addresses of the servers to connect to
     * @param collection observers to register initially (must not be {@code null})
     * @param failureMode what to do with queued operations of a failing node
     * @param operationFactory factory used to build and clone operations
     * @throws IOException if the selector or a channel cannot be opened
     */
    public MemcachedConnection(int i, ConnectionFactory connectionFactory, List<InetSocketAddress> list, Collection<ConnectionObserver> collection, FailureMode failureMode, OperationFactory operationFactory) throws IOException {
        this.connObservers.addAll(collection);
        this.reconnectQueue = new TreeMap<Long, MemcachedNode>();
        this.addedQueue = new ConcurrentLinkedQueue<>();
        this.failureMode = failureMode;
        this.shouldOptimize = connectionFactory.shouldOptimize();
        this.maxDelay = TimeUnit.SECONDS.toMillis(connectionFactory.getMaxReconnectDelay());
        this.opFact = operationFactory;
        this.timeoutExceptionThreshold = connectionFactory.getTimeoutExceptionThreshold();
        this.selector = Selector.open();
        this.retryOps = Collections.synchronizedList(new ArrayList<Operation>());
        this.nodesToShutdown = new ConcurrentLinkedQueue<>();
        this.listenerExecutorService = connectionFactory.getListenerExecutorService();
        this.bufSize = i;
        this.connectionFactory = connectionFactory;
        // Only the exact string "true" enables the liveness probe.
        this.verifyAliveOnConnect = "true".equals(System.getProperty("net.spy.verifyAliveOnConnect"));
        this.wakeupDelay = Integer.parseInt(System.getProperty("net.spy.wakeupDelay", Integer.toString(1000)));
        this.retryQueueSize = Integer.parseInt(System.getProperty("net.spy.retryQueueSize", Integer.toString(-1)));
        getLogger().info("Setting retryQueueSize to " + this.retryQueueSize);
        // Must precede createConnections(): a failed initial connect goes through
        // queueReconnect(), which updates a counter on the metric collector.
        this.metrics = connectionFactory.getMetricCollector();
        this.metricType = connectionFactory.enableMetrics();
        this.locator = connectionFactory.createLocator(createConnections(list));
        registerMetrics();
        setName("Memcached IO over " + this);
        setDaemon(connectionFactory.isDaemon());
        start();
    }

    /**
     * Registers the connection-wide histograms, meters and counters with the metric
     * collector. Nothing is registered unless the metric type is {@code PERFORMANCE} or
     * {@code DEBUG}; the queue counters and the per-outcome response meters are added for
     * {@code DEBUG} only.
     */
    protected void registerMetrics() {
        final boolean debugLevel = this.metricType.equals(MetricType.DEBUG);
        if (!debugLevel && !this.metricType.equals(MetricType.PERFORMANCE)) {
            return;
        }

        this.metrics.addHistogram(OVERALL_AVG_BYTES_READ_METRIC);
        this.metrics.addHistogram(OVERALL_AVG_BYTES_WRITE_METRIC);
        this.metrics.addHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC);
        this.metrics.addMeter(OVERALL_RESPONSE_METRIC);
        this.metrics.addMeter(OVERALL_REQUEST_METRIC);

        if (!debugLevel) {
            return;
        }

        this.metrics.addCounter(RECON_QUEUE_METRIC);
        this.metrics.addCounter(SHUTD_QUEUE_METRIC);
        this.metrics.addMeter(OVERALL_RESPONSE_RETRY_METRIC);
        this.metrics.addMeter(OVERALL_RESPONSE_SUCC_METRIC);
        this.metrics.addMeter(OVERALL_RESPONSE_FAIL_METRIC);
    }

    /**
     * Opens one non-blocking channel per address, wraps it in a node and registers it with
     * the selector.
     *
     * <p>A channel that connects immediately is reported through {@code connected}; one
     * whose connect is still pending is registered for {@code OP_CONNECT}. A
     * {@link SocketException} during connect or registration is logged and the node is
     * queued for reconnection; the node is still part of the returned list.
     *
     * @param collection addresses to connect to
     * @return the created nodes, in iteration order of {@code collection}
     * @throws IOException if a channel cannot be opened or configured, or registration
     *     fails with a non-socket IO error
     */
    protected List<MemcachedNode> createConnections(Collection<InetSocketAddress> collection) throws IOException {
        List<MemcachedNode> nodes = new ArrayList<MemcachedNode>(collection.size());
        for (InetSocketAddress address : collection) {
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            MemcachedNode node = this.connectionFactory.createMemcachedNode(address, channel, this.bufSize);
            node.setConnection(this);
            int interestOps = 0;
            channel.socket().setTcpNoDelay(!this.connectionFactory.useNagleAlgorithm());
            try {
                if (channel.connect(address)) {
                    getLogger().info("Connected to %s immediately", node);
                    connected(node);
                } else {
                    getLogger().info("Added %s to connect queue", node);
                    interestOps = SelectionKey.OP_CONNECT;
                }
                this.selector.wakeup();
                node.setSk(channel.register(this.selector, interestOps, node));
            } catch (SocketException e) {
                getLogger().warn("Socket error on initial connect", e);
                queueReconnect(node);
            }
            // Sanity check (assertions only). When the SocketException path was taken no
            // key was registered, so the key may be null; the node is then waiting in the
            // reconnect queue and there is nothing to verify.
            if (!$assertionsDisabled && !channel.isConnected() && node.getSk() != null
                    && node.getSk().interestOps() != SelectionKey.OP_CONNECT) {
                throw new AssertionError("Not connected, and not wanting to connect");
            }
            nodes.add(node);
        }
        return nodes;
    }

    /**
     * Assertion helper: verifies that every node with a valid selection key is interested
     * in exactly the operations its state calls for. Connected nodes must watch for reads
     * when they have a read op and for writes when they have a write op or unwritten
     * bytes; unconnected nodes must watch for connect only.
     *
     * @return always {@code true}; violations surface as {@link AssertionError} when
     *     assertions are enabled
     */
    private boolean selectorsMakeSense() {
        for (MemcachedNode node : this.locator.getAll()) {
            if (node.getSk() == null || !node.getSk().isValid()) {
                continue;
            }
            if (!node.getChannel().isConnected()) {
                int pendingOps = node.getSk().interestOps();
                if (!$assertionsDisabled && pendingOps != SelectionKey.OP_CONNECT) {
                    throw new AssertionError("Not connected, and not watching for connect: " + pendingOps);
                }
                continue;
            }
            int actualOps = node.getSk().interestOps();
            int expectedOps = 0;
            if (node.hasReadOp()) {
                expectedOps |= SelectionKey.OP_READ;
            }
            if (node.hasWriteOp()) {
                expectedOps |= SelectionKey.OP_WRITE;
            }
            if (node.getBytesRemainingToWrite() > 0) {
                expectedOps |= SelectionKey.OP_WRITE;
            }
            if (!$assertionsDisabled && actualOps != expectedOps) {
                throw new AssertionError("Invalid ops:  " + node + ", expected " + expectedOps + ", got " + actualOps);
            }
        }
        getLogger().debug("Checked the selectors.");
        return true;
    }

    /**
     * Runs one pass of the IO loop: processes the input queue, waits on the selector,
     * dispatches ready keys and finally runs the housekeeping tasks.
     *
     * <p>The select timeout is the configured wakeup delay, shortened to the time until
     * the earliest queued reconnect (never less than 1 ms). Nothing is done while the
     * connection is shut down.
     *
     * @throws IOException if the select or a housekeeping task fails
     */
    public void handleIO() throws IOException {
        if (this.shutDown) {
            getLogger().debug("No IO while shut down.");
            return;
        }

        handleInputQueue();
        getLogger().debug("Done dealing with queue.");

        long delay = this.wakeupDelay;
        if (!this.reconnectQueue.isEmpty()) {
            long nextReconnect = this.reconnectQueue.firstKey().longValue();
            delay = Math.max(nextReconnect - System.currentTimeMillis(), 1L);
        }
        getLogger().debug("Selecting with delay of %sms", Long.valueOf(delay));
        if (!$assertionsDisabled && !selectorsMakeSense()) {
            throw new AssertionError("Selectors don't make sense.");
        }

        int selected = this.selector.select(delay);
        if (this.shutDown) {
            return;
        }

        if (selected == 0 && this.addedQueue.isEmpty()) {
            handleWokenUpSelector();
        } else if (this.selector.selectedKeys().isEmpty()) {
            handleEmptySelects();
        } else {
            getLogger().debug("Selected %d, selected %d keys", Integer.valueOf(selected), Integer.valueOf(this.selector.selectedKeys().size()));
            this.emptySelects = 0;
            // Each key is taken off the selected set right after it has been handled.
            for (Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator(); keys.hasNext(); keys.remove()) {
                handleIO(keys.next());
            }
        }

        handleOperationalTasks();
    }

    /**
     * Hook invoked by {@code handleIO()} when a select returned zero keys and the added
     * queue is empty. The default implementation does nothing.
     */
    protected void handleWokenUpSelector() {
        // Intentionally empty.
    }

    /**
     * Housekeeping executed at the end of every IO pass: detects nodes with too many
     * continuous timeouts, attempts due reconnects, redistributes operations queued for
     * retry and processes the shutdown queue.
     *
     * @throws IOException if closing a node channel in the shutdown queue fails
     */
    private void handleOperationalTasks() throws IOException {
        checkPotentiallyTimedOutConnection();

        boolean reconnectsPending = !this.shutDown && !this.reconnectQueue.isEmpty();
        if (reconnectsPending) {
            attemptReconnects();
        }

        if (!this.retryOps.isEmpty()) {
            // Work on a snapshot so the shared list can be emptied before redistribution.
            List<Operation> toRetry = new ArrayList<Operation>(this.retryOps);
            this.retryOps.clear();
            redistributeOperations(toRetry);
        }

        handleShutdownQueue();
    }

    /**
     * Called when a select reported activity but the selected-key set is empty. Counts
     * the occurrence and, once more than 256 have happened in a row, walks every
     * registered key: keys with ready operations are handled, all others are treated as
     * lost connections.
     *
     * <p>Note that the debug statement calls {@link Thread#interrupted()}, which clears
     * the calling thread's interrupt flag.
     */
    private void handleEmptySelects() {
        getLogger().debug("No selectors ready, interrupted: %b", Boolean.valueOf(Thread.interrupted()));

        if (++this.emptySelects <= 256) {
            return;
        }

        for (SelectionKey key : this.selector.keys()) {
            getLogger().debug("%s has %s, interested in %s", key, Integer.valueOf(key.readyOps()), Integer.valueOf(key.interestOps()));
            if (key.readyOps() == 0) {
                lostConnection((MemcachedNode) key.attachment());
            } else {
                getLogger().debug("%s has a ready op, handling IO", key);
                handleIO(key);
            }
        }

        if (!$assertionsDisabled && this.emptySelects >= EXCESSIVE_EMPTY) {
            throw new AssertionError("Too many empty selects");
        }
    }

    /**
     * Closes every node in the shutdown queue that is not also waiting in the added
     * queue, and redistributes the operations that were still in its input queue.
     *
     * @throws IOException if closing a node's channel fails
     */
    private void handleShutdownQueue() throws IOException {
        for (MemcachedNode node : this.nodesToShutdown) {
            if (this.addedQueue.contains(node)) {
                // Still has pending input; leave it for a later pass.
                continue;
            }

            this.nodesToShutdown.remove(node);
            this.metrics.decrementCounter(SHUTD_QUEUE_METRIC);
            Collection<Operation> leftovers = node.destroyInputQueue();

            if (node.getChannel() != null) {
                node.getChannel().close();
                node.setSk(null);
                if (node.getBytesRemainingToWrite() > 0) {
                    getLogger().warn("Shut down with %d bytes remaining to write", Integer.valueOf(node.getBytesRemainingToWrite()));
                }
                getLogger().debug("Shut down channel %s", node.getChannel());
            }

            redistributeOperations(leftovers);
        }
    }

    /**
     * Scans all registered selection keys and treats every node whose continuous timeout
     * count exceeds the configured threshold as a lost connection. The scan is restarted
     * from the beginning whenever the key set is modified while it is being iterated.
     */
    private void checkPotentiallyTimedOutConnection() {
        boolean scanned = false;
        do {
            try {
                for (SelectionKey key : this.selector.keys()) {
                    MemcachedNode node = (MemcachedNode) key.attachment();
                    if (node.getContinuousTimeout() > this.timeoutExceptionThreshold) {
                        getLogger().warn("%s exceeded continuous timeout threshold", key);
                        lostConnection(node);
                    }
                }
                scanned = true;
            } catch (ConcurrentModificationException e) {
                getLogger().warn("Retrying selector keys after ConcurrentModificationException caught", e);
            }
        } while (!scanned);
    }

    /**
     * Drains the added queue and, for every distinct node found, copies its input queue
     * and starts writing if the node is active and has a current write operation.
     * Inactive nodes are put back on the added queue so they are revisited later.
     */
    private void handleInputQueue() {
        if (this.addedQueue.isEmpty()) {
            return;
        }
        getLogger().debug("Handling queue");

        HashSet<MemcachedNode> requeue = new HashSet<MemcachedNode>();
        HashSet<MemcachedNode> drained = new HashSet<MemcachedNode>();
        for (MemcachedNode polled = this.addedQueue.poll(); polled != null; polled = this.addedQueue.poll()) {
            drained.add(polled);
        }

        for (MemcachedNode node : drained) {
            boolean readyForIO = false;
            if (node.isActive()) {
                if (node.getCurrentWriteOp() != null) {
                    readyForIO = true;
                    getLogger().debug("Handling queued write %s", node);
                }
            } else {
                requeue.add(node);
            }

            node.copyInputQueue();

            if (readyForIO) {
                try {
                    if (node.getWbuf().hasRemaining()) {
                        handleWrites(node);
                    }
                } catch (IOException e) {
                    getLogger().warn("Exception handling write", e);
                    lostConnection(node);
                }
            }

            node.fixupOps();
        }

        this.addedQueue.addAll(requeue);
    }

    /**
     * Registers an observer for connection-established and connection-lost events.
     *
     * @param connectionObserver the observer to add
     * @return the result of adding it to the observer collection
     */
    public boolean addObserver(ConnectionObserver connectionObserver) {
        final boolean added = this.connObservers.add(connectionObserver);
        return added;
    }

    /**
     * Unregisters a previously added observer.
     *
     * @param connectionObserver the observer to remove
     * @return {@code true} if the observer collection contained it
     */
    public boolean removeObserver(ConnectionObserver connectionObserver) {
        final boolean removed = this.connObservers.remove(connectionObserver);
        return removed;
    }

    /**
     * Marks a node as connected and tells every observer about it. The reconnect count
     * reported to observers is the value read before the node is marked connected.
     *
     * @param memcachedNode the node whose channel has just been connected
     */
    private void connected(MemcachedNode memcachedNode) {
        if (!$assertionsDisabled && !memcachedNode.getChannel().isConnected()) {
            throw new AssertionError("Not connected.");
        }
        final int attemptsBefore = memcachedNode.getReconnectCount();
        memcachedNode.connected();
        for (ConnectionObserver observer : this.connObservers) {
            observer.connectionEstablished(memcachedNode.getSocketAddress(), attemptsBefore);
        }
    }

    /**
     * Queues the node for reconnection and then notifies every observer that the
     * connection to its address was lost.
     *
     * @param memcachedNode the node whose connection is considered lost
     */
    private void lostConnection(MemcachedNode memcachedNode) {
        queueReconnect(memcachedNode);
        for (ConnectionObserver observer : this.connObservers) {
            observer.connectionLost(memcachedNode.getSocketAddress());
        }
    }

    /**
     * Tells whether the locator currently knows a node with the same socket address as
     * the given one.
     *
     * @param memcachedNode the node to look for
     * @return {@code true} if any located node has an equal socket address
     */
    boolean belongsToCluster(MemcachedNode memcachedNode) {
        for (MemcachedNode known : this.locator.getAll()) {
            if (known.getSocketAddress().equals(memcachedNode.getSocketAddress())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Handles whatever is ready on a single selection key: completes a pending connect
     * for nodes that belong to the cluster, otherwise performs reads and writes.
     *
     * <p>No exception escapes: a failed connect queues a reconnect, a closed channel is
     * treated as a lost connection unless shutting down, and any other exception resets
     * the node for authentication and is treated as a lost connection. The node's
     * interest ops are fixed up afterwards in every case.
     *
     * @param selectionKey key whose attachment is the node to service
     */
    private void handleIO(SelectionKey selectionKey) {
        final MemcachedNode node = (MemcachedNode) selectionKey.attachment();
        try {
            getLogger().debug("Handling IO for:  %s (r=%s, w=%s, c=%s, op=%s)", selectionKey, Boolean.valueOf(selectionKey.isReadable()), Boolean.valueOf(selectionKey.isWritable()), Boolean.valueOf(selectionKey.isConnectable()), selectionKey.attachment());
            boolean connectEvent = selectionKey.isConnectable() && belongsToCluster(node);
            if (!connectEvent) {
                handleReadsAndWrites(selectionKey, node);
            } else {
                getLogger().debug("Connection state changed for %s", selectionKey);
                SocketChannel nodeChannel = node.getChannel();
                if (nodeChannel.finishConnect()) {
                    finishConnect(selectionKey, node);
                } else if (!$assertionsDisabled && nodeChannel.isConnected()) {
                    throw new AssertionError("connected");
                }
            }
        } catch (ConnectException connectFailure) {
            getLogger().info("Reconnecting due to failure to connect to %s", node, connectFailure);
            queueReconnect(node);
        } catch (ClosedChannelException closed) {
            if (!this.shutDown) {
                getLogger().info("Closed channel and not shutting down. Queueing reconnect on %s", node, closed);
                lostConnection(node);
            }
        } catch (OperationException opFailure) {
            node.setupForAuth();
            getLogger().info("Reconnection due to exception handling a memcached operation on %s. This may be due to an authentication failure.", node, opFailure);
            lostConnection(node);
        } catch (Exception unexpected) {
            node.setupForAuth();
            getLogger().info("Reconnecting due to exception on %s", node, unexpected);
            lostConnection(node);
        }
        node.fixupOps();
    }

    /**
     * Performs the reads and then the writes that the key reports as ready. The key's
     * validity is re-checked before the write step because the read step may have
     * invalidated it.
     *
     * @param selectionKey the key being serviced
     * @param memcachedNode the node attached to the key
     * @throws IOException if reading from or writing to the node fails
     */
    private void handleReadsAndWrites(SelectionKey selectionKey, MemcachedNode memcachedNode) throws IOException {
        if (selectionKey.isValid()) {
            if (selectionKey.isReadable()) {
                handleReads(memcachedNode);
            }
        }
        if (selectionKey.isValid()) {
            if (selectionKey.isWritable()) {
                handleWrites(memcachedNode);
            }
        }
    }

    /**
     * Completes the connection set-up for a node whose channel has finished connecting.
     *
     * <p>When alive-verification is enabled, a noop operation is sent first and this
     * method loops over writes and reads until the noop completes or the factory's
     * operation timeout elapses. If the noop does not complete cleanly the connect is
     * reported as failed. Afterwards the node is marked connected, queued on the added
     * queue, and any bytes already in its write buffer are flushed.
     *
     * @param selectionKey the node's selection key
     * @param memcachedNode the node being connected
     * @throws IOException on IO failure; a {@link ConnectException} if the noop probe fails
     */
    private void finishConnect(SelectionKey selectionKey, MemcachedNode memcachedNode) throws IOException {
        if (this.verifyAliveOnConnect) {
            final CountDownLatch probeLatch = new CountDownLatch(1);
            final OperationFuture probeResult = new OperationFuture("noop", probeLatch, DefaultConnectionFactory.DEFAULT_OPERATION_TIMEOUT, this.listenerExecutorService);
            NoopOperation probe = this.opFact.noop(new OperationCallback() {
                @Override
                public void receivedStatus(OperationStatus operationStatus) {
                    probeResult.set(Boolean.valueOf(operationStatus.isSuccess()), operationStatus);
                }

                @Override
                public void complete() {
                    probeLatch.countDown();
                }
            });
            probe.setHandlingNode(memcachedNode);
            probe.initialize();
            checkState();
            insertOperation(memcachedNode, probe);
            memcachedNode.copyInputQueue();

            boolean probeDone = false;
            if (selectionKey.isValid()) {
                final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(this.connectionFactory.getOperationTimeout());
                // Drive the IO by hand until the noop completes or the deadline passes.
                while (!probeDone && deadline > System.nanoTime()) {
                    handleWrites(memcachedNode);
                    handleReads(memcachedNode);
                    probeDone = probeLatch.getCount() == 0;
                }
            }

            boolean probeFailed = !probeDone || probe.isCancelled() || probe.hasErrored() || probe.isTimedOut();
            if (probeFailed) {
                throw new ConnectException("Could not send noop upon connect! This may indicate a running, but not responding memcached instance.");
            }
        }

        connected(memcachedNode);
        this.addedQueue.offer(memcachedNode);
        if (memcachedNode.getWbuf().hasRemaining()) {
            handleWrites(memcachedNode);
        }
    }

    /**
     * Fills the node's write buffer and keeps writing for as long as the previous write
     * made progress and bytes remain. The number of bytes written by each call is
     * recorded in the bytes-per-write histogram.
     *
     * @param memcachedNode the node to write to
     * @throws IOException if writing to the node fails
     */
    private void handleWrites(MemcachedNode memcachedNode) throws IOException {
        memcachedNode.fillWriteBuffer(this.shouldOptimize);
        if (memcachedNode.getBytesRemainingToWrite() <= 0) {
            return;
        }
        int written;
        do {
            written = memcachedNode.writeSome();
            this.metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, written);
            memcachedNode.fillWriteBuffer(this.shouldOptimize);
        } while (written > 0 && memcachedNode.getBytesRemainingToWrite() > 0);
    }

    /**
     * Reads from the node's channel until no more bytes arrive and feeds them to the
     * node's current read operations.
     *
     * <p>A pending tap-ack operation is simply removed without reading. End of stream is
     * delegated to {@code handleReadsWhenChannelEndOfStream}. For every chunk handed to
     * an operation, the time-on-wire histogram and the overall response meter are
     * updated.
     *
     * @param memcachedNode the node to read from
     * @throws IOException if the channel read or an operation's parsing fails
     * @throws IllegalStateException if bytes arrive while the node has no read operation
     */
    private void handleReads(MemcachedNode memcachedNode) throws IOException {
        Operation activeOp = memcachedNode.getCurrentReadOp();
        if (activeOp instanceof TapAckOperationImpl) {
            memcachedNode.removeCurrentReadOp();
            return;
        }

        final ByteBuffer readBuffer = memcachedNode.getRbuf();
        final SocketChannel nodeChannel = memcachedNode.getChannel();

        int bytesRead = nodeChannel.read(readBuffer);
        this.metrics.updateHistogram(OVERALL_AVG_BYTES_READ_METRIC, bytesRead);
        if (bytesRead < 0) {
            activeOp = handleReadsWhenChannelEndOfStream(activeOp, memcachedNode, readBuffer);
        }

        while (bytesRead > 0) {
            getLogger().debug("Read %d bytes", Integer.valueOf(bytesRead));
            readBuffer.flip();
            while (readBuffer.hasRemaining()) {
                if (activeOp == null) {
                    throw new IllegalStateException("No read operation.");
                }
                long nanosOnWire = System.nanoTime() - activeOp.getWriteCompleteTimestamp();
                this.metrics.updateHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC, (int) (nanosOnWire / 1000));
                this.metrics.markMeter(OVERALL_RESPONSE_METRIC);
                synchronized (activeOp) {
                    readBufferAndLogMetrics(activeOp, readBuffer, memcachedNode);
                }
                // The operation may have completed; continue with whatever is current now.
                activeOp = memcachedNode.getCurrentReadOp();
            }
            readBuffer.clear();
            bytesRead = nodeChannel.read(readBuffer);
            memcachedNode.completedRead();
        }
    }

    /**
     * Lets the operation consume bytes from the buffer and reacts to the state it ends
     * up in. A completed operation is removed from the node and counted as success or
     * failure; an operation in the retry state is removed, marked with the node that
     * rejected it, queued for retry and counted as a retry. Any other state leaves the
     * operation in place.
     *
     * @param operation the node's current read operation
     * @param byteBuffer buffer holding the bytes just read
     * @param memcachedNode the node the bytes came from
     * @throws IOException if the operation fails to parse the buffer
     */
    private void readBufferAndLogMetrics(Operation operation, ByteBuffer byteBuffer, MemcachedNode memcachedNode) throws IOException {
        operation.readFromBuffer(byteBuffer);

        if (operation.getState() == OperationState.COMPLETE) {
            getLogger().debug("Completed read op: %s and giving the next %d bytes", operation, Integer.valueOf(byteBuffer.remaining()));
            Operation finished = memcachedNode.removeCurrentReadOp();
            if (!$assertionsDisabled && finished != operation) {
                throw new AssertionError("Expected to pop " + operation + " got " + finished);
            }
            String outcomeMeter = finished.hasErrored() ? OVERALL_RESPONSE_FAIL_METRIC : OVERALL_RESPONSE_SUCC_METRIC;
            this.metrics.markMeter(outcomeMeter);
        } else if (operation.getState() == OperationState.RETRY) {
            handleRetryInformation(operation.getErrorMsg());
            getLogger().debug("Reschedule read op due to NOT_MY_VBUCKET error: %s ", operation);
            ((VBucketAware) operation).addNotMyVbucketNode(operation.getHandlingNode());
            Operation rejected = memcachedNode.removeCurrentReadOp();
            if (!$assertionsDisabled && rejected != operation) {
                throw new AssertionError("Expected to pop " + operation + " got " + rejected);
            }
            retryOperation(operation);
            this.metrics.markMeter(OVERALL_RESPONSE_RETRY_METRIC);
        }
    }

    /**
     * Handles an end-of-stream result from the channel. For a tap operation the stream
     * end is normal: its callback is completed, the operation is closed and removed, and
     * the node's next read operation is returned. For anything else the disconnect is
     * reported as an {@link IOException}.
     *
     * @param operation the node's current read operation (may be {@code null})
     * @param memcachedNode the node whose channel reached end of stream
     * @param byteBuffer the node's read buffer, used for logging only
     * @return the node's current read operation after the tap operation was removed
     * @throws IOException if the current operation is not a tap operation
     */
    private Operation handleReadsWhenChannelEndOfStream(Operation operation, MemcachedNode memcachedNode, ByteBuffer byteBuffer) throws IOException {
        if (!(operation instanceof TapOperation)) {
            throw new IOException("Disconnected unexpected, will reconnect.");
        }
        TapOperation tap = (TapOperation) operation;
        operation.getCallback().complete();
        tap.streamClosed(OperationState.COMPLETE);
        getLogger().debug("Completed read op: %s and giving the next %d bytes", operation, Integer.valueOf(byteBuffer.remaining()));
        Operation popped = memcachedNode.removeCurrentReadOp();
        if (!$assertionsDisabled && popped != operation) {
            throw new AssertionError("Expected to pop " + operation + " got " + popped);
        }
        return memcachedNode.getCurrentReadOp();
    }

    /**
     * Renders the first {@code i} bytes of an array-backed buffer as a debug string.
     * ASCII letters, digits and whitespace are emitted as characters; every other byte
     * is emitted as {@code \x} followed by its unsigned value in (unpadded) hex.
     *
     * <p>Bytes are treated as unsigned. Casting the signed byte straight to {@code char}
     * sign-extends values >= 0x80 into U+FF80..U+FFFF, where some code points count as
     * letters and were therefore printed raw instead of being escaped. The buffer's
     * {@code arrayOffset()} is honoured so sliced or offset-wrapped buffers show their
     * own content rather than the start of the backing array.
     *
     * @param byteBuffer buffer with an accessible backing array
     * @param i number of bytes to render, counted from the buffer's first element
     * @return the printable representation
     * @throws UnsupportedOperationException if the buffer has no accessible backing array
     */
    static String dbgBuffer(ByteBuffer byteBuffer, int i) {
        StringBuilder sb = new StringBuilder();
        byte[] array = byteBuffer.array();
        int base = byteBuffer.arrayOffset();
        for (int idx = 0; idx < i; idx++) {
            int unsigned = array[base + idx] & 0xFF;
            boolean printable = unsigned < 0x80
                    && (Character.isWhitespace(unsigned) || Character.isLetterOrDigit(unsigned));
            if (printable) {
                sb.append((char) unsigned);
            } else {
                sb.append("\\x");
                sb.append(Integer.toHexString(unsigned));
            }
        }
        return sb.toString();
    }

    /**
     * Hook receiving the error payload of an operation that ended in the retry state.
     * This implementation only logs the payload at debug level.
     *
     * @param bArr raw error message bytes of the operation
     */
    protected void handleRetryInformation(byte[] bArr) {
        final String retryMessage = new String(bArr);
        getLogger().debug("Got RETRY message: " + retryMessage + ", but not handled.");
    }

    /**
     * Tears down a node's connection and schedules a reconnect attempt.
     *
     * <p>The selection key is cancelled, the socket closed and the channel cleared. The
     * attempt is scheduled after {@code 2^reconnectCount} seconds, capped at the maximum
     * reconnect delay; if another node already occupies that exact millisecond the time
     * is moved forward until a free slot is found. Finally the node's operations are
     * prepared for resend and, depending on the failure mode, its input queue is
     * redistributed or cancelled. Does nothing once the connection is shut down.
     *
     * @param memcachedNode the node to reconnect
     */
    protected void queueReconnect(MemcachedNode memcachedNode) {
        if (this.shutDown) {
            return;
        }
        getLogger().warn("Closing, and reopening %s, attempt %d.", memcachedNode, Integer.valueOf(memcachedNode.getReconnectCount()));

        if (memcachedNode.getSk() != null) {
            memcachedNode.getSk().cancel();
            if (!$assertionsDisabled && memcachedNode.getSk().isValid()) {
                throw new AssertionError("Cancelled selection key is valid");
            }
        }
        memcachedNode.reconnecting();

        try {
            if (memcachedNode.getChannel() != null && memcachedNode.getChannel().socket() != null) {
                memcachedNode.getChannel().socket().close();
            } else {
                getLogger().info("The channel or socket was null for %s", memcachedNode);
            }
        } catch (IOException e) {
            getLogger().warn("IOException trying to close a socket", e);
        }
        memcachedNode.setChannel(null);

        // Exponential back-off in milliseconds, bounded by the configured maximum.
        double backoffMillis = Math.pow(2.0d, memcachedNode.getReconnectCount()) * 1000.0d;
        long delay = (long) Math.min(this.maxDelay, backoffMillis);
        long reconnectAt = System.currentTimeMillis() + delay;
        // Keys must be unique, so step forward one millisecond at a time on collision.
        while (this.reconnectQueue.containsKey(Long.valueOf(reconnectAt))) {
            reconnectAt++;
        }
        this.reconnectQueue.put(Long.valueOf(reconnectAt), memcachedNode);
        this.metrics.incrementCounter(RECON_QUEUE_METRIC);

        memcachedNode.setupResend();
        if (this.failureMode == FailureMode.Redistribute) {
            redistributeOperations(memcachedNode.destroyInputQueue());
        } else if (this.failureMode == FailureMode.Cancel) {
            cancelOperations(memcachedNode.destroyInputQueue());
        }
    }

    /**
     * Cancels every operation in the given collection.
     *
     * @param collection the operations to cancel
     */
    private void cancelOperations(Collection<Operation> collection) {
        for (Operation pending : collection) {
            pending.cancel();
        }
    }

    /**
     * Redistributes every operation in the given collection, one at a time, through
     * {@link #redistributeOperation(Operation)}.
     *
     * @param collection the operations to redistribute
     */
    public void redistributeOperations(Collection<Operation> collection) {
        for (Operation pending : collection) {
            redistributeOperation(pending);
        }
    }

    /**
     * Re-queues a single operation after its node failed or rejected it.
     *
     * <ul>
     *   <li>Cancelled or timed-out operations are dropped silently.</li>
     *   <li>Operations whose clone count has reached 100 are cancelled with a warning.</li>
     *   <li>Operations still in the write queue that have a handling node go back to
     *       that node unchanged.</li>
     *   <li>Multi-get operations are re-issued as one get per retry key.</li>
     *   <li>Other keyed operations are cloned and each clone is added once per key;
     *       non-keyed operations are cancelled.</li>
     * </ul>
     *
     * @param operation the operation to redistribute
     */
    public void redistributeOperation(Operation operation) {
        if (operation.isCancelled() || operation.isTimedOut()) {
            return;
        }
        if (operation.getCloneCount() >= 100) {
            // Message previously ran words together ("...<op>because...", "100times.").
            getLogger().warn("Cancelling operation " + operation + " because it has been retried (cloned) more than 100 times.");
            operation.cancel();
            return;
        }
        if (operation.getState() == OperationState.WRITE_QUEUED && operation.getHandlingNode() != null) {
            addOperation(operation.getHandlingNode(), operation);
            return;
        }
        if (operation instanceof MultiGetOperationImpl) {
            for (String retryKey : ((MultiGetOperationImpl) operation).getRetryKeys()) {
                addOperation(retryKey, this.opFact.get(retryKey, (GetOperation.Callback) operation.getCallback()));
            }
            return;
        }
        if (!(operation instanceof KeyedOperation)) {
            operation.cancel();
            return;
        }
        int added = 0;
        for (Operation clone : this.opFact.clone((KeyedOperation) operation)) {
            if (clone instanceof KeyedOperation) {
                for (String key : ((KeyedOperation) clone).getKeys()) {
                    addOperation(key, clone);
                    operation.addClone(clone);
                    clone.setCloneCount(operation.getCloneCount() + 1);
                }
            } else {
                clone.cancel();
                getLogger().warn("Could not redistribute cloned non-keyed operation", clone);
            }
            added++;
        }
        if (!$assertionsDisabled && added <= 0) {
            throw new AssertionError("Didn't add any new operations when redistributing");
        }
    }

    /**
     * Tries to reconnect every node whose entry in the reconnect queue is keyed
     * before the current time.
     *
     * <p>Each such entry is removed from the queue and the reconnect-queue
     * counter is decremented. Nodes that no longer belong to the cluster, and
     * nodes already handled during this call, are skipped. For the others a new
     * non-blocking channel is opened and registered with the selector. A
     * {@link SocketException} puts the node back on the reconnect queue once
     * the loop has finished; any other exception is logged and the node is not
     * re-queued.
     *
     * <p>The channel opened for a node is tracked per iteration and handed to
     * {@link #potentiallyCloseLeakingChannel} in a {@code finally} block, so a
     * channel whose connect or registration failed is closed instead of being
     * leaked.
     */
    private void attemptReconnects() {
        final long now = System.currentTimeMillis();
        // Identity-based: the same node instance may be queued under several timestamps.
        final Map<MemcachedNode, Boolean> seen = new IdentityHashMap<MemcachedNode, Boolean>();
        final List<MemcachedNode> requeue = new ArrayList<MemcachedNode>();
        Iterator<MemcachedNode> due = this.reconnectQueue.headMap(Long.valueOf(now)).values().iterator();
        while (due.hasNext()) {
            MemcachedNode node = due.next();
            due.remove();
            this.metrics.decrementCounter(RECON_QUEUE_METRIC);
            // Scoped to this node so the cleanup below never sees another node's channel.
            SocketChannel channel = null;
            try {
                if (!belongsToCluster(node)) {
                    getLogger().debug("Node does not belong to cluster anymore, skipping reconnect: %s", node);
                    continue;
                }
                if (seen.containsKey(node)) {
                    getLogger().debug("Skipping duplicate reconnect request for %s", node);
                    continue;
                }
                seen.put(node, Boolean.TRUE);
                getLogger().info("Reconnecting %s", node);
                // Assigned immediately so the finally block can close it if anything below throws.
                channel = SocketChannel.open();
                channel.configureBlocking(false);
                channel.socket().setTcpNoDelay(!this.connectionFactory.useNagleAlgorithm());
                int interestOps = 0;
                if (channel.connect(node.getSocketAddress())) {
                    connected(node);
                    this.addedQueue.offer(node);
                    getLogger().info("Immediately reconnected to %s", node);
                    if (!$assertionsDisabled && !channel.isConnected()) {
                        throw new AssertionError();
                    }
                } else {
                    // Connection still in progress: ask the selector to report its completion.
                    interestOps = SelectionKey.OP_CONNECT;
                }
                node.registerChannel(channel, channel.register(this.selector, interestOps, node));
                if (!$assertionsDisabled && node.getChannel() != channel) {
                    throw new AssertionError("Channel was lost.");
                }
            } catch (SocketException e) {
                getLogger().warn("Error on reconnect", e);
                requeue.add(node);
            } catch (Exception e) {
                getLogger().error("Exception on reconnect, lost node %s", node, e);
            } finally {
                potentiallyCloseLeakingChannel(channel, node);
            }
        }
        // Re-queued only after the loop, so the queue view being iterated is not modified underneath it.
        for (MemcachedNode node : requeue) {
            queueReconnect(node);
        }
    }

    /**
     * Closes the given channel when it is neither connected nor in the middle
     * of connecting. A {@code null} channel is ignored.
     *
     * @param socketChannel the channel to inspect, may be {@code null}
     * @param memcachedNode the node the channel was opened for; only used in
     *                      the error log message
     */
    private void potentiallyCloseLeakingChannel(SocketChannel socketChannel, MemcachedNode memcachedNode) {
        if (socketChannel != null && !socketChannel.isConnected() && !socketChannel.isConnectionPending()) {
            try {
                socketChannel.close();
            } catch (IOException closeFailure) {
                getLogger().error("Exception closing channel: %s", memcachedNode, closeFailure);
            }
        }
    }

    /**
     * Returns the node locator held by this connection.
     *
     * @return the {@link NodeLocator} stored in the {@code locator} field
     */
    public NodeLocator getLocator() {
        return locator;
    }

    /**
     * Validates the key and hands the operation over for routing.
     *
     * <p>The connection state is checked first, then the key is validated
     * (telling the validator whether the operation factory is a
     * {@link BinaryOperationFactory}), and finally the operation is added for
     * the node(s) responsible for the key.
     *
     * @param str       the key the operation works on
     * @param operation the operation to enqueue
     * @throws IllegalStateException if the connection is shutting down
     */
    public void enqueueOperation(String str, Operation operation) {
        checkState();
        final boolean binaryFactory = this.opFact instanceof BinaryOperationFactory;
        StringUtils.validateKey(str, binaryFactory);
        addOperation(str, operation);
    }

    /**
     * Picks a target node for the key and adds the operation to it.
     *
     * <p>The primary node is used when it is active or when the failure mode is
     * {@code Retry}. With failure mode {@code Cancel} and an inactive primary
     * the operation is cancelled. Otherwise the first active node of the
     * locator's sequence for the key is used, falling back to the (inactive)
     * primary when the sequence holds no active node.
     *
     * @param str       the key used to locate the node
     * @param operation the operation to add
     */
    protected void addOperation(String str, Operation operation) {
        final MemcachedNode primary = this.locator.getPrimary(str);
        MemcachedNode target = null;
        if (primary.isActive() || this.failureMode == FailureMode.Retry) {
            target = primary;
        } else if (this.failureMode == FailureMode.Cancel) {
            operation.cancel();
        } else {
            // Walk the fallback sequence until the first active node shows up.
            for (Iterator<MemcachedNode> candidates = this.locator.getSequence(str); candidates.hasNext(); ) {
                MemcachedNode candidate = candidates.next();
                if (candidate.isActive()) {
                    target = candidate;
                    break;
                }
            }
            if (target == null) {
                target = primary;
                getLogger().warn("Could not redistribute to another node, retrying primary node for %s.", str);
            }
        }
        if (!$assertionsDisabled && !operation.isCancelled() && target == null) {
            throw new AssertionError("No node found for key " + str);
        }
        if (target == null) {
            if (!$assertionsDisabled && !operation.isCancelled()) {
                throw new AssertionError("No node found for " + str + " (and not immediately cancelled)");
            }
            return;
        }
        addOperation(target, operation);
    }

    /**
     * Binds the operation to the node, initializes it and passes it to the
     * node's {@code insertOp}, then marks the node as having new work and
     * wakes up the selector.
     *
     * @param memcachedNode the node that will handle the operation
     * @param operation     the operation to insert
     */
    public void insertOperation(MemcachedNode memcachedNode, Operation operation) {
        operation.setHandlingNode(memcachedNode);
        operation.initialize();
        memcachedNode.insertOp(operation);

        this.addedQueue.offer(memcachedNode);
        this.metrics.markMeter(OVERALL_REQUEST_METRIC);

        final Selector woken = this.selector.wakeup();
        if (woken != this.selector && !$assertionsDisabled) {
            throw new AssertionError("Wakeup returned the wrong selector.");
        }
        getLogger().debug("Added %s to %s", operation, memcachedNode);
    }

    /**
     * Adds the operation to the given node, or parks it in the retry queue
     * when the node is not authenticated.
     *
     * <p>For an authenticated node the operation is bound to the node,
     * initialized and passed to the node's {@code addOp}; the node is then
     * marked as having new work and the selector is woken up.
     *
     * <p>Decompiler note (JADX): access modifiers changed from: protected.
     *
     * @param memcachedNode the node that should handle the operation
     * @param operation     the operation to add
     */
    public void addOperation(MemcachedNode memcachedNode, Operation operation) {
        if (memcachedNode.isAuthenticated()) {
            operation.setHandlingNode(memcachedNode);
            operation.initialize();
            memcachedNode.addOp(operation);

            this.addedQueue.offer(memcachedNode);
            this.metrics.markMeter(OVERALL_REQUEST_METRIC);

            final Selector woken = this.selector.wakeup();
            if (woken != this.selector && !$assertionsDisabled) {
                throw new AssertionError("Wakeup returned the wrong selector.");
            }
            getLogger().debug("Added %s to %s", operation, memcachedNode);
        } else {
            retryOperation(operation);
        }
    }

    /**
     * Adds every operation of the map to the node it is keyed by.
     *
     * @param map node-to-operation assignments, processed in the iteration
     *            order of the map's entry set
     */
    public void addOperations(Map<MemcachedNode, Operation> map) {
        Iterator<Map.Entry<MemcachedNode, Operation>> assignments = map.entrySet().iterator();
        while (assignments.hasNext()) {
            Map.Entry<MemcachedNode, Operation> assignment = assignments.next();
            MemcachedNode node = assignment.getKey();
            Operation op = assignment.getValue();
            addOperation(node, op);
        }
    }

    /**
     * Broadcasts an operation to all nodes known to the locator.
     *
     * @param broadcastOpFactory factory creating the per-node operation
     * @return the latch returned by the two-argument overload
     */
    public CountDownLatch broadcastOperation(BroadcastOpFactory broadcastOpFactory) {
        final Collection<MemcachedNode> allNodes = this.locator.getAll();
        return broadcastOperation(broadcastOpFactory, allNodes);
    }

    /**
     * Creates one operation per node via the factory and adds it to that node.
     *
     * <p>The returned latch is created with a count equal to the number of
     * nodes and is handed to the factory for every operation it creates. The
     * selector is woken up once, after all operations have been added.
     *
     * @param broadcastOpFactory factory creating the per-node operation
     * @param collection         the nodes to broadcast to
     * @return a latch whose initial count is {@code collection.size()}
     */
    public CountDownLatch broadcastOperation(BroadcastOpFactory broadcastOpFactory, Collection<MemcachedNode> collection) {
        final CountDownLatch latch = new CountDownLatch(collection.size());
        Iterator<MemcachedNode> nodes = collection.iterator();
        while (nodes.hasNext()) {
            MemcachedNode node = nodes.next();
            getLogger().debug("broadcast Operation: node = " + node);
            Operation op = broadcastOpFactory.newOp(node, latch);
            op.initialize();
            node.addOp(op);
            op.setHandlingNode(node);
            this.addedQueue.offer(node);
            this.metrics.markMeter(OVERALL_REQUEST_METRIC);
        }
        final Selector woken = this.selector.wakeup();
        if (!$assertionsDisabled && woken != this.selector) {
            throw new AssertionError("Wakeup returned the wrong selector.");
        }
        return latch;
    }

    /**
     * Shuts the connection down: sets the shut-down flag, wakes up the
     * selector, closes the channel of every node that has one, and closes the
     * selector.
     *
     * <p>The {@code running} flag is cleared whether or not closing succeeds.
     *
     * @throws IOException if closing a channel or the selector fails
     */
    public void shutdown() throws IOException {
        this.shutDown = true;
        try {
            final Selector woken = this.selector.wakeup();
            if (woken != this.selector && !$assertionsDisabled) {
                throw new AssertionError("Wakeup returned the wrong selector.");
            }
            for (MemcachedNode node : this.locator.getAll()) {
                if (node.getChannel() == null) {
                    continue;
                }
                node.getChannel().close();
                node.setSk(null);
                if (node.getBytesRemainingToWrite() > 0) {
                    getLogger().warn("Shut down with %d bytes remaining to write", Integer.valueOf(node.getBytesRemainingToWrite()));
                }
                getLogger().debug("Shut down channel %s", node.getChannel());
            }
            this.selector.close();
            getLogger().debug("Shut down selector %s", this.selector);
        } finally {
            // Cleared on both the normal and the exceptional path.
            this.running = false;
        }
    }

    /**
     * Builds a textual description that starts with
     * {@code "{MemcachedConnection to"}, followed by the socket address of
     * every node (each preceded by {@code AuthThread.MECH_SEPARATOR}) and
     * terminated by {@code SystemPropertyUtils.PLACEHOLDER_SUFFIX}.
     *
     * @return the description of this connection
     */
    @Override // java.lang.Thread
    public String toString() {
        final StringBuilder text = new StringBuilder("{MemcachedConnection to");
        for (MemcachedNode node : this.locator.getAll()) {
            text.append(AuthThread.MECH_SEPARATOR);
            text.append(node.getSocketAddress());
        }
        text.append(SystemPropertyUtils.PLACEHOLDER_SUFFIX);
        return text.toString();
    }

    /**
     * Builds a one-line status report listing, for every node, its socket
     * address, whether it is active, whether it is authenticated and the value
     * of {@code lastReadDelta()} formatted as milliseconds since the last read.
     *
     * @return the status report
     */
    public String connectionsStatus() {
        final StringBuilder report = new StringBuilder("Connection Status {");
        Iterator<MemcachedNode> nodes = this.locator.getAll().iterator();
        while (nodes.hasNext()) {
            MemcachedNode node = nodes.next();
            report.append(AuthThread.MECH_SEPARATOR);
            report.append(node.getSocketAddress());
            report.append(" active: ");
            report.append(node.isActive());
            report.append(", authed: ");
            report.append(node.isAuthenticated());
            report.append(MessageFormat.format(", last read: {0} ms ago", Long.valueOf(node.lastReadDelta())));
        }
        return report.append(" }").toString();
    }

    /**
     * Reports that the operation timed out by forwarding it to
     * {@code setTimeout} with the flag set to {@code true}.
     *
     * @param operation the operation that timed out
     */
    public static void opTimedOut(Operation operation) {
        final boolean timedOut = true;
        MemcachedConnection.setTimeout(operation, timedOut);
    }

    /**
     * Reports that the operation succeeded by forwarding it to
     * {@code setTimeout} with the flag set to {@code false}.
     *
     * @param operation the operation that succeeded
     */
    public static void opSucceeded(Operation operation) {
        final boolean timedOut = false;
        MemcachedConnection.setTimeout(operation, timedOut);
    }

    /**
     * Passes the timeout flag on to the node handling the operation.
     *
     * <p>Nothing happens when the operation is {@code null}, when it reports
     * {@code isTimedOutUnsent()}, or when it has no handling node. Any
     * exception raised while doing so is logged and not propagated.
     *
     * @param operation the operation whose handling node is updated, may be
     *                  {@code null}
     * @param z         the value passed to the node's
     *                  {@code setContinuousTimeout}
     */
    private static void setTimeout(Operation operation, boolean z) {
        if (operation == null) {
            return;
        }
        try {
            if (operation.isTimedOutUnsent()) {
                return;
            }
            MemcachedNode handlingNode = operation.getHandlingNode();
            if (handlingNode != null) {
                handlingNode.setContinuousTimeout(z);
            }
        } catch (Exception e) {
            // The logger is looked up only on failure, and the exception itself is passed along
            // so its type and stack trace are kept (getMessage() alone may even be null).
            Logger logger = LoggerFactory.getLogger((Class<?>) MemcachedConnection.class);
            logger.error("Failed to update the timeout state of the handling node", e);
        }
    }

    /**
     * Verifies that the connection can still accept work.
     *
     * <p>Decompiler note (JADX): access modifiers changed from: protected.
     *
     * @throws IllegalStateException if the connection is shutting down
     * @throws AssertionError        if assertions are enabled and this thread
     *                               is not alive
     */
    public void checkState() {
        if (this.shutDown) {
            throw new IllegalStateException("Shutting down");
        }
        if ($assertionsDisabled || isAlive()) {
            return;
        }
        throw new AssertionError("IO Thread is not running.");
    }

    /**
     * Main loop of the thread: calls {@code handleIO()} for as long as the
     * {@code running} flag is set. The exceptions listed in the catch clause
     * are logged and the loop continues.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                handleIO();
            } catch (IOException | IllegalStateException | ConcurrentModificationException problem) {
                // IllegalStateException also covers CancelledKeyException and
                // ClosedSelectorException, which are subclasses of it.
                logRunException(problem);
            }
        }
        getLogger().info("Shut down memcached client");
    }

    /**
     * Logs an exception caught by the IO loop: at debug level when the
     * shut-down flag is set, at warn level otherwise.
     *
     * @param exc the exception to log
     */
    private void logRunException(Exception exc) {
        if (!this.shutDown) {
            getLogger().warn("Problem handling memcached IO", exc);
            return;
        }
        getLogger().debug("Exception occurred during shutdown", exc);
    }

    /**
     * Tells whether the shut-down flag has been set.
     *
     * @return the current value of the {@code shutDown} field
     */
    public boolean isShutDown() {
        return shutDown;
    }

    /**
     * Puts the operation on the retry collection.
     *
     * <p>When a non-negative retry queue size is configured and the collection
     * has already reached it, an operation that is not yet cancelled is
     * cancelled first. The operation is added to the collection in every case.
     *
     * @param operation the operation to retry
     */
    public void retryOperation(Operation operation) {
        final boolean bounded = this.retryQueueSize >= 0;
        if (bounded && this.retryOps.size() >= this.retryQueueSize) {
            if (!operation.isCancelled()) {
                operation.cancel();
            }
        }
        this.retryOps.add(operation);
    }

    // Static initializer: stores the negation of this class's desired assertion
    // status in $assertionsDisabled, the flag the explicit
    // "if (!$assertionsDisabled && ...) throw new AssertionError(...)" checks in
    // this class test before throwing.
    // NOTE(review): this looks like the decompiled form of javac's assert support — confirm.
    static {
        $assertionsDisabled = !MemcachedConnection.class.desiredAssertionStatus();
    }
}
