package co.cask.cdap.data2.transaction.queue.hbase;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.ConsumerGroupConfig;
import co.cask.cdap.data2.queue.QueueEntry;
import co.cask.cdap.data2.transaction.queue.QueueScanner;
import co.cask.cdap.hbase.wd.DistributedScanner;
import com.google.common.base.Function;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Threads;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/SaltedHBaseQueueStrategy.class */
public final class SaltedHBaseQueueStrategy implements HBaseQueueStrategy {
    public static final int SALT_BYTES = 1;
    private static final Function<byte[], byte[]> ROW_KEY_CONVERTER = new Function<byte[], byte[]>() { // from class: co.cask.cdap.data2.transaction.queue.hbase.SaltedHBaseQueueStrategy.1
        public byte[] apply(byte[] bArr) {
            return HBaseQueueAdmin.ROW_KEY_DISTRIBUTOR.getOriginalKey(bArr);
        }
    };
    private final ExecutorService scansExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SaltedHBaseQueueStrategy() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("queue-consumer-scan"));
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        this.scansExecutor = threadPoolExecutor;
    }

    @Override // co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueStrategy
    public QueueScanner createScanner(ConsumerConfig consumerConfig, HTable hTable, Scan scan, int i) throws IOException {
        return new HBaseQueueScanner(DistributedScanner.create(hTable, scan, HBaseQueueAdmin.ROW_KEY_DISTRIBUTOR, this.scansExecutor), i, ROW_KEY_CONVERTER);
    }

    @Override // co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueStrategy
    public byte[] getActualRowKey(ConsumerConfig consumerConfig, byte[] bArr) {
        return HBaseQueueAdmin.ROW_KEY_DISTRIBUTOR.getDistributedKey(bArr);
    }

    @Override // co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueStrategy
    public void getRowKeys(Iterable<ConsumerGroupConfig> iterable, QueueEntry queueEntry, byte[] bArr, long j, int i, Collection<byte[]> collection) {
        byte[] bArr2 = new byte[bArr.length + 8 + 4];
        Bytes.putBytes(bArr2, 0, bArr, 0, bArr.length);
        Bytes.putLong(bArr2, bArr.length, j);
        Bytes.putInt(bArr2, bArr2.length - 4, i);
        collection.add(HBaseQueueAdmin.ROW_KEY_DISTRIBUTOR.getDistributedKey(bArr2));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.scansExecutor.shutdownNow();
    }
}
