package co.cask.cdap.data2.transaction.messaging.coprocessor.hbase12cdh570;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.transaction.queue.hbase.coprocessor.CConfigurationReader;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.HTable12CDH570NameConverter;
import co.cask.cdap.messaging.MessagingUtils;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;

/* loaded from: input_file:co/cask/cdap/data2/transaction/messaging/coprocessor/hbase12cdh570/PayloadTableRegionObserver.class */
public class PayloadTableRegionObserver extends BaseRegionObserver {
    private static final Log LOG = LogFactory.getLog(PayloadTableRegionObserver.class);
    private static final Gson GSON = new Gson();
    private static final Type MAP_TYPE = new TypeToken<Map<String, String>>() { // from class: co.cask.cdap.data2.transaction.messaging.coprocessor.hbase12cdh570.PayloadTableRegionObserver.1
    }.getType();
    private static final byte[] COL_FAMILY = MessagingUtils.Constants.COLUMN_FAMILY;
    private static final byte[] COL = MessagingUtils.Constants.METADATA_COLUMN;
    private int prefixLength;
    private LoadingCache<ByteBuffer, Map<String, String>> topicCache;
    private HTableInterface metadataTable;
    private CConfigurationReader cConfReader;

    /* loaded from: input_file:co/cask/cdap/data2/transaction/messaging/coprocessor/hbase12cdh570/PayloadTableRegionObserver$PayloadDataFilter.class */
    private static final class PayloadDataFilter extends FilterBase {
        private final RegionCoprocessorEnvironment env;
        private final long timestamp;
        private final int prefixLength;
        private final LoadingCache<ByteBuffer, Map<String, String>> topicCache;
        private byte[] prevTopicIdBytes;
        private Long currentTTL;
        private Integer currentGen;

        PayloadDataFilter(RegionCoprocessorEnvironment regionCoprocessorEnvironment, long j, int i, LoadingCache<ByteBuffer, Map<String, String>> loadingCache) {
            this.env = regionCoprocessorEnvironment;
            this.timestamp = j;
            this.prefixLength = i;
            this.topicCache = loadingCache;
        }

        public Filter.ReturnCode filterKeyValue(Cell cell) throws IOException {
            int rowOffset = cell.getRowOffset() + this.prefixLength;
            int rowLength = cell.getRowLength() - this.prefixLength;
            long writeTimestamp = MessagingUtils.getWriteTimestamp(cell.getRowArray(), rowOffset, rowLength);
            int topicLengthPayloadEntry = MessagingUtils.getTopicLengthPayloadEntry(rowLength) - 4;
            int i = Bytes.toInt(cell.getRowArray(), rowOffset + topicLengthPayloadEntry);
            try {
                if (this.prevTopicIdBytes == null || this.currentTTL == null || this.currentGen == null || !Bytes.equals(this.prevTopicIdBytes, 0, this.prevTopicIdBytes.length, cell.getRowArray(), rowOffset, topicLengthPayloadEntry)) {
                    this.prevTopicIdBytes = Arrays.copyOfRange(cell.getRowArray(), rowOffset, rowOffset + topicLengthPayloadEntry);
                    Map map = (Map) this.topicCache.get(ByteBuffer.wrap(this.prevTopicIdBytes));
                    this.currentTTL = Long.valueOf(Long.parseLong((String) map.get("ttl")));
                    this.currentGen = Integer.valueOf(Integer.parseInt((String) map.get("generation")));
                }
            } catch (ExecutionException e) {
                PayloadTableRegionObserver.LOG.info("Region " + this.env.getRegionInfo().getRegionNameAsString() + ", exception whiletrying to fetch properties of topicId " + MessagingUtils.toTopicId(this.prevTopicIdBytes) + "\n" + e.getMessage());
                PayloadTableRegionObserver.LOG.debug("StackTrace: ", e);
            }
            if (MessagingUtils.isOlderGeneration(i, this.currentGen.intValue())) {
                return Filter.ReturnCode.SKIP;
            }
            if (i == this.currentGen.intValue() && this.timestamp - writeTimestamp > this.currentTTL.longValue()) {
                return Filter.ReturnCode.SKIP;
            }
            return Filter.ReturnCode.INCLUDE;
        }
    }

    public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
        if (coprocessorEnvironment instanceof RegionCoprocessorEnvironment) {
            HTableDescriptor tableDesc = ((RegionCoprocessorEnvironment) coprocessorEnvironment).getRegion().getTableDesc();
            String value = tableDesc.getValue("cdap.messaging.metadata.hbase.namespace");
            String value2 = tableDesc.getValue("dataset.table.prefix");
            HTable12CDH570NameConverter hTable12CDH570NameConverter = new HTable12CDH570NameConverter();
            this.cConfReader = new CConfigurationReader(coprocessorEnvironment.getConfiguration(), hTable12CDH570NameConverter.getSysConfigTablePrefix(value2));
            CConfiguration read = this.cConfReader.read();
            String str = read.get("messaging.metadata.table.name");
            this.prefixLength = Integer.valueOf(tableDesc.getValue("cdap.messaging.table.prefix.num.bytes")).intValue();
            this.metadataTable = coprocessorEnvironment.getTable(hTable12CDH570NameConverter.toTableName(value2, TableId.from(value, str)));
            this.topicCache = CacheBuilder.newBuilder().expireAfterWrite(read.getLong("messaging.coprocessor.metadata.cache.expiration.seconds"), TimeUnit.SECONDS).maximumSize(1000L).build(new CacheLoader<ByteBuffer, Map<String, String>>() { // from class: co.cask.cdap.data2.transaction.messaging.coprocessor.hbase12cdh570.PayloadTableRegionObserver.2
                public Map<String, String> load(ByteBuffer byteBuffer) throws Exception {
                    Map<String, String> map = (Map) PayloadTableRegionObserver.GSON.fromJson(Bytes.toString(PayloadTableRegionObserver.this.metadataTable.get(new Get(byteBuffer.array().length == byteBuffer.remaining() ? byteBuffer.array() : Bytes.toBytes(byteBuffer))).getValue(PayloadTableRegionObserver.COL_FAMILY, PayloadTableRegionObserver.COL)), PayloadTableRegionObserver.MAP_TYPE);
                    map.put("ttl", Long.toString(TimeUnit.SECONDS.toMillis(Long.parseLong(map.get("ttl")))));
                    return map;
                }
            });
        }
    }

    public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, KeyValueScanner keyValueScanner, InternalScanner internalScanner) throws IOException {
        LOG.info("preFlush, filter using PayloadDataFilter");
        Scan scan = new Scan();
        scan.setFilter(new PayloadDataFilter(observerContext.getEnvironment(), System.currentTimeMillis(), this.prefixLength, this.topicCache));
        return new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(keyValueScanner), ScanType.COMPACT_DROP_DELETES, store.getSmallestReadPoint(), Long.MIN_VALUE);
    }

    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext, Store store, List<? extends KeyValueScanner> list, ScanType scanType, long j, InternalScanner internalScanner, CompactionRequest compactionRequest) throws IOException {
        LOG.info("preCompact, filter using PayloadDataFilter");
        Scan scan = new Scan();
        scan.setFilter(new PayloadDataFilter(observerContext.getEnvironment(), System.currentTimeMillis(), this.prefixLength, this.topicCache));
        return new StoreScanner(store, store.getScanInfo(), scan, list, scanType, store.getSmallestReadPoint(), j);
    }
}
