/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.delayed;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Clock;
import java.util.Collections;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

/**
 * Tests for {@link InMemoryDelayedDeliveryTracker}.
 *
 * <p>Every test drives the tracker with a mocked {@link Clock} whose {@code millis()} value is
 * controlled through an {@link AtomicLong}, so "now" can be moved without sleeping. Tests that
 * exercise real timer behaviour share the {@link HashedWheelTimer} held in {@link #timer}.
 */
@Test(groups={"broker"})
public class InMemoryDeliveryTrackerTest {
    /** Real timer shared by the tests; created with a 500 ms tick duration and stopped in {@link #cleanup()}. */
    private final Timer timer = new HashedWheelTimer(
            new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), 500L, TimeUnit.MILLISECONDS);

    /** Stops the shared timer once all tests in this class have run. */
    @AfterClass(alwaysRun=true)
    public void cleanup() {
        this.timer.stop();
    }

    /** Builds a {@link Clock} mock whose {@code millis()} always reports the current value of {@code clockTime}. */
    private static Clock mockClock(AtomicLong clockTime) {
        Clock clock = Mockito.mock(Clock.class);
        Mockito.when(clock.millis()).then(invocation -> clockTime.get());
        return clock;
    }

    /**
     * Adds five messages out of order, then advances the mocked clock and checks that messages
     * become available, are handed out by {@code getScheduledMessages}, and are removed from the
     * delayed-message count as they are returned.
     */
    @Test
    public void test() throws Exception {
        PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class);
        AtomicLong clockTime = new AtomicLong();
        Clock clock = mockClock(clockTime);
        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, this.timer, 1L, clock, false);
        try {
            Assert.assertFalse(tracker.hasMessageAvailable());

            // Insert in non-sorted deliverAt order on purpose.
            Assert.assertTrue(tracker.addMessage(2L, 2L, 20L));
            Assert.assertTrue(tracker.addMessage(1L, 1L, 10L));
            Assert.assertTrue(tracker.addMessage(3L, 3L, 30L));
            Assert.assertTrue(tracker.addMessage(5L, 5L, 50L));
            Assert.assertTrue(tracker.addMessage(4L, 4L, 40L));

            // Clock is still at 0: nothing is due yet.
            Assert.assertFalse(tracker.hasMessageAvailable());
            Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 5L);
            Assert.assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());

            // Move time forward; a message whose deliverAt is already in the past is not tracked.
            clockTime.set(15L);
            Assert.assertFalse(tracker.addMessage(6L, 6L, 10L));
            Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 5L);
            Assert.assertTrue(tracker.hasMessageAvailable());
            Set<?> scheduled = tracker.getScheduledMessages(10);
            Assert.assertEquals(scheduled.size(), 1);

            // Move past every remaining deliverAt and drain in two batches (1, then 3).
            clockTime.set(60L);
            Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 4L);
            Assert.assertTrue(tracker.hasMessageAvailable());
            scheduled = tracker.getScheduledMessages(1);
            Assert.assertEquals(scheduled.size(), 1);
            Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 3L);
            Assert.assertTrue(tracker.hasMessageAvailable());
            scheduled = tracker.getScheduledMessages(3);
            Assert.assertEquals(scheduled.size(), 3);

            // Everything has been handed out.
            Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 0L);
            Assert.assertFalse(tracker.hasMessageAvailable());
            Assert.assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());
        }
        finally {
            tracker.close();
        }
    }

    /**
     * Replaces the timer with a mock that records each scheduled task keyed by its absolute fire
     * time, then checks that the tracker keeps exactly one pending task (for the earliest message)
     * and that running the task only reaches the dispatcher when the timeout is not cancelled.
     */
    @Test
    public void testWithTimer() throws Exception {
        PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class);
        Timer mockTimer = Mockito.mock(Timer.class);
        AtomicLong clockTime = new AtomicLong();
        Clock clock = mockClock(clockTime);

        // Pending timer tasks, keyed by (mock clock time at scheduling + requested delay in ms).
        TreeMap<Long, TimerTask> tasks = new TreeMap<>();
        Mockito.when(mockTimer.newTimeout(Mockito.any(), Mockito.anyLong(), Mockito.any())).then(invocation -> {
            TimerTask task = invocation.getArgument(0, TimerTask.class);
            long timeout = invocation.getArgument(1, Long.class);
            TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
            long scheduleAt = clockTime.get() + unit.toMillis(timeout);
            tasks.put(scheduleAt, task);

            // Cancelling the returned timeout drops the matching entry from the pending map.
            Timeout t = Mockito.mock(Timeout.class);
            Mockito.when(t.cancel()).then(i -> {
                tasks.remove(scheduleAt, task);
                return null;
            });
            return t;
        });

        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, mockTimer, 1L, clock, false);
        try {
            Assert.assertTrue(tasks.isEmpty());

            Assert.assertTrue(tracker.addMessage(2L, 2L, 20L));
            Assert.assertEquals(tasks.size(), 1);
            Assert.assertEquals(tasks.firstKey().longValue(), 20L);

            // An earlier message replaces the pending task with one firing at 10.
            Assert.assertTrue(tracker.addMessage(1L, 1L, 10L));
            Assert.assertEquals(tasks.size(), 1);
            Assert.assertEquals(tasks.firstKey().longValue(), 10L);

            // A later message leaves the pending task untouched.
            Assert.assertTrue(tracker.addMessage(3L, 3L, 30L));
            Assert.assertEquals(tasks.size(), 1);
            Assert.assertEquals(tasks.firstKey().longValue(), 10L);

            clockTime.set(15L);
            TimerTask task = tasks.pollFirstEntry().getValue();

            // Running with a cancelled timeout must not touch the dispatcher.
            Timeout cancelledTimeout = Mockito.mock(Timeout.class);
            Mockito.when(cancelledTimeout.isCancelled()).thenReturn(true);
            task.run(cancelledTimeout);
            Mockito.verifyNoInteractions(dispatcher);

            // Running with a live timeout asks the dispatcher to read more entries.
            task.run(Mockito.mock(Timeout.class));
            Mockito.verify(dispatcher).readMoreEntries();
        }
        finally {
            tracker.close();
        }
    }

    /**
     * With the third constructor argument set to 100 and the last flag {@code false}, messages
     * whose deliverAt is at most 100 ms after "now" are rejected while later ones are tracked.
     */
    @Test
    public void testAddWithinTickTime() {
        PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class);
        AtomicLong clockTime = new AtomicLong();
        Clock clock = mockClock(clockTime);
        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, this.timer, 100L, clock, false);
        try {
            clockTime.set(0L);
            Assert.assertFalse(tracker.addMessage(1L, 1L, 10L));
            Assert.assertFalse(tracker.addMessage(2L, 2L, 99L));
            Assert.assertFalse(tracker.addMessage(3L, 3L, 100L));
            Assert.assertTrue(tracker.addMessage(4L, 4L, 101L));
            Assert.assertTrue(tracker.addMessage(5L, 5L, 200L));
            Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 2L);
        }
        finally {
            tracker.close();
        }
    }

    /**
     * With the last constructor flag {@code true} (the "strict" variant, per the test name), only
     * messages whose deliverAt is strictly after "now" are tracked, regardless of the value 100
     * passed as the third constructor argument.
     */
    @Test
    public void testAddMessageWithStrictDelay() {
        PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class);
        AtomicLong clockTime = new AtomicLong();
        Clock clock = mockClock(clockTime);
        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, this.timer, 100L, clock, true);
        try {
            clockTime.set(10L);
            Assert.assertFalse(tracker.addMessage(1L, 1L, 9L));
            Assert.assertFalse(tracker.addMessage(4L, 4L, 10L));
            Assert.assertTrue(tracker.addMessage(1L, 1L, 11L));
            Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 1L);
            Assert.assertFalse(tracker.hasMessageAvailable());
        }
        finally {
            tracker.close();
        }
    }

    /**
     * Runs the tracker once by hand, adds a message due 1 ms after that run, and checks that the
     * dispatcher has still been called exactly once 600 ms later.
     */
    @Test
    public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict() throws Exception {
        PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class);
        AtomicLong clockTime = new AtomicLong();
        Clock clock = mockClock(clockTime);
        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, this.timer, 1000L, clock, true);
        try {
            clockTime.set(10000L);

            // Trigger one run manually with a non-cancelled timeout.
            Timeout timeout = Mockito.mock(Timeout.class);
            Mockito.when(timeout.isCancelled()).then(x -> false);
            tracker.run(timeout);
            Mockito.verify(dispatcher, Mockito.times(1)).readMoreEntries();

            Assert.assertTrue(tracker.addMessage(1L, 1L, 10001L));

            // After 600 ms of wall-clock time no additional dispatcher call has happened.
            Thread.sleep(600L);
            Mockito.verify(dispatcher, Mockito.times(1)).readMoreEntries();

            // NOTE(review): verify(dispatcher) defaults to times(1), which the manual run above already
            // satisfies; confirm whether a second invocation was meant to be awaited here.
            Awaitility.await().atMost(10L, TimeUnit.SECONDS)
                    .untilAsserted(() -> Mockito.verify(dispatcher).readMoreEntries());
        }
        finally {
            tracker.close();
        }
    }

    /**
     * Adds a message due 5 ms after "now" to a tracker built with 100000 as its third constructor
     * argument and waits (up to 10 s) for the dispatcher to be asked to read more entries.
     */
    @Test
    public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict() {
        PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class);
        AtomicLong clockTime = new AtomicLong();
        Clock clock = mockClock(clockTime);
        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, this.timer, 100000L, clock, true);
        try {
            clockTime.set(500000L);
            Assert.assertTrue(tracker.addMessage(1L, 1L, 500005L));
            Awaitility.await().atMost(10L, TimeUnit.SECONDS)
                    .untilAsserted(() -> Mockito.verify(dispatcher).readMoreEntries());
        }
        finally {
            tracker.close();
        }
    }

    /**
     * Adds a message due 2000 ms after "now", checks that the dispatcher is untouched after one
     * second of wall-clock time, then waits (up to 10 s) for the dispatcher call.
     */
    @Test
    public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws Exception {
        PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class);
        AtomicLong clockTime = new AtomicLong();
        Clock clock = mockClock(clockTime);
        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, this.timer, 500L, clock, true);
        try {
            clockTime.set(0L);
            Assert.assertTrue(tracker.addMessage(1L, 1L, 2000L));

            // Too early: nothing should have reached the dispatcher yet.
            Thread.sleep(1000L);
            Mockito.verifyNoInteractions(dispatcher);

            Awaitility.await().atMost(10L, TimeUnit.SECONDS)
                    .untilAsserted(() -> Mockito.verify(dispatcher).readMoreEntries());
        }
        finally {
            tracker.close();
        }
    }

    /**
     * Feeds 50000 messages whose deliverAt grows by a constant 10 per message and checks that
     * {@code shouldPauseAllDeliveries()} turns true, then false again once messages are drained.
     */
    @Test
    public void testWithFixedDelays() throws Exception {
        PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class);
        AtomicLong clockTime = new AtomicLong();
        Clock clock = mockClock(clockTime);
        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, this.timer, 1L, clock, true);
        try {
            Assert.assertFalse(tracker.hasMessageAvailable());
            Assert.assertTrue(tracker.addMessage(1L, 1L, 10L));
            Assert.assertTrue(tracker.addMessage(2L, 2L, 20L));
            Assert.assertTrue(tracker.addMessage(3L, 3L, 30L));
            Assert.assertTrue(tracker.addMessage(4L, 4L, 40L));
            Assert.assertTrue(tracker.addMessage(5L, 5L, 50L));
            Assert.assertFalse(tracker.hasMessageAvailable());
            Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 5L);

            // Five messages are not enough to pause deliveries.
            Assert.assertFalse(tracker.shouldPauseAllDeliveries());

            for (int i = 6; i <= 50000; i++) {
                Assert.assertTrue(tracker.addMessage(i, i, i * 10));
            }
            Assert.assertTrue(tracker.shouldPauseAllDeliveries());

            // Make every message due and start draining: pausing is lifted after the first batch.
            clockTime.set(500000L);
            tracker.getScheduledMessages(100);
            Assert.assertFalse(tracker.shouldPauseAllDeliveries());

            // Drain the rest in batches of up to 100 until nothing is returned.
            while (!tracker.getScheduledMessages(100).isEmpty()) {
                // keep draining
            }
            Assert.assertFalse(tracker.shouldPauseAllDeliveries());
        }
        finally {
            tracker.close();
        }
    }

    /**
     * After 50000 monotonically increasing deliverAt values have switched pausing on, adding one
     * message with a smaller deliverAt (5) switches {@code shouldPauseAllDeliveries()} off again.
     */
    @Test
    public void testWithMixedDelays() throws Exception {
        PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class);
        AtomicLong clockTime = new AtomicLong();
        Clock clock = mockClock(clockTime);
        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, this.timer, 1L, clock, true);
        try {
            Assert.assertFalse(tracker.hasMessageAvailable());
            Assert.assertTrue(tracker.addMessage(1L, 1L, 10L));
            Assert.assertTrue(tracker.addMessage(2L, 2L, 20L));
            Assert.assertTrue(tracker.addMessage(3L, 3L, 30L));
            Assert.assertTrue(tracker.addMessage(4L, 4L, 40L));
            Assert.assertTrue(tracker.addMessage(5L, 5L, 50L));
            Assert.assertFalse(tracker.shouldPauseAllDeliveries());

            for (int i = 6; i <= 50000; i++) {
                Assert.assertTrue(tracker.addMessage(i, i, i * 10));
            }
            Assert.assertTrue(tracker.shouldPauseAllDeliveries());

            // A message with an out-of-sequence (smaller) deliverAt ends the pause.
            Assert.assertTrue(tracker.addMessage(5L, 5L, 5L));
            Assert.assertFalse(tracker.shouldPauseAllDeliveries());
        }
        finally {
            tracker.close();
        }
    }

    /**
     * After 50000 monotonically increasing deliverAt values have switched pausing on, a message
     * with deliverAt {@code -1} is rejected by {@code addMessage} and
     * {@code shouldPauseAllDeliveries()} reports false afterwards.
     */
    @Test
    public void testWithNoDelays() throws Exception {
        PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class);
        AtomicLong clockTime = new AtomicLong();
        Clock clock = mockClock(clockTime);
        InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, this.timer, 1L, clock, true);
        try {
            Assert.assertFalse(tracker.hasMessageAvailable());
            Assert.assertTrue(tracker.addMessage(1L, 1L, 10L));
            Assert.assertTrue(tracker.addMessage(2L, 2L, 20L));
            Assert.assertTrue(tracker.addMessage(3L, 3L, 30L));
            Assert.assertTrue(tracker.addMessage(4L, 4L, 40L));
            Assert.assertTrue(tracker.addMessage(5L, 5L, 50L));
            Assert.assertFalse(tracker.shouldPauseAllDeliveries());

            for (int i = 6; i <= 50000; i++) {
                Assert.assertTrue(tracker.addMessage(i, i, i * 10));
            }
            Assert.assertTrue(tracker.shouldPauseAllDeliveries());

            // A non-delayed message (deliverAt = -1) is not tracked and ends the pause.
            Assert.assertFalse(tracker.addMessage(5L, 5L, -1L));
            Assert.assertFalse(tracker.shouldPauseAllDeliveries());
        }
        finally {
            tracker.close();
        }
    }
}

