package cz.o2.proxima.direct.pubsub;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.io.pubsub.util.PubSubUtils;
import cz.o2.proxima.pubsub.shaded.com.google.cloud.pubsub.v1.Publisher;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.PubsubMessage;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeDescriptorImpl;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Optionals;
import cz.o2.proxima.util.TestUtils;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/pubsub/PubSubWriterTest.class */
public class PubSubWriterTest {
    private static final Logger log = LoggerFactory.getLogger(PubSubWriterTest.class);
    private final PubSubAccessor accessor;
    private TestPubSubWriter writer;
    private final Repository repo = Repository.of(ConfigFactory.load().resolve());
    private final DirectDataOperator direct = this.repo.getOrCreateOperator(DirectDataOperator.class, new Consumer[]{directDataOperator -> {
        directDataOperator.withExecutorFactory(() -> {
            return Executors.newFixedThreadPool(5, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setName(PubSubWriterTest.class.getSimpleName());
                thread.setDaemon(true);
                thread.setUncaughtExceptionHandler((thread2, th) -> {
                    log.error("Error in thread {}", thread2.getName(), th);
                });
                return thread;
            });
        });
    }});
    private final Context context = this.direct.getContext();
    private final PubSubStorage storage = new PubSubStorage();
    private final AttributeDescriptorImpl<?> attr = AttributeDescriptor.newBuilder(this.repo).setEntity("entity").setName("attr").setSchemeUri(new URI("bytes:///")).build();
    private final AttributeDescriptorImpl<?> wildcard = AttributeDescriptor.newBuilder(this.repo).setEntity("entity").setName("wildcard.*").setSchemeUri(new URI("bytes:///")).build();
    private final EntityDescriptor entity = EntityDescriptor.newBuilder().setName("entity").addAttribute(this.attr).addAttribute(this.wildcard).build();

    /* loaded from: input_file:cz/o2/proxima/direct/pubsub/PubSubWriterTest$TestPubSubWriter.class */
    private class TestPubSubWriter extends PubSubWriter {
        private java.util.function.Consumer<PubsubMessage> consumer;

        public TestPubSubWriter(Context context) {
            super(PubSubWriterTest.this.accessor, context);
        }

        void setConsumer(java.util.function.Consumer<PubsubMessage> consumer) {
            this.consumer = consumer;
        }

        Publisher newPublisher(String str, String str2) throws IOException {
            return MockPublisher.create(str, str2, pubsubMessage -> {
                if (this.consumer != null) {
                    this.consumer.accept(pubsubMessage);
                }
            });
        }
    }

    public PubSubWriterTest() throws URISyntaxException {
        Assert.assertTrue(this.entity.findAttribute("attr").isPresent());
        this.accessor = new PubSubAccessor(this.storage, this.entity, new URI("gps://my-project/topic"), Collections.emptyMap());
    }

    @Before
    public void setUp() {
        this.writer = new TestPubSubWriter(this.context);
    }

    @Test(timeout = 10000)
    public void testWrite() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        TestPubSubWriter testPubSubWriter = this.writer;
        arrayList.getClass();
        testPubSubWriter.setConsumer((v1) -> {
            r1.add(v1);
        });
        CountDownLatch countDownLatch = new CountDownLatch(3);
        this.writer.write((StreamElement) Optionals.get(PubSubUtils.toStreamElement(this.entity, UUID.randomUUID().toString(), Util.update("key1", "attr", new byte[]{1, 2}, currentTimeMillis).getData().toByteArray())), (z, th) -> {
            Assert.assertTrue(z);
            countDownLatch.countDown();
        });
        this.writer.write((StreamElement) Optionals.get(PubSubUtils.toStreamElement(this.entity, UUID.randomUUID().toString(), Util.delete("key2", "attr", currentTimeMillis + 1000).getData().toByteArray())), (z2, th2) -> {
            Assert.assertTrue(z2);
            countDownLatch.countDown();
        });
        this.writer.write((StreamElement) Optionals.get(PubSubUtils.toStreamElement(this.entity, UUID.randomUUID().toString(), Util.deleteWildcard("key3", this.wildcard, currentTimeMillis).getData().toByteArray())), (z3, th3) -> {
            Assert.assertTrue(z3);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        Assert.assertEquals(3L, arrayList.size());
        StreamElement streamElement = (StreamElement) Optionals.get(PubSubUtils.toStreamElement(this.entity, UUID.randomUUID().toString(), ((PubsubMessage) arrayList.get(0)).getData().toByteArray()));
        Assert.assertEquals("key1", streamElement.getKey());
        Assert.assertEquals("attr", streamElement.getAttribute());
        Assert.assertFalse(streamElement.isDelete());
        Assert.assertFalse(streamElement.isDeleteWildcard());
        Assert.assertArrayEquals(new byte[]{1, 2}, streamElement.getValue());
        Assert.assertEquals(currentTimeMillis, streamElement.getStamp());
        StreamElement streamElement2 = (StreamElement) Optionals.get(PubSubUtils.toStreamElement(this.entity, UUID.randomUUID().toString(), ((PubsubMessage) arrayList.get(1)).getData().toByteArray()));
        Assert.assertEquals("key2", streamElement2.getKey());
        Assert.assertEquals("attr", streamElement2.getAttribute());
        Assert.assertTrue(streamElement2.isDelete());
        Assert.assertFalse(streamElement2.isDeleteWildcard());
        Assert.assertEquals(currentTimeMillis + 1000, streamElement2.getStamp());
        StreamElement streamElement3 = (StreamElement) Optionals.get(PubSubUtils.toStreamElement(this.entity, UUID.randomUUID().toString(), ((PubsubMessage) arrayList.get(2)).getData().toByteArray()));
        Assert.assertEquals("key3", streamElement3.getKey());
        Assert.assertEquals(this.wildcard.toAttributePrefix() + "*", streamElement3.getAttribute());
        Assert.assertTrue(streamElement3.isDelete());
        Assert.assertTrue(streamElement3.isDeleteWildcard());
        Assert.assertEquals(currentTimeMillis, streamElement3.getStamp());
    }

    @Test(timeout = 10000)
    public void testWriteFail() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        this.writer.setConsumer(pubsubMessage -> {
            throw new RuntimeException("Fail");
        });
        CountDownLatch countDownLatch = new CountDownLatch(3);
        this.writer.write((StreamElement) Optionals.get(PubSubUtils.toStreamElement(this.entity, UUID.randomUUID().toString(), Util.update("key1", "attr", new byte[]{1, 2}, currentTimeMillis).getData().toByteArray())), (z, th) -> {
            Assert.assertFalse(z);
            Assert.assertEquals("Fail", th.getMessage());
            countDownLatch.countDown();
        });
        this.writer.write((StreamElement) Optionals.get(PubSubUtils.toStreamElement(this.entity, UUID.randomUUID().toString(), Util.delete("key2", "attr", currentTimeMillis + 1000).getData().toByteArray())), (z2, th2) -> {
            Assert.assertFalse(z2);
            Assert.assertEquals("Fail", th2.getMessage());
            countDownLatch.countDown();
        });
        this.writer.write((StreamElement) Optionals.get(PubSubUtils.toStreamElement(this.entity, UUID.randomUUID().toString(), Util.deleteWildcard("key3", this.wildcard, currentTimeMillis).getData().toByteArray())), (z3, th3) -> {
            Assert.assertFalse(z3);
            Assert.assertEquals("Fail", th3.getMessage());
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }

    @Test
    public void testAsFactorySerializable() throws IOException, ClassNotFoundException {
        Assert.assertEquals(this.accessor.getUri(), ((AttributeWriterBase) ((AttributeWriterBase.Factory) TestUtils.deserializeObject(TestUtils.serializeObject(this.writer.asFactory()))).apply(this.repo)).getUri());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1310042067:
                if (implMethodName.equals("lambda$null$6947ff11$1")) {
                    z = true;
                    break;
                }
                break;
            case 2097764861:
                if (implMethodName.equals("lambda$new$60305140$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/pubsub/PubSubWriterTest") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/core/DirectDataOperator;)V")) {
                    return directDataOperator -> {
                        directDataOperator.withExecutorFactory(() -> {
                            return Executors.newFixedThreadPool(5, runnable -> {
                                Thread thread = new Thread(runnable);
                                thread.setName(PubSubWriterTest.class.getSimpleName());
                                thread.setDaemon(true);
                                thread.setUncaughtExceptionHandler((thread2, th) -> {
                                    log.error("Error in thread {}", thread2.getName(), th);
                                });
                                return thread;
                            });
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/pubsub/PubSubWriterTest") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/concurrent/ExecutorService;")) {
                    return () -> {
                        return Executors.newFixedThreadPool(5, runnable -> {
                            Thread thread = new Thread(runnable);
                            thread.setName(PubSubWriterTest.class.getSimpleName());
                            thread.setDaemon(true);
                            thread.setUncaughtExceptionHandler((thread2, th) -> {
                                log.error("Error in thread {}", thread2.getName(), th);
                            });
                            return thread;
                        });
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
