/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.reflections.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class RetryTopicTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(RetryTopicTest.class);

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetryTopic() throws Exception {
        String topic = "persistent://my-property/my-ns/retry-topic";
        int maxRedeliveryCount = 2;
        int sendMessages = 100;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic").create();
            for (int i = 0; i < 100; ++i) {
                producer.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
            }
            producer.close();
            int totalReceived = 0;
            do {
                Message message = consumer.receive();
                log.info("consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
                consumer.reconsumeLater(message, 1L, TimeUnit.SECONDS);
            } while (++totalReceived < 300);
            int totalInDeadLetter = 0;
            do {
                Message message = deadLetterConsumer.receive();
                log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
                deadLetterConsumer.acknowledge(message);
            } while (++totalInDeadLetter < 100);
            deadLetterConsumer.close();
            consumer.close();
            Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
            if (checkMessage != null) {
                log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
            }
            Assert.assertNull((Object)checkMessage);
            checkConsumer.close();
        }
        finally {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetryTopicProperties() throws Exception {
        String topic = "persistent://my-property/my-ns/retry-topic";
        int maxRedeliveryCount = 3;
        int sendMessages = 10;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic").create();
            HashSet originMessageIds = Sets.newHashSet();
            for (int i = 0; i < 10; ++i) {
                MessageId msgId = producer.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
                originMessageIds.add(msgId.toString());
            }
            producer.close();
            int totalReceived = 0;
            HashSet retryMessageIds = Sets.newHashSet();
            do {
                Message message = consumer.receive();
                log.info("consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
                if (message.hasProperty("RECONSUMETIMES")) {
                    Assert.assertEquals((String)message.getProperty("REAL_TOPIC"), (String)"persistent://my-property/my-ns/retry-topic");
                    retryMessageIds.add(message.getProperty("ORIGIN_MESSAGE_IDY_TIME"));
                }
                consumer.reconsumeLater(message, 1L, TimeUnit.SECONDS);
            } while (++totalReceived < 40);
            Assert.assertEquals((Set)retryMessageIds, (Set)originMessageIds);
            int totalInDeadLetter = 0;
            HashSet deadLetterMessageIds = Sets.newHashSet();
            do {
                Message message = deadLetterConsumer.receive();
                log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
                if (message.hasProperty("RECONSUMETIMES")) {
                    Assert.assertEquals((String)message.getProperty("REAL_TOPIC"), (String)"persistent://my-property/my-ns/retry-topic");
                    deadLetterMessageIds.add(message.getProperty("ORIGIN_MESSAGE_IDY_TIME"));
                }
                deadLetterConsumer.acknowledge(message);
            } while (++totalInDeadLetter < 10);
            Assert.assertEquals((Set)deadLetterMessageIds, (Set)originMessageIds);
            deadLetterConsumer.close();
            consumer.close();
            Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
            if (checkMessage != null) {
                log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
            }
            Assert.assertNull((Object)checkMessage);
            checkConsumer.close();
        }
        finally {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        }
    }

    @Test
    public void testRetryTopicNameForCompatibility() throws Exception {
        String topic = "persistent://my-property/my-ns/retry-topic";
        String oldRetryTopic = "persistent://my-property/my-ns/my-subscription-RETRY";
        String oldDeadLetterTopic = "persistent://my-property/my-ns/my-subscription-DLQ";
        int maxRedeliveryCount = 2;
        int sendMessages = 100;
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/my-subscription-RETRY", 2);
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/my-subscription-DLQ", 2);
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic").create();
        for (int i = 0; i < 100; ++i) {
            producer.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
        }
        producer.close();
        int totalReceived = 0;
        do {
            Message message = consumer.receive();
            log.info("consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
            consumer.reconsumeLater(message, 1L, TimeUnit.SECONDS);
        } while (++totalReceived < 300);
        int totalInDeadLetter = 0;
        do {
            Message message = deadLetterConsumer.receive();
            log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
            deadLetterConsumer.acknowledge(message);
        } while (++totalInDeadLetter < 100);
        deadLetterConsumer.close();
        consumer.close();
        Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (checkMessage != null) {
            log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
        }
        Assert.assertNull((Object)checkMessage);
        checkConsumer.close();
        newPulsarClient.close();
    }

    @Test
    public void testRetryTopicWithMultiTopic() throws Exception {
        String topic1 = "persistent://my-property/my-ns/retry-topic-1";
        String topic2 = "persistent://my-property/my-ns/retry-topic-2";
        int maxRedeliveryCount = 2;
        int sendMessages = 100;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-1", "persistent://my-property/my-ns/retry-topic-2"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Consumer deadLetterConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-1-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer producer1 = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic-1").create();
        Producer producer2 = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic-2").create();
        for (int i = 0; i < sendMessages; ++i) {
            producer1.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
            producer2.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
        }
        sendMessages *= 2;
        producer1.close();
        producer2.close();
        int totalReceived = 0;
        do {
            Message message = consumer.receive();
            log.info("consumer received message : {} {} - total = {}", new Object[]{message.getMessageId(), new String(message.getData()), ++totalReceived});
        } while (totalReceived < sendMessages * 3);
        int totalInDeadLetter = 0;
        do {
            Message message = deadLetterConsumer.receive();
            log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
            deadLetterConsumer.acknowledge(message);
        } while (++totalInDeadLetter < sendMessages);
        deadLetterConsumer.close();
        consumer.close();
        Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-1", "persistent://my-property/my-ns/retry-topic-2"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (checkMessage != null) {
            log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
        }
        Assert.assertNull((Object)checkMessage);
        checkConsumer.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetryTopicByCustomTopicName() throws Exception {
        String topic = "persistent://my-property/my-ns/retry-topic";
        int maxRedeliveryCount = 2;
        int sendMessages = 100;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-my-subscription-DLQ"}).subscriptionName("my-subscription").subscribe();
            Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic").create();
            for (int i = 0; i < 100; ++i) {
                producer.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
            }
            producer.close();
            int totalReceived = 0;
            do {
                Message message = consumer.receive();
                log.info("consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
                consumer.reconsumeLater(message, 1L, TimeUnit.SECONDS);
            } while (++totalReceived < 300);
            int totalInDeadLetter = 0;
            do {
                Message message = deadLetterConsumer.receive();
                log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
                deadLetterConsumer.acknowledge(message);
            } while (++totalInDeadLetter < 100);
            deadLetterConsumer.close();
            consumer.close();
            PulsarClient newPulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                Consumer checkConsumer = newPulsarClient1.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
                if (checkMessage != null) {
                    log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
                }
                Assert.assertNull((Object)checkMessage);
                checkConsumer.close();
            }
            finally {
                if (Collections.singletonList(newPulsarClient1).get(0) != null) {
                    newPulsarClient1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        }
    }

    @Test(timeOut=30000L)
    public void testRetryTopicException() throws Exception {
        String topic = "persistent://my-property/my-ns/retry-topic";
        int maxRedeliveryCount = 2;
        boolean sendMessages = true;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic").create();
        for (int i = 0; i < 1; ++i) {
            producer.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
        }
        producer.close();
        MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl)consumer;
        List consumers = multiTopicsConsumer.getConsumers();
        for (ConsumerImpl c : consumers) {
            Set deadLetterPolicyField = ReflectionUtils.getAllFields(c.getClass(), (Predicate[])new Predicate[]{ReflectionUtils.withName((String)"deadLetterPolicy")});
            if (deadLetterPolicyField.size() == 0) continue;
            Field field = (Field)deadLetterPolicyField.iterator().next();
            field.setAccessible(true);
            DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy)field.get(c);
            deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
        }
        Message message = consumer.receive();
        log.info("consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
        try {
            consumer.reconsumeLater(message, 1L, TimeUnit.SECONDS);
        }
        catch (PulsarClientException.InvalidTopicNameException e) {
            Assert.assertEquals(((Object)((Object)e)).getClass(), PulsarClientException.InvalidTopicNameException.class);
        }
        catch (Exception e) {
            Assert.fail((String)"exception should be PulsarClientException.InvalidTopicNameException");
        }
        consumer.close();
    }
}

