/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.messaging;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class DelayMessagingTest
extends PulsarTestSuite {
    private static final Logger log = LoggerFactory.getLogger(DelayMessagingTest.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="ServiceUrls")
    public void delayMsgBlockTest(Supplier<String> serviceUrl) throws Exception {
        String nsName = DelayMessagingTest.generateNamespaceName();
        this.pulsarCluster.createNamespace(nsName);
        String topic = DelayMessagingTest.generateTopicName(nsName, "testDelayMsgBlock", true);
        this.pulsarCluster.createPartitionedTopic(topic, 3);
        String retryTopic = topic + "-RETRY";
        String deadLetterTopic = topic + "-DLT";
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
        try {
            Producer producer = pulsarClient.newProducer().topic(topic).create();
            try {
                int redeliverCnt = 10;
                int delayTimeSeconds = 5;
                Consumer consumer = pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("test").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).enableRetry(true).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).retryLetterTopic(retryTopic).deadLetterTopic(deadLetterTopic).build()).receiverQueueSize(100).ackTimeout(60L, TimeUnit.SECONDS).subscribe();
                try {
                    producer.newMessage().value((Object)"hello".getBytes()).send();
                    Message message = consumer.receive(10, TimeUnit.SECONDS);
                    Assert.assertNotNull((Object)message, (String)"Can't receive message at the first time.");
                    consumer.reconsumeLater(message, 5L, TimeUnit.SECONDS);
                    for (int i = 0; i < 10; ++i) {
                        message = consumer.receive(10, TimeUnit.SECONDS);
                        Assert.assertNotNull((Object)message, (String)"Consumer can't receive message in double delayTimeSeconds time 10s");
                        log.info("receive msg. reConsumeTimes: {}", (Object)message.getProperty("RECONSUMETIMES"));
                        consumer.reconsumeLater(message, 5L, TimeUnit.SECONDS);
                    }
                    Consumer dltConsumer = pulsarClient.newConsumer().topic(new String[]{deadLetterTopic}).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionName("test").subscribe();
                    try {
                        message = dltConsumer.receive(10, TimeUnit.SECONDS);
                        Assert.assertNotNull((Object)message, (String)"Dead letter topic consumer can't receive message.");
                    }
                    finally {
                        if (Collections.singletonList(dltConsumer).get(0) != null) {
                            dltConsumer.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(consumer).get(0) != null) {
                        consumer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(pulsarClient).get(0) != null) {
                pulsarClient.close();
            }
        }
    }
}

