package co.cask.cdap.messaging.store.hbase;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.ProjectInfo;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.HTableDescriptorBuilder;
import co.cask.cdap.hbase.wd.RowKeyDistributorByHashPrefix;
import co.cask.cdap.messaging.MessagingUtils;
import co.cask.cdap.messaging.store.MessageTable;
import co.cask.cdap.messaging.store.MetadataTable;
import co.cask.cdap.messaging.store.PayloadTable;
import co.cask.cdap.messaging.store.TableFactory;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/messaging/store/hbase/HBaseTableFactory.class */
/**
 * A {@link TableFactory} that creates the messaging system's metadata, message and payload tables on HBase.
 *
 * <p>All tables are created under {@link NamespaceId#SYSTEM} with a single column family
 * ({@link #COLUMN_FAMILY}). Message and payload tables are pre-split, have a region observer coprocessor
 * attached, and share one scan thread pool owned by this factory.
 */
public final class HBaseTableFactory implements TableFactory {
    /** Column family used by every table created through this factory. */
    public static final byte[] COLUMN_FAMILY = MessagingUtils.Constants.COLUMN_FAMILY;

    private static final Logger LOG = LoggerFactory.getLogger(HBaseTableFactory.class);

    // Configuration keys read from CConfiguration.
    private static final String CONF_MAX_SCAN_THREADS = "messaging.hbase.max.scan.threads";
    private static final String CONF_SCAN_CACHE_ROWS = "messaging.hbase.scan.cache.rows";
    private static final String CONF_MESSAGE_TABLE_SPLITS = "messaging.message.table.hbase.splits";
    private static final String CONF_PAYLOAD_TABLE_SPLITS = "messaging.payload.table.hbase.splits";
    private static final String CONF_METADATA_TABLE_NAME = "messaging.metadata.table.name";
    private static final String CONF_COPROCESSOR_DIR = "messaging.coprocessor.dir";

    // Attributes stored on the HBase table descriptor.
    private static final String ATTR_PREFIX_NUM_BYTES = "cdap.messaging.table.prefix.num.bytes";
    private static final String ATTR_KEY_DISTRIBUTOR_BUCKETS = "cdap.messaging.key.distributor.buckets";
    private static final String ATTR_METADATA_HBASE_NAMESPACE = "cdap.messaging.metadata.hbase.namespace";

    private final CConfiguration cConf;
    private final Configuration hConf;
    private final HBaseTableUtil tableUtil;
    private final ExecutorService scanExecutor;
    private final LocationFactory locationFactory;

    /**
     * Creates the factory and the scan thread pool shared by the message and payload tables it creates.
     *
     * <p>The pool has no core threads, grows up to {@code messaging.hbase.max.scan.threads} daemon threads,
     * retires idle threads after 60 seconds and hands tasks over through a {@link SynchronousQueue}. When no
     * thread is available the task is run in the submitting thread, unless the pool has been shut down, in
     * which case the task is dropped.
     *
     * @param cConfiguration CDAP configuration
     * @param configuration Hadoop/HBase configuration used to open admin and table connections
     * @param hBaseTableUtil utility used for all HBase table operations
     * @param locationFactory used to locate the directory where coprocessor jars are written
     */
    @Inject
    HBaseTableFactory(CConfiguration cConfiguration, Configuration configuration, HBaseTableUtil hBaseTableUtil, LocationFactory locationFactory) {
        this.cConf = cConfiguration;
        this.hConf = configuration;
        this.tableUtil = hBaseTableUtil;
        this.locationFactory = locationFactory;

        RejectedExecutionHandler callerRunsHandler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                LOG.info("No more threads in the HBase scan thread pool. Consider increase {}. Runnable from caller thread {}", CONF_MAX_SCAN_THREADS, Thread.currentThread().getName());
                if (threadPoolExecutor.isShutdown()) {
                    return;
                }
                runnable.run();
            }
        };
        this.scanExecutor = new ThreadPoolExecutor(
            0, cConfiguration.getInt(CONF_MAX_SCAN_THREADS),
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            Threads.createDaemonThreadFactory("messaging-hbase-scanner-%d"),
            callerRunsHandler);
    }

    /**
     * Creates the metadata table if it does not exist yet and returns a {@link MetadataTable} on top of it.
     *
     * @param tableName name of the table within the system namespace
     * @return a metadata table backed by the HBase table
     * @throws IOException if the table cannot be created or opened
     */
    @Override
    public MetadataTable createMetadataTable(String tableName) throws IOException {
        TableId tableId = tableUtil.createHTableId(NamespaceId.SYSTEM, tableName);
        try (HBaseAdmin admin = new HBaseAdmin(hConf)) {
            tableUtil.createTableIfNotExists(
                admin, tableId,
                tableUtil.buildHTableDescriptor(tableId)
                    .addFamily(new HColumnDescriptor(COLUMN_FAMILY).setMaxVersions(1))
                    .build());
        }
        // The admin connection is released before the table handle is opened.
        return new HBaseMetadataTable(tableUtil, tableUtil.createHTable(hConf, tableId), COLUMN_FAMILY, cConf.getInt(CONF_SCAN_CACHE_ROWS));
    }

    /**
     * Creates the message table if it does not exist yet and returns a {@link MessageTable} on top of it.
     *
     * @param tableName name of the table within the system namespace
     * @return a message table backed by the HBase table
     * @throws IOException if the table cannot be created or opened, or its bucket attribute is missing/invalid
     */
    @Override
    public MessageTable createMessageTable(String tableName) throws IOException {
        TableId tableId = tableUtil.createHTableId(NamespaceId.SYSTEM, tableName);
        HTable hTable = createTableIfNotExists(tableId, cConf.getInt(CONF_MESSAGE_TABLE_SPLITS), tableUtil.getMessageTableRegionObserverClassForVersion());
        // The bucket count is read back from the table so it matches what the table was created with.
        RowKeyDistributorByHashPrefix rowKeyDistributor = new RowKeyDistributorByHashPrefix(
            new RowKeyDistributorByHashPrefix.OneByteSimpleHash(getKeyDistributorBuckets(hTable, tableId)));
        return new HBaseMessageTable(tableUtil, hTable, COLUMN_FAMILY, rowKeyDistributor, scanExecutor, cConf.getInt(CONF_SCAN_CACHE_ROWS));
    }

    /**
     * Creates the payload table if it does not exist yet and returns a {@link PayloadTable} on top of it.
     *
     * @param tableName name of the table within the system namespace
     * @return a payload table backed by the HBase table
     * @throws IOException if the table cannot be created or opened, or its bucket attribute is missing/invalid
     */
    @Override
    public PayloadTable createPayloadTable(String tableName) throws IOException {
        TableId tableId = tableUtil.createHTableId(NamespaceId.SYSTEM, tableName);
        HTable hTable = createTableIfNotExists(tableId, cConf.getInt(CONF_PAYLOAD_TABLE_SPLITS), tableUtil.getPayloadTableRegionObserverClassForVersion());
        // The bucket count is read back from the table so it matches what the table was created with.
        RowKeyDistributorByHashPrefix rowKeyDistributor = new RowKeyDistributorByHashPrefix(
            new RowKeyDistributorByHashPrefix.OneByteSimpleHash(getKeyDistributorBuckets(hTable, tableId)));
        return new HBasePayloadTable(tableUtil, hTable, COLUMN_FAMILY, rowKeyDistributor, scanExecutor, cConf.getInt(CONF_SCAN_CACHE_ROWS));
    }

    /**
     * Upgrades the coprocessor of the given message table to the one matching the current version.
     *
     * @param tableName name of the table within the system namespace
     * @throws IOException if the upgrade fails
     */
    public void upgradeMessageTable(String tableName) throws IOException {
        upgradeCoProcessor(tableUtil.createHTableId(NamespaceId.SYSTEM, tableName), tableUtil.getMessageTableRegionObserverClassForVersion());
    }

    /**
     * Upgrades the coprocessor of the given payload table to the one matching the current version.
     *
     * @param tableName name of the table within the system namespace
     * @throws IOException if the upgrade fails
     */
    public void upgradePayloadTable(String tableName) throws IOException {
        upgradeCoProcessor(tableUtil.createHTableId(NamespaceId.SYSTEM, tableName), tableUtil.getPayloadTableRegionObserverClassForVersion());
    }

    /**
     * Disables the given message table. A table that is missing or already disabled is not an error.
     *
     * @param tableName name of the table within the system namespace
     * @throws IOException if the table cannot be disabled
     */
    public void disableMessageTable(String tableName) throws IOException {
        disableSystemTable(tableName);
    }

    /**
     * Disables the given payload table. A table that is missing or already disabled is not an error.
     *
     * @param tableName name of the table within the system namespace
     * @throws IOException if the table cannot be disabled
     */
    public void disablePayloadTable(String tableName) throws IOException {
        disableSystemTable(tableName);
    }

    /** Opens a short-lived admin connection and disables the named table in the system namespace. */
    private void disableSystemTable(String tableName) throws IOException {
        TableId tableId = tableUtil.createHTableId(NamespaceId.SYSTEM, tableName);
        try (HBaseAdmin admin = new HBaseAdmin(hConf)) {
            disableTable(admin, tableId);
        }
    }

    /**
     * Disables a table, treating "already disabled" and "not found" as a successful no-op.
     *
     * @param hBaseAdmin admin connection to use; not closed by this method
     * @param tableId table to disable
     * @throws IOException on any other failure
     */
    private void disableTable(HBaseAdmin hBaseAdmin, TableId tableId) throws IOException {
        try {
            tableUtil.disableTable(hBaseAdmin, tableId);
            LOG.debug("TMS Table {} has been disabled", tableId);
        } catch (TableNotEnabledException e) {
            LOG.debug("TMS Table {} was already in disabled state.", tableId, e);
        } catch (TableNotFoundException e) {
            LOG.debug("TMS Table {} was not found. Skipping disable.", tableId, e);
        }
    }

    /**
     * Replaces all coprocessors on a table with the given one when the table was written by an older version.
     *
     * <p>Nothing is done when the table does not exist, or when the version recorded on its descriptor is the
     * same as or newer than the running version. Otherwise the table is disabled, modified and re-enabled.
     *
     * @param tableId table to upgrade
     * @param coprocessor coprocessor class to install
     * @throws IOException if any HBase operation fails
     */
    private void upgradeCoProcessor(TableId tableId, Class<? extends Coprocessor> coprocessor) throws IOException {
        try (HBaseAdmin admin = new HBaseAdmin(hConf)) {
            if (!tableUtil.tableExists(admin, tableId)) {
                LOG.debug("TMS Table {} was not found. Skip upgrading coprocessor.", tableId);
                return;
            }

            HTableDescriptor currentDescriptor = tableUtil.getHTableDescriptor(admin, tableId);
            ProjectInfo.Version tableVersion = HBaseTableUtil.getVersion(currentDescriptor);
            if (tableVersion.compareTo(ProjectInfo.getVersion()) >= 0) {
                LOG.info("Table '{}' has not changed and its version '{}' is same or greater than current CDAP version '{}'", tableId, tableVersion, ProjectInfo.getVersion());
                return;
            }

            // Start from the existing descriptor and drop every coprocessor it currently carries.
            HTableDescriptorBuilder newDescriptor = tableUtil.buildHTableDescriptor(currentDescriptor);
            Map<String, HBaseTableUtil.CoprocessorInfo> installed = HBaseTableUtil.getCoprocessorInfo(currentDescriptor);
            for (HBaseTableUtil.CoprocessorInfo info : installed.values()) {
                newDescriptor.removeCoprocessor(info.getClassName());
            }
            addCoprocessor(coprocessor, newDescriptor);
            HBaseTableUtil.setVersion(newDescriptor);
            HBaseTableUtil.setTablePrefix(newDescriptor, cConf);

            // NOTE(review): if modifyTable fails the table is left disabled — confirm callers handle that.
            disableTable(admin, tableId);
            tableUtil.modifyTable(admin, newDescriptor.build());
            LOG.debug("Enabling table '{}'...", tableId);
            tableUtil.enableTable(admin, tableId);
        }
        LOG.info("Table '{}' update completed.", tableId);
    }

    /**
     * Opens the given table, creating it first (pre-split, with the coprocessor attached) when it is missing.
     *
     * @param tableId table to open
     * @param splits number of key distributor buckets; also used to compute the split keys
     * @param coprocessor coprocessor class to attach to a newly created table
     * @return an open handle on the table
     * @throws IOException if the table cannot be created or opened
     */
    private HTable createTableIfNotExists(TableId tableId, int splits, Class<? extends Coprocessor> coprocessor) throws IOException {
        try (HBaseAdmin admin = new HBaseAdmin(hConf)) {
            if (tableUtil.tableExists(admin, tableId)) {
                // NOTE(review): this path keeps the handle's default auto-flush setting, while the
                // newly-created path below turns auto-flush off — confirm the difference is intended.
                return tableUtil.createHTable(hConf, tableId);
            }

            RowKeyDistributorByHashPrefix rowKeyDistributor = new RowKeyDistributorByHashPrefix(new RowKeyDistributorByHashPrefix.OneByteSimpleHash(splits));
            String metadataNamespace = tableUtil.createHTableId(NamespaceId.SYSTEM, cConf.get(CONF_METADATA_TABLE_NAME)).getNamespace();
            HTableDescriptorBuilder descriptor = tableUtil.buildHTableDescriptor(tableId)
                .addFamily(new HColumnDescriptor(COLUMN_FAMILY).setMaxVersions(1))
                .setValue(ATTR_PREFIX_NUM_BYTES, Integer.toString(1))
                .setValue(ATTR_KEY_DISTRIBUTOR_BUCKETS, Integer.toString(splits))
                .setValue(ATTR_METADATA_HBASE_NAMESPACE, metadataNamespace);
            addCoprocessor(coprocessor, descriptor);
            tableUtil.createTableIfNotExists(admin, tableId, descriptor.build(), HBaseTableUtil.getSplitKeys(splits, splits, rowKeyDistributor));
        }

        HTable hTable = tableUtil.createHTable(hConf, tableId);
        hTable.setAutoFlushTo(false);
        return hTable;
    }

    /**
     * Reads the number of key distributor buckets recorded on the table descriptor.
     *
     * @param hTable table whose descriptor is read
     * @param tableId table id, used only in error messages
     * @return the bucket count stored on the table
     * @throws IOException if the descriptor cannot be read, or the attribute is missing or not an integer
     */
    private int getKeyDistributorBuckets(HTable hTable, TableId tableId) throws IOException {
        String buckets = hTable.getTableDescriptor().getValue(ATTR_KEY_DISTRIBUTOR_BUCKETS);
        if (buckets == null) {
            throw new IOException("Missing table attribute " + ATTR_KEY_DISTRIBUTOR_BUCKETS + " on HBase table " + tableId);
        }
        try {
            return Integer.parseInt(buckets);
        } catch (NumberFormatException e) {
            throw new IOException("Invalid value for table attribute " + ATTR_KEY_DISTRIBUTOR_BUCKETS + " on HBase table " + tableId, e);
        }
    }

    /**
     * Builds a jar for the coprocessor under the configured coprocessor directory and registers it on the
     * descriptor at user priority.
     *
     * @param coprocessor coprocessor class to package and register
     * @param hTableDescriptorBuilder descriptor builder to register the coprocessor on
     * @throws IOException if the jar cannot be created
     */
    private void addCoprocessor(Class<? extends Coprocessor> coprocessor, HTableDescriptorBuilder hTableDescriptorBuilder) throws IOException {
        ImmutableList<Class<? extends Coprocessor>> classes = ImmutableList.<Class<? extends Coprocessor>>of(coprocessor);
        Path jarPath = new Path(
            HBaseTableUtil.createCoProcessorJar(
                coprocessor.getSimpleName(),
                locationFactory.create(cConf.get(CONF_COPROCESSOR_DIR)),
                classes)
                .toURI().getPath());
        // The cast keeps overload resolution explicit for the null key/value map.
        hTableDescriptorBuilder.addCoprocessor(coprocessor.getName(), jarPath, Coprocessor.PRIORITY_USER, (Map<String, String>) null);
    }
}
