package io.joynr.messaging.routing;

import io.joynr.messaging.routing.MessageQueue;
import io.joynr.util.ObjectMapper;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import joynr.system.RoutingTypes.MqttAddress;
import joynr.system.RoutingTypes.RoutingTypesUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:io/joynr/messaging/routing/MessageQueueTest.class */
public class MessageQueueTest {

    @Mock
    private DelayableImmutableMessage mockMessage;

    @Mock
    private MessageQueue.MaxTimeoutHolder maxTimeoutHolderMock;
    private MessageQueue subject;

    @Spy
    private DelayQueue<DelayableImmutableMessage> delayQueue = new DelayQueue<>();
    private final long shutdownMaxTimeout = 50;
    private final String sender = "fromParticipantId";
    private final String brokerUri = "testBrokerUri";
    private final String topic = "testTopic";
    private final MqttAddress replyToAddress = new MqttAddress("testBrokerUri", "testTopic");

    @Before
    public void setup() throws Exception {
        Mockito.when(Long.valueOf(this.maxTimeoutHolderMock.getTimeout())).thenReturn(50L);
        new ObjectMapper();
        Field declaredField = RoutingTypesUtil.class.getDeclaredField("objectMapper");
        declaredField.setAccessible(true);
        declaredField.set(RoutingTypesUtil.class, new ObjectMapper());
        this.subject = new MessageQueue(this.delayQueue, this.maxTimeoutHolderMock);
    }

    @Test
    public void testPutAndRetrieveMessage() throws Exception {
        this.subject.put(this.mockMessage);
        DelayableImmutableMessage poll = this.subject.poll(0L, TimeUnit.MILLISECONDS);
        Assert.assertNotNull(poll);
        Assert.assertEquals(this.mockMessage, poll);
    }

    @Test
    public void testPollAndDelayedPut() throws Exception {
        HashSet hashSet = new HashSet();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        new Thread(() -> {
            try {
                hashSet.add(this.subject.poll(1L, TimeUnit.SECONDS));
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                countDownLatch.countDown();
            } catch (Throwable th) {
                countDownLatch.countDown();
                throw th;
            }
        }).start();
        Thread.sleep(5L);
        Assert.assertTrue("returned from poll before put", countDownLatch.getCount() > 0);
        this.subject.put(this.mockMessage);
        Assert.assertTrue("poll did not return within 1 second", countDownLatch.await(1L, TimeUnit.SECONDS));
        Assert.assertEquals(1L, hashSet.size());
        Assert.assertEquals(this.mockMessage, hashSet.iterator().next());
    }

    @Test
    public void testShutdownImmediatelyWithEmptyQueue() {
        long currentTimeMillis = System.currentTimeMillis();
        this.subject.waitForQueueToDrain();
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis < 5);
    }

    @Test
    public void testShutdownBlocksUntilQueueEmpty() {
        this.subject.put(this.mockMessage);
        new Thread(() -> {
            try {
                Thread.sleep(10L);
                this.subject.poll(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
            }
        }).start();
        long currentTimeMillis = System.currentTimeMillis();
        this.subject.waitForQueueToDrain();
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        Assert.assertTrue("Expected call to take at least a second. Actual: " + currentTimeMillis2, currentTimeMillis2 >= 10);
        Assert.assertTrue("Expected call to not take more then fifty millis. Actual: " + currentTimeMillis2, currentTimeMillis2 < 50);
    }

    @Test
    public void testShutdownBlocksMaxTimeIfQueueNotEmptied() {
        this.subject.put(this.mockMessage);
        long currentTimeMillis = System.currentTimeMillis();
        this.subject.waitForQueueToDrain();
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        Assert.assertTrue("Expected stop to block for maximum of around 50ms. Actual: " + currentTimeMillis2, currentTimeMillis2 >= 50 && currentTimeMillis2 < 70);
    }

    @Test
    public void testPollRemovesMessageFromDelayQueue() throws Exception {
        this.subject.put(this.mockMessage);
        TimeUnit timeUnit = TimeUnit.SECONDS;
        this.subject.poll(1L, timeUnit);
        ((DelayQueue) Mockito.verify(this.delayQueue, Mockito.times(1))).poll(1L, timeUnit);
    }
}
