package io.joynr.messaging.routing;

import io.joynr.messaging.persistence.MessagePersister;
import io.joynr.messaging.routing.MessageQueue;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;

/**
 * Unit tests for {@link MessageQueue}: putting and polling messages, waiting for the queue to
 * drain, and the interaction with the {@link MessagePersister}.
 *
 * <p>Several tests assert on wall-clock durations in the millisecond range and may therefore be
 * sensitive to scheduling jitter on heavily loaded machines.
 */
@RunWith(MockitoJUnitRunner.class)
public class MessageQueueTest {

    @Mock
    private DelayableImmutableMessage mockMessage;

    @Mock
    private DelayableImmutableMessage mockMessage2;

    @Mock
    private DelayableImmutableMessage mockMessage3;

    @Mock
    private MessageQueue.MaxTimeoutHolder maxTimeoutHolderMock;

    /** Spied so that tests can verify which messages the subject puts into the underlying queue. */
    @Spy
    private DelayQueue<DelayableImmutableMessage> delayQueue = new DelayQueue<>();

    @Mock
    private MessagePersister messagePersisterMock;

    private String generatedMessageQueueId;
    private MessageQueue subject;

    /**
     * Creates the subject with a timeout holder stubbed to return 50 and a persister stubbed to
     * return two messages for the generated queue id, then drains the queue so each test starts
     * without the persisted messages in it.
     */
    @Before
    public void setup() {
        generatedMessageQueueId = UUID.randomUUID().toString();
        Mockito.when(maxTimeoutHolderMock.getTimeout()).thenReturn(50L);

        Set<DelayableImmutableMessage> persistedMessages = Stream.of(mockMessage2, mockMessage3)
                                                                 .collect(Collectors.toSet());
        Mockito.when(messagePersisterMock.fetchAll(Matchers.eq(generatedMessageQueueId)))
               .thenReturn(persistedMessages);

        subject = new MessageQueue(delayQueue, maxTimeoutHolderMock, generatedMessageQueueId, messagePersisterMock);
        drainQueue();
    }

    /** Polls twice, once for each of the two messages the persister stub returns in setup. */
    private void drainQueue() {
        try {
            subject.poll(1L, TimeUnit.SECONDS);
            subject.poll(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag so the test runner can see it.
            Thread.currentThread().interrupt();
        }
    }

    /** A message that was put can be polled back immediately. */
    @Test
    public void testPutAndRetrieveMessage() throws Exception {
        subject.put(mockMessage);

        DelayableImmutableMessage polled = subject.poll(0L, TimeUnit.MILLISECONDS);

        Assert.assertNotNull(polled);
        Assert.assertEquals(mockMessage, polled);
    }

    /** A poll that is already waiting receives a message that is put shortly afterwards. */
    @Test
    public void testPollAndDelayedPut() throws Exception {
        Set<DelayableImmutableMessage> polledMessages = new HashSet<>();
        CountDownLatch pollFinished = new CountDownLatch(1);
        new Thread(() -> {
            try {
                polledMessages.add(subject.poll(1L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always release the main thread, whatever happened to the poll.
                pollFinished.countDown();
            }
        }).start();

        // Give the polling thread a head start so that it is waiting before the put.
        Thread.sleep(5L);
        subject.put(mockMessage);

        // Bounded wait: the poll itself gives up after one second, so five seconds is ample and
        // a stuck poller fails the test instead of hanging the build.
        Assert.assertTrue("Polling thread did not finish in time.", pollFinished.await(5L, TimeUnit.SECONDS));
        Assert.assertEquals(1L, polledMessages.size());
        Assert.assertEquals(mockMessage, polledMessages.iterator().next());
    }

    /** Waiting for an already empty queue to drain returns (almost) immediately. */
    @Test
    public void testShutdownImmediatelyWithEmptyQueue() {
        long start = System.currentTimeMillis();

        subject.waitForQueueToDrain();

        Assert.assertTrue(System.currentTimeMillis() - start < 5);
    }

    /** Waiting for the queue to drain returns once another thread has polled the last message. */
    @Test
    public void testShutdownBlocksUntilQueueEmpty() {
        subject.put(mockMessage);
        new Thread(() -> {
            try {
                // Delay the poll so that waitForQueueToDrain has to block for about 10 ms.
                Thread.sleep(10L);
                subject.poll(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        long start = System.currentTimeMillis();
        subject.waitForQueueToDrain();
        long elapsed = System.currentTimeMillis() - start;

        Assert.assertTrue("Expected call to take at least ten millis. Actual: " + elapsed, elapsed >= 10);
        Assert.assertTrue("Expected call to not take more then fifty millis. Actual: " + elapsed, elapsed < 50);
    }

    /** If nobody empties the queue, waiting for it to drain gives up after the stubbed timeout. */
    @Test
    public void testShutdownBlocksMaxTimeIfQueueNotEmptied() {
        subject.put(mockMessage);

        long start = System.currentTimeMillis();
        subject.waitForQueueToDrain();
        long elapsed = System.currentTimeMillis() - start;

        Assert.assertTrue("Expected stop to block for maximum of around 50ms. Actual: " + elapsed,
                          elapsed >= 50 && elapsed < 70);
    }

    /** Putting a message persists it under the queue id and adds it to the underlying queue. */
    @Test
    public void testMessagePersisterCalledWhenAddingMessage() {
        subject.put(mockMessage);

        Mockito.verify(messagePersisterMock).persist(Matchers.eq(generatedMessageQueueId), Matchers.eq(mockMessage));
        Mockito.verify(delayQueue).put(Matchers.eq(mockMessage));
    }

    /** On construction the subject fetches the persisted messages and puts them into the queue. */
    @Test
    public void testMessagesFetchedFromPersistenceAndAddedToQueueOnStartup() {
        Mockito.verify(messagePersisterMock).fetchAll(Matchers.eq(generatedMessageQueueId));

        ArgumentCaptor<DelayableImmutableMessage> messageCaptor =
                ArgumentCaptor.forClass(DelayableImmutableMessage.class);
        Mockito.verify(delayQueue, Mockito.times(2)).put(messageCaptor.capture());

        List<DelayableImmutableMessage> queuedMessages = messageCaptor.getAllValues();
        Assert.assertTrue(queuedMessages.contains(mockMessage2));
        Assert.assertTrue(queuedMessages.contains(mockMessage3));
    }
}
