package co.cask.cdap.data2.transaction.queue.inmemory;

import co.cask.cdap.common.utils.ImmutablePair;
import co.cask.cdap.data2.metadata.dataset.BusinessMetadataDataset;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.DequeueStrategy;
import co.cask.cdap.data2.queue.QueueEntry;
import co.cask.cdap.data2.transaction.queue.ConsumerEntryState;
import co.cask.tephra.Transaction;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/inmemory/InMemoryQueue.class */
/**
 * A queue held entirely in memory. Entries live in a sorted concurrent map keyed by the
 * transaction id and sequence number they were enqueued with (see {@link Key}), and each entry
 * carries a per-consumer-group state that records whether it has been claimed or processed.
 */
public class InMemoryQueue {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueue.class);

    /** All entries currently in the queue, sorted by {@link Key}. */
    private final ConcurrentNavigableMap<Key, Item> entries = new ConcurrentSkipListMap<Key, Item>();

    /**
     * Scan position of a single consumer. {@link #dequeue} reads {@code startKey} to decide where
     * to start scanning and advances it as it goes; {@code null} means "scan from the beginning".
     */
    public static class ConsumerState {
        Key startKey = null;
    }

    /** A queue entry together with its per-consumer-group state and its eviction counter. */
    private static final class Item {
        final QueueEntry entry;
        /** Entry state per consumer group, keyed by group id. */
        final ConcurrentMap<Long, ItemEntryState> consumerStates = Maps.newConcurrentMap();
        /** Number of times {@link #incrementProcessed()} has been called for this entry. */
        final AtomicInteger processedCount = new AtomicInteger();

        Item(QueueEntry queueEntry) {
            this.entry = queueEntry;
        }

        /** Map key under which the state of the given consumer's group is stored. */
        private static Long groupKey(ConsumerConfig consumerConfig) {
            return Long.valueOf(consumerConfig.getGroupId());
        }

        /**
         * @param groupId id of the consumer group to look up
         * @return the state recorded for that group, or {@code null} if none is recorded
         */
        ConsumerEntryState getConsumerState(long groupId) {
            ItemEntryState recorded = this.consumerStates.get(Long.valueOf(groupId));
            return recorded == null ? null : recorded.getState();
        }

        /**
         * Unconditionally records {@code consumerEntryState} for the consumer's group, attributed
         * to the consumer's instance id.
         */
        void setConsumerState(ConsumerConfig consumerConfig, ConsumerEntryState consumerEntryState) {
            this.consumerStates.put(groupKey(consumerConfig),
                                    new ItemEntryState(consumerConfig.getInstanceId(), consumerEntryState));
        }

        /**
         * Rolls back the state of this entry for the consumer's group.
         *
         * @param revertToClaimed if {@code true}, the entry is recorded as CLAIMED by the consumer's
         *                        instance; if {@code false}, the group's state is removed altogether
         */
        void revokeConsumerState(ConsumerConfig consumerConfig, boolean revertToClaimed) {
            if (revertToClaimed) {
                setConsumerState(consumerConfig, ConsumerEntryState.CLAIMED);
            } else {
                this.consumerStates.remove(groupKey(consumerConfig));
            }
        }

        /**
         * Attempts to claim this entry for the given consumer.
         *
         * @return {@code true} if the group had no state yet (a CLAIMED state for this instance is
         *         then recorded atomically), if the recorded instance id is not below the consumer's
         *         group size, or if the entry is already CLAIMED by this very instance;
         *         {@code false} otherwise
         */
        boolean claim(ConsumerConfig consumerConfig) {
            Long groupKey = groupKey(consumerConfig);
            ItemEntryState existing = this.consumerStates.get(groupKey);
            if (existing == null) {
                // putIfAbsent returns null only when our new state won the race.
                existing = this.consumerStates.putIfAbsent(
                    groupKey, new ItemEntryState(consumerConfig.getInstanceId(), ConsumerEntryState.CLAIMED));
                if (existing == null) {
                    return true;
                }
            }
            // NOTE(review): when the recorded instance id is outside the group size the claim
            // succeeds without rewriting the recorded state - confirm this is intended.
            if (existing.getInstanceId() >= consumerConfig.getGroupSize()) {
                return true;
            }
            return existing.getInstanceId() == consumerConfig.getInstanceId()
                && existing.getState() == ConsumerEntryState.CLAIMED;
        }

        /** Increments the eviction counter and returns its new value. */
        int incrementProcessed() {
            return this.processedCount.incrementAndGet();
        }
    }

    /** The state of one entry for one consumer group: which instance owns it and how far it got. */
    public static final class ItemEntryState {
        final int instanceId;
        ConsumerEntryState state;

        ItemEntryState(int instanceId, ConsumerEntryState consumerEntryState) {
            this.instanceId = instanceId;
            this.state = consumerEntryState;
        }

        int getInstanceId() {
            return this.instanceId;
        }

        ConsumerEntryState getState() {
            return this.state;
        }

        void setState(ConsumerEntryState consumerEntryState) {
            this.state = consumerEntryState;
        }
    }

    /**
     * Identifies a queue entry by transaction id and sequence number. Keys order by transaction
     * id first and by sequence number second.
     */
    public static final class Key implements Comparable<Key> {
        final long txId;
        final int seqNo;

        Key(long txId, int seqNo) {
            this.txId = txId;
            this.seqNo = seqNo;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != Key.class) {
                return false;
            }
            Key other = (Key) obj;
            return this.txId == other.txId && this.seqNo == other.seqNo;
        }

        // The hash value decides which instance receives an entry under ROUND_ROBIN in dequeue(),
        // so the computation must stay exactly as it is.
        @Override
        public int hashCode() {
            return Objects.hashCode(new Object[]{Long.valueOf(this.txId), Integer.valueOf(this.seqNo)});
        }

        @Override // java.lang.Comparable
        public int compareTo(Key key) {
            if (this.txId < key.txId) {
                return -1;
            }
            if (this.txId > key.txId) {
                return 1;
            }
            if (this.seqNo < key.seqNo) {
                return -1;
            }
            return this.seqNo > key.seqNo ? 1 : 0;
        }

        @Override
        public String toString() {
            return this.txId + BusinessMetadataDataset.KEYVALUE_SEPARATOR + this.seqNo;
        }
    }

    /** Removes all entries from the queue. */
    public void clear() {
        this.entries.clear();
    }

    /** @return the number of entries currently held, regardless of their consumer state */
    public int getSize() {
        return this.entries.size();
    }

    /**
     * Stores an entry under the key {@code (txId, seqNo)}, replacing any entry already stored
     * under that key.
     *
     * @param txId       transaction id part of the entry key
     * @param seqNo      sequence number part of the entry key
     * @param queueEntry the entry to store
     */
    public void enqueue(long txId, int seqNo, QueueEntry queueEntry) {
        this.entries.put(new Key(txId, seqNo), new Item(queueEntry));
    }

    /**
     * Removes the entry stored under the key {@code (txId, seqNo)}, if there is one.
     *
     * @param txId  transaction id part of the entry key
     * @param seqNo sequence number part of the entry key
     */
    public void undoEnqueue(long txId, int seqNo) {
        this.entries.remove(new Key(txId, seqNo));
    }

    /**
     * Collects up to {@code maxBatchSize} entries for the given consumer, scanning in key order
     * from {@code consumerState.startKey} (inclusive), or from the first entry if that is
     * {@code null}.
     *
     * <p>The scan stops at the first key whose transaction id is greater than the transaction's
     * read pointer. Entries written by a transaction that is in progress, entries already
     * PROCESSED by the consumer's group and entries removed concurrently are skipped. Of the
     * remaining entries, FIFO consumers take those they manage to claim; other strategies take
     * every entry when the group size is 1, and otherwise the entries whose hash (the key's hash
     * for ROUND_ROBIN, the entry's hash key otherwise, 0 if absent) maps to the consumer's
     * instance id.
     *
     * <p>As a side effect {@code consumerState.startKey} is advanced over keys whose transaction
     * id is below {@code transaction.getFirstShortInProgress()}, until the scan meets an entry
     * that a later call may have to look at again.
     *
     * @param transaction    the transaction that determines which entries are visible
     * @param consumerConfig the consumer on whose behalf entries are dequeued
     * @param consumerState  the consumer's scan position; updated by this call
     * @param maxBatchSize   maximum number of entries to return
     * @return the keys and the payloads of the dequeued entries, in matching order, or
     *         {@code null} if no entry was dequeued
     */
    public ImmutablePair<List<Key>, List<byte[]>> dequeue(Transaction transaction, ConsumerConfig consumerConfig, ConsumerState consumerState, int maxBatchSize) {
        List<Key> keys = Lists.newArrayListWithCapacity(maxBatchSize);
        List<byte[]> datas = Lists.newArrayListWithCapacity(maxBatchSize);
        ConcurrentNavigableMap<Key, Item> toScan =
            consumerState.startKey == null ? this.entries : this.entries.tailMap(consumerState.startKey);
        boolean updateStartKey = true;

        for (Key key : toScan.navigableKeySet()) {
            if (keys.size() >= maxBatchSize) {
                break;
            }
            if (updateStartKey && key.txId < transaction.getFirstShortInProgress()) {
                consumerState.startKey = key;
            }
            if (transaction.getReadPointer() < key.txId) {
                // Keys are sorted by transaction id, so nothing further can be visible either.
                break;
            }
            if (transaction.isInProgress(key.txId)) {
                // Not visible yet; must be looked at again, so the start key may not move past it.
                updateStartKey = false;
                continue;
            }
            Item item = this.entries.get(key);
            if (item == null) {
                // The entry was removed after the scan started.
                continue;
            }
            if (ConsumerEntryState.PROCESSED.equals(item.getConsumerState(consumerConfig.getGroupId()))) {
                continue;
            }
            if (consumerConfig.getDequeueStrategy().equals(DequeueStrategy.FIFO)) {
                if (item.claim(consumerConfig)) {
                    keys.add(key);
                    datas.add(item.entry.getData());
                }
                // Whether claimed or not, the entry is not processed yet and must stay in scan range.
                updateStartKey = false;
                continue;
            }
            if (consumerConfig.getGroupSize() == 1) {
                keys.add(key);
                datas.add(item.entry.getData());
                updateStartKey = false;
                continue;
            }
            int hash;
            if (consumerConfig.getDequeueStrategy().equals(DequeueStrategy.ROUND_ROBIN)) {
                hash = key.hashCode();
            } else {
                Integer entryHash = item.entry.getHashKey(consumerConfig.getHashKey());
                hash = entryHash == null ? 0 : entryHash.intValue();
            }
            // Take the remainder BEFORE the absolute value: Math.abs(Integer.MIN_VALUE) is still
            // negative, so abs-then-modulo would yield a negative bucket that matches no instance
            // and the entry would never be delivered. For every other hash both forms agree.
            if (Math.abs(hash % consumerConfig.getGroupSize()) == consumerConfig.getInstanceId()) {
                keys.add(key);
                datas.add(item.entry.getData());
                updateStartKey = false;
            }
        }
        if (keys.isEmpty()) {
            return null;
        }
        return ImmutablePair.of(keys, datas);
    }

    /**
     * Marks the given entries as PROCESSED for the consumer's group. Keys without an entry are
     * logged and skipped.
     *
     * @param list           keys of the entries to acknowledge; {@code null} is a no-op
     * @param consumerConfig the acknowledging consumer
     */
    public void ack(List<Key> list, ConsumerConfig consumerConfig) {
        if (list == null) {
            return;
        }
        for (Key key : list) {
            Item item = this.entries.get(key);
            if (item == null) {
                LOG.warn("Attempting to ack non-existing entry " + key);
                continue;
            }
            item.setConsumerState(consumerConfig, ConsumerEntryState.PROCESSED);
        }
    }

    /**
     * Rolls back a dequeue of the given entries for the consumer's group: with the FIFO strategy
     * the entries are recorded as CLAIMED by the consumer's instance, with any other strategy the
     * group's state is removed. Keys without an entry are logged and skipped.
     *
     * @param list           keys of the entries to roll back; {@code null} is a no-op
     * @param consumerConfig the consumer that dequeued the entries
     */
    public void undoDequeue(List<Key> list, ConsumerConfig consumerConfig) {
        if (list == null) {
            return;
        }
        for (Key key : list) {
            Item item = this.entries.get(key);
            if (item == null) {
                LOG.warn("Attempting to undo dequeue for non-existing entry " + key);
                continue;
            }
            boolean fifo = consumerConfig.getDequeueStrategy() == DequeueStrategy.FIFO;
            item.revokeConsumerState(consumerConfig, fifo);
        }
    }

    /**
     * Counts one eviction request against each given entry and removes the entries whose count
     * has reached {@code threshold}. Keys without an entry are logged and skipped.
     *
     * @param list      keys of the entries to evict; {@code null} is a no-op
     * @param threshold number of eviction requests after which an entry is removed; values below 1
     *                  make the call a no-op (presumably the number of consumer groups - confirm
     *                  against callers)
     */
    public void evict(List<Key> list, int threshold) {
        if (threshold < 1 || list == null) {
            return;
        }
        for (Key key : list) {
            Item item = this.entries.get(key);
            if (item == null) {
                LOG.warn("Attempting to evict non-existing entry " + key);
            } else if (item.incrementProcessed() >= threshold) {
                this.entries.remove(key);
            }
        }
    }
}
