package cz.o2.proxima.direct.hbase;

import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.direct.batch.ObserveHandle;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.ConfigRepository;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.TestUtils;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/direct/hbase/HBaseLogReaderTest.class */
public class HBaseLogReaderTest {
    private static final TableName tableName = TableName.valueOf("test");
    private static final HBaseTestingUtility util = HBaseTestingUtility.createLocalHTU();
    private static MiniHBaseCluster cluster;
    private final Repository repo = ConfigRepository.Builder.ofTest(ConfigFactory.load()).build();
    private final EntityDescriptor entity = (EntityDescriptor) this.repo.findEntity("test").get();
    private final AttributeDescriptor<?> attr = (AttributeDescriptor) this.entity.findAttribute("dummy").get();
    private final AttributeDescriptor<?> wildcard = (AttributeDescriptor) this.entity.findAttribute("wildcard.*").get();
    private HBaseLogReader reader;
    private Connection conn;
    private Table client;

    @BeforeClass
    public static void beforeClass() throws Exception {
        cluster = util.startMiniCluster();
        cluster.waitForActiveAndReadyMaster(100000L);
    }

    @AfterClass
    public static void afterClass() throws IOException {
        cluster.shutdown();
        cluster.waitUntilShutDown();
    }

    /* JADX WARN: Type inference failed for: r3v1, types: [byte[], byte[][]] */
    @Before
    public void setUp() throws Exception {
        util.deleteTableIfAny(tableName);
        util.createTable(tableName, HbaseTestUtil.bytes("u"), (byte[][]) new byte[]{HbaseTestUtil.bytes("first"), HbaseTestUtil.bytes("second")});
        this.conn = ConnectionFactory.createConnection(util.getConfiguration());
        this.client = this.conn.getTable(tableName);
        this.reader = new HBaseLogReader(new URI("hbase://localhost:2181/test?family=u"), cluster.getConfiguration(), this.entity, () -> {
            return Executors.newCachedThreadPool();
        });
    }

    @After
    public void tearDown() throws IOException {
        this.client.close();
        this.conn.close();
    }

    @Test
    public void testGetPartitions() {
        List partitions = this.reader.getPartitions(-1L, 1L);
        Assert.assertEquals(partitions.toString(), 3L, partitions.size());
        partitions.forEach(hBasePartition -> {
            Assert.assertEquals(0L, hBasePartition.getStartStamp());
            Assert.assertEquals(1L, hBasePartition.getEndStamp());
        });
        Assert.assertEquals("", new String(((HBasePartition) partitions.get(0)).getStartKey()));
        Assert.assertEquals("first", new String(((HBasePartition) partitions.get(0)).getEndKey()));
        Assert.assertEquals("first", new String(((HBasePartition) partitions.get(1)).getStartKey()));
        Assert.assertEquals("second", new String(((HBasePartition) partitions.get(1)).getEndKey()));
        Assert.assertEquals("second", new String(((HBasePartition) partitions.get(2)).getStartKey()));
        Assert.assertEquals("", new String(((HBasePartition) partitions.get(2)).getEndKey()));
    }

    @Test(timeout = 30000)
    public void testObserve() throws InterruptedException, IOException {
        final long j = 1500000000000L;
        write("a", "dummy", "a", 1500000000000L);
        write("firs", "wildcard.1", "firs", 1500000000000L);
        write("fir", "dummy", "fir", 1500000000000L);
        write("first", "dummy", "first", 1500000000000L);
        List partitions = this.reader.getPartitions();
        final ArrayList arrayList = new ArrayList();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.reader.observe(partitions.subList(0, 1), Lists.newArrayList(new AttributeDescriptor[]{this.attr}), new BatchLogObserver() { // from class: cz.o2.proxima.direct.hbase.HBaseLogReaderTest.1
            public boolean onNext(StreamElement streamElement) {
                Assert.assertEquals(j, streamElement.getStamp());
                arrayList.add(streamElement.getKey());
                return true;
            }

            public void onCompleted() {
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        Assert.assertEquals(Arrays.asList("a", "fir"), arrayList);
    }

    @Test(timeout = 5000)
    public void testObserveCancel() throws InterruptedException, IOException {
        write("a", "dummy", "a", 1500000000000L);
        write("firs", "wildcard.1", "firs", 1500000000000L);
        write("fir", "dummy", "fir", 1500000000000L);
        write("first", "dummy", "first", 1500000000000L);
        List partitions = this.reader.getPartitions();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(this.reader.observe(partitions.subList(0, 1), Collections.singletonList(this.attr), new BatchLogObserver() { // from class: cz.o2.proxima.direct.hbase.HBaseLogReaderTest.2
            public boolean onNext(StreamElement streamElement) {
                ((ObserveHandle) atomicReference.get()).close();
                return true;
            }

            public void onCancelled() {
                countDownLatch.countDown();
            }

            public void onCompleted() {
                Assert.fail("onCompleted should not heve been called");
            }
        }));
        countDownLatch.await();
    }

    @Test(timeout = 30000)
    public void testObserveLast() throws InterruptedException, IOException {
        final long j = 1500000000000L;
        write("secon", "dummy", "secon", 1500000000000L);
        write("second", "dummy", "second", 1500000000000L);
        write("third", "dummy", "third", 1500000000000L);
        List partitions = this.reader.getPartitions();
        final ArrayList arrayList = new ArrayList();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.reader.observe(partitions.subList(2, 3), Lists.newArrayList(new AttributeDescriptor[]{this.attr}), new BatchLogObserver() { // from class: cz.o2.proxima.direct.hbase.HBaseLogReaderTest.3
            public boolean onNext(StreamElement streamElement) {
                Assert.assertEquals(j, streamElement.getStamp());
                arrayList.add(streamElement.getKey());
                return true;
            }

            public void onCompleted() {
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        Assert.assertEquals(Lists.newArrayList(new String[]{"second", "third"}), arrayList);
    }

    @Test(timeout = 30000)
    public void testObserveMultiple() throws IOException, InterruptedException {
        final long j = 1500000000000L;
        write("a", "dummy", "a", 1500000000000L);
        write("firs", "wildcard.1", "firs", 1500000000000L);
        write("fir", "dummy", "fir", 1500000000000L);
        write("first", "dummy", "first", 1500000000000L);
        List partitions = this.reader.getPartitions();
        final ArrayList arrayList = new ArrayList();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Assert.assertEquals(3L, partitions.size());
        this.reader.observe(partitions.subList(0, 1), Lists.newArrayList(new AttributeDescriptor[]{this.attr, this.wildcard}), new BatchLogObserver() { // from class: cz.o2.proxima.direct.hbase.HBaseLogReaderTest.4
            public void onCompleted() {
                countDownLatch.countDown();
            }

            public boolean onNext(StreamElement streamElement) {
                Assert.assertEquals(j, streamElement.getStamp());
                arrayList.add(streamElement.getKey());
                return true;
            }

            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        });
        countDownLatch.await();
        Assert.assertEquals(Lists.newArrayList(new String[]{"a", "fir", "firs"}), arrayList);
    }

    @Test
    public void testAsFactorySerializable() throws IOException, ClassNotFoundException {
        Assert.assertEquals(this.reader.getUri(), ((HBaseLogReader) ((BatchLogReader.Factory) TestUtils.deserializeObject(TestUtils.serializeObject(this.reader.asFactory()))).apply(this.repo)).getUri());
    }

    private void write(String str, String str2, String str3, long j) throws IOException {
        HbaseTestUtil.write(str, str2, str3, j, this.client);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1065329471:
                if (implMethodName.equals("lambda$setUp$7fa3ec4c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/hbase/HBaseLogReaderTest") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/concurrent/ExecutorService;")) {
                    return () -> {
                        return Executors.newCachedThreadPool();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
