package co.cask.cdap.data2.dataset2.lib.table;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.table.Get;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.tephra.DefaultTransactionExecutor;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionConflictException;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/dataset2/lib/table/TableConcurrentTest.class */
public abstract class TableConcurrentTest<T extends Table> extends TableTest<T> {
    /** Logger used by the client runnables of this test to report unexpected failures. */
    private static final Logger LOG = LoggerFactory.getLogger(TableConcurrentTest.class);

    /** Row that holds the counter column written by the incrementing clients. */
    private static final byte[] ROW_TO_INCREMENT = Bytes.toBytes("row_to_increment");

    /** Counter column inside {@link #ROW_TO_INCREMENT}. */
    private static final byte[] COLUMN_TO_INCREMENT = Bytes.toBytes("column_to_increment");

    /**
     * Rows the appending clients add columns to. Only the outer array is allocated here
     * (six slots); the row keys themselves are assigned by the static initializer of this class,
     * which puts {@link #ROW_TO_INCREMENT} into slot 0.
     */
    private static final byte[][] ROWS_TO_APPEND_TO = new byte[6][];

    /** Factory for transaction executors; assigned in {@link #before()}. */
    protected TransactionExecutorFactory txExecutorFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data2/dataset2/lib/table/TableConcurrentTest$AppendingClient.class */
    public class AppendingClient implements Runnable {
        private final TransactionExecutorFactory txExecutorFactory;
        private final T table;

        public AppendingClient(TransactionExecutorFactory transactionExecutorFactory) throws Exception {
            this.txExecutorFactory = transactionExecutorFactory;
            this.table = (T) TableConcurrentTest.this.getTable(TableTest.CONTEXT1, "myTable");
        }

        @Override // java.lang.Runnable
        public void run() {
            TransactionExecutor createExecutor = this.txExecutorFactory.createExecutor(Lists.newArrayList(new TransactionAware[]{(TransactionAware) this.table}));
            for (int i = 0; i < 100; i++) {
                for (int i2 = 0; i2 < TableConcurrentTest.ROWS_TO_APPEND_TO.length / 2; i2++) {
                    final ImmutableList of = ImmutableList.of(new Get(TableConcurrentTest.ROWS_TO_APPEND_TO[i2 * 2]), new Get(TableConcurrentTest.ROWS_TO_APPEND_TO[(i2 * 2) + 1]));
                    boolean z = false;
                    while (!z) {
                        try {
                            createExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.data2.dataset2.lib.table.TableConcurrentTest.AppendingClient.1
                                public void apply() throws Exception {
                                    List list = AppendingClient.this.table.get(of);
                                    for (int i3 = 0; i3 < of.size(); i3++) {
                                        appendColumn(((Get) of.get(i3)).getRow(), ((Row) list.get(i3)).getColumns());
                                    }
                                }

                                /* JADX WARN: Type inference failed for: r2v1, types: [byte[], byte[][]] */
                                /* JADX WARN: Type inference failed for: r3v2, types: [byte[], byte[][]] */
                                private void appendColumn(byte[] bArr, Map<byte[], byte[]> map) throws Exception {
                                    int size = map.isEmpty() ? 0 : !map.containsKey(TableConcurrentTest.COLUMN_TO_INCREMENT) ? map.size() : map.size() - 1;
                                    AppendingClient.this.table.put(bArr, (byte[][]) new byte[]{Bytes.toBytes("column" + size)}, (byte[][]) new byte[]{Bytes.toBytes("foo" + size)});
                                }
                            });
                            z = true;
                        } catch (TransactionConflictException e) {
                            z = false;
                        } catch (Throwable th) {
                            TableConcurrentTest.LOG.warn("failed to append, bailing out", th);
                            throw Throwables.propagate(th);
                        }
                    }
                }
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/data2/dataset2/lib/table/TableConcurrentTest$CreateThread.class */
    class CreateThread extends Thread {
        private final AtomicBoolean success;

        CreateThread(AtomicBoolean atomicBoolean) {
            this.success = atomicBoolean;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.success.set(false);
                TableConcurrentTest.this.getTableAdmin(TableTest.CONTEXT1, "conccreate").create();
                this.success.set(true);
            } catch (Throwable th) {
                this.success.set(false);
                th.printStackTrace(System.err);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/data2/dataset2/lib/table/TableConcurrentTest$IncrementingClient.class */
    private class IncrementingClient implements Runnable {
        private final TransactionExecutorFactory txExecutorFactory;
        private final T table;

        public IncrementingClient(TransactionExecutorFactory transactionExecutorFactory) throws Exception {
            this.txExecutorFactory = transactionExecutorFactory;
            this.table = (T) TableConcurrentTest.this.getTable(TableTest.CONTEXT1, "myTable");
        }

        @Override // java.lang.Runnable
        public void run() {
            final int[] iArr = {0};
            while (iArr[0] < 100) {
                try {
                    this.txExecutorFactory.createExecutor(Lists.newArrayList(new TransactionAware[]{(TransactionAware) this.table})).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.data2.dataset2.lib.table.TableConcurrentTest.IncrementingClient.1
                        /* JADX WARN: Type inference failed for: r2v1, types: [byte[], byte[][]] */
                        public void apply() throws Exception {
                            IncrementingClient.this.table.incrementAndGet(TableConcurrentTest.ROW_TO_INCREMENT, (byte[][]) new byte[]{TableConcurrentTest.COLUMN_TO_INCREMENT}, new long[]{iArr[0]});
                        }
                    });
                    iArr[0] = iArr[0] + 1;
                } catch (TransactionConflictException e) {
                } catch (Throwable th) {
                    TableConcurrentTest.LOG.warn("failed to increment, bailing out", th);
                    throw Throwables.propagate(th);
                }
            }
        }
    }

    /**
     * Runs the parent set-up and then installs a {@link TransactionExecutorFactory} whose executors
     * are {@link DefaultTransactionExecutor}s built on this test's {@code txClient}.
     */
    @Override
    @Before
    public void before() {
        super.before();
        TransactionExecutorFactory defaultExecutors = new TransactionExecutorFactory() {
            @Override
            public TransactionExecutor createExecutor(Iterable<TransactionAware> txAwares) {
                return new DefaultTransactionExecutor(TableConcurrentTest.this.txClient, txAwares);
            }
        };
        this.txExecutorFactory = defaultExecutors;
    }

    /**
     * Runs five incrementing and five appending clients concurrently against "myTable" and then
     * verifies, in a single transaction, that no increment or append was lost.
     *
     * <p>Expected end state: the counter equals 5 * 4950 (each incrementing client adds 0..99), and
     * every row in {@code ROWS_TO_APPEND_TO} holds 500 appended columns (5 clients * 100 rounds),
     * plus the counter column for the counter row.
     *
     * @throws Exception if table administration, a client, or the verification fails
     */
    @Test(timeout = 120000)
    public void testConcurrentOnSingleTable() throws Exception {
        getTableAdmin(CONTEXT1, "myTable").create();
        try {
            ExecutorService clientPool = Executors.newFixedThreadPool(10);
            List<Future<?>> clientRuns = Lists.newArrayList();
            try {
                for (int i = 0; i < 5; i++) {
                    clientRuns.add(clientPool.submit(new IncrementingClient(this.txExecutorFactory)));
                    clientRuns.add(clientPool.submit(new AppendingClient(this.txExecutorFactory)));
                }
                clientPool.shutdown();
                Assert.assertTrue("clients did not finish in time",
                                  clientPool.awaitTermination(2L, TimeUnit.MINUTES));
            } finally {
                // No-op after a clean termination; otherwise stops clients that are still running.
                clientPool.shutdownNow();
            }
            // Surface a client failure with its real cause instead of a misleading verification error.
            for (Future<?> clientRun : clientRuns) {
                clientRun.get();
            }

            final Table table = getTable(CONTEXT1, "myTable");
            TransactionExecutor verifier =
                this.txExecutorFactory.createExecutor(Lists.newArrayList((TransactionAware) table));
            verifier.execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    verifyIncrements();
                    verifyAppends();
                }

                /** Checks that every row has columns "column0".."column499" with values "foo0".."foo499". */
                private void verifyAppends() throws Exception {
                    for (byte[] row : ROWS_TO_APPEND_TO) {
                        Map<byte[], byte[]> columns = table.get(row).getColumns();
                        Assert.assertFalse(columns.isEmpty());
                        // The counter row additionally carries the counter column.
                        int counterColumns = Arrays.equals(ROW_TO_INCREMENT, row) ? 1 : 0;
                        Assert.assertEquals(500 + counterColumns, columns.size());
                        for (int i = 0; i < 500; i++) {
                            Assert.assertArrayEquals(Bytes.toBytes("foo" + i), columns.get(Bytes.toBytes("column" + i)));
                        }
                    }
                }

                /** Checks that the counter holds the sum of all increments of all five clients. */
                private void verifyIncrements() throws Exception {
                    Map<byte[], byte[]> columns =
                        table.get(ROW_TO_INCREMENT, new byte[][]{COLUMN_TO_INCREMENT}).getColumns();
                    Assert.assertFalse(columns.isEmpty());
                    Assert.assertEquals(5 * 4950, Bytes.toLong(columns.get(COLUMN_TO_INCREMENT)));
                }
            });
        } finally {
            getTableAdmin(CONTEXT1, "myTable").drop();
        }
    }

    /**
     * Starts two threads that both create the "conccreate" table, asserts that neither create call
     * threw, and finally issues a read against the table.
     *
     * @throws Exception if joining the threads or obtaining/reading the table fails
     */
    @Test(timeout = 20000)
    public void testConcurrentCreate() throws Exception {
        AtomicBoolean firstSucceeded = new AtomicBoolean(false);
        AtomicBoolean secondSucceeded = new AtomicBoolean(false);
        CreateThread first = new CreateThread(firstSucceeded);
        CreateThread second = new CreateThread(secondSucceeded);
        first.start();
        second.start();
        first.join();
        second.join();
        Assert.assertTrue("First thread failed. ", firstSucceeded.get());
        Assert.assertTrue("Second thread failed. ", secondSucceeded.get());
        // Read row "a", column "b"; the result is not inspected, the call only has to succeed.
        getTable(CONTEXT1, "conccreate").get(new byte[]{'a'}, new byte[][]{{'b'}});
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [byte[], byte[][]] */
    static {
        ROWS_TO_APPEND_TO[0] = ROW_TO_INCREMENT;
        for (int i = 1; i < ROWS_TO_APPEND_TO.length; i++) {
            ROWS_TO_APPEND_TO[i] = Bytes.toBytes("row_to_append_to_" + i);
        }
    }
}
