/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class LookupRetryTest
extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(LookupRetryTest.class);
    private static final String subscription = "reader-sub";
    private final AtomicInteger connectionsCreated = new AtomicInteger(0);
    private final ConcurrentHashMap<String, Queue<LookupError>> failureMap = new ConcurrentHashMap();

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("public", (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace("public/default", (Set)Sets.newHashSet((Object[])new String[]{"test"}));
        this.connectionsCreated.set(0);
    }

    @Override
    protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
        return new PulsarService(conf){

            protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
                BrokerService broker = new BrokerService((PulsarService)this, this.ioEventLoopGroup);
                broker.setPulsarChannelInitializerFactory((_pulsar, opts) -> new PulsarChannelInitializer(_pulsar, opts){

                    protected ServerCnx newServerCnx(PulsarService pulsar, String listenerName) throws Exception {
                        LookupRetryTest.this.connectionsCreated.incrementAndGet();
                        return new ErrorByTopicServerCnx(pulsar, LookupRetryTest.this.failureMap);
                    }
                });
                return broker;
            }
        };
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    PulsarClient newClient() throws Exception {
        return PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl()).connectionTimeout(2, TimeUnit.SECONDS).operationTimeout(1, TimeUnit.SECONDS).lookupTimeout(10, TimeUnit.SECONDS).build();
    }

    @Test
    public void testGetPartitionedMetadataRetries() throws Exception {
        try (PulsarClient client = this.newClient();){
            client.getPartitionsForTopic("TIMEOUT:2,OK:10").get();
        }
        client = this.newClient();
        var2_2 = null;
        try {
            client.getPartitionsForTopic("TOO_MANY:2,OK:10").get();
        }
        catch (Throwable throwable) {
            var2_2 = throwable;
            throw throwable;
        }
        finally {
            if (client != null) {
                if (var2_2 != null) {
                    try {
                        client.close();
                    }
                    catch (Throwable throwable) {
                        var2_2.addSuppressed(throwable);
                    }
                } else {
                    client.close();
                }
            }
        }
    }

    @Test
    public void testTimeoutRetriesOnPartitionMetadata() throws Exception {
        try (PulsarClient client = this.newClient();){
            Reader reader = client.newReader().topic("TIMEOUT:2,OK:3").startMessageId(MessageId.latest).startMessageIdInclusive().readerName(subscription).create();
            Throwable throwable = null;
            if (reader != null) {
                if (throwable != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    reader.close();
                }
            }
        }
    }

    @Test
    public void testTooManyRetriesOnPartitionMetadata() throws Exception {
        try (PulsarClient client = this.newClient();){
            Reader reader = client.newReader().topic("TOO_MANY:2,OK:3").startMessageId(MessageId.latest).startMessageIdInclusive().readerName(subscription).create();
            Throwable throwable = null;
            if (reader != null) {
                if (throwable != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    reader.close();
                }
            }
        }
    }

    @Test
    public void testTooManyOnLookup() throws Exception {
        try (PulsarClient client = this.newClient();){
            Reader reader = client.newReader().topic("OK:1,TOO_MANY:2,OK:3").startMessageId(MessageId.latest).startMessageIdInclusive().readerName(subscription).create();
            Throwable throwable = null;
            if (reader != null) {
                if (throwable != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    reader.close();
                }
            }
        }
    }

    @Test
    public void testTimeoutOnLookup() throws Exception {
        try (PulsarClient client = this.newClient();){
            Reader reader = client.newReader().topic("OK:1,TIMEOUT:2,OK:3").startMessageId(MessageId.latest).startMessageIdInclusive().readerName(subscription).create();
            Throwable throwable = null;
            if (reader != null) {
                if (throwable != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    reader.close();
                }
            }
        }
    }

    @Test
    public void testManyFailures() throws Exception {
        try (PulsarClient client = this.newClient();){
            Reader reader = client.newReader().topic("TOO_MANY:1,TIMEOUT:1,OK:1,TIMEOUT:1,TOO_MANY:1,OK:3").startMessageId(MessageId.latest).startMessageIdInclusive().readerName(subscription).create();
            Throwable throwable = null;
            if (reader != null) {
                if (throwable != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    reader.close();
                }
            }
        }
    }

    @Test
    public void testProducerTimeoutOnPMR() throws Exception {
        try (PulsarClient client = this.newClient();){
            Producer producer = client.newProducer().topic("TIMEOUT:2,OK:3").create();
            Throwable throwable = null;
            if (producer != null) {
                if (throwable != null) {
                    try {
                        producer.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    producer.close();
                }
            }
        }
    }

    @Test
    public void testProducerTooManyOnPMR() throws Exception {
        try (PulsarClient client = this.newClient();){
            Producer producer = client.newProducer().topic("TOO_MANY:2,OK:3").create();
            Throwable throwable = null;
            if (producer != null) {
                if (throwable != null) {
                    try {
                        producer.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    producer.close();
                }
            }
        }
    }

    @Test
    public void testProducerTimeoutOnLookup() throws Exception {
        try (PulsarClient client = this.newClient();){
            Producer producer = client.newProducer().topic("OK:1,TIMEOUT:2,OK:3").create();
            Throwable throwable = null;
            if (producer != null) {
                if (throwable != null) {
                    try {
                        producer.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    producer.close();
                }
            }
        }
    }

    @Test
    public void testProducerTooManyOnLookup() throws Exception {
        try (PulsarClient client = this.newClient();){
            Producer producer = client.newProducer().topic("OK:1,TOO_MANY:2,OK:3").create();
            Throwable throwable = null;
            if (producer != null) {
                if (throwable != null) {
                    try {
                        producer.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    producer.close();
                }
            }
        }
    }

    @Test
    public void testCloseConnectionOnBrokerRejectedRequest() throws Exception {
        String lookupUrl = this.pulsar.getBrokerServiceUrl();
        try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).maxNumberOfRejectedRequestPerConnection(1).build();){
            pulsarClient.newProducer().topic("TOO_MANY:2").create().close();
            Assert.assertEquals((int)this.connectionsCreated.get(), (int)2);
        }
        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).maxNumberOfRejectedRequestPerConnection(100).build();
        var3_3 = null;
        try {
            pulsarClient.newProducer().topic("TOO_MANY:2").create().close();
            pulsarClient.newProducer().topic("TOO_MANY:4").create().close();
            Assert.assertEquals((int)this.connectionsCreated.get(), (int)3);
        }
        catch (Throwable throwable) {
            var3_3 = throwable;
            throw throwable;
        }
        finally {
            if (pulsarClient != null) {
                if (var3_3 != null) {
                    try {
                        pulsarClient.close();
                    }
                    catch (Throwable throwable) {
                        var3_3.addSuppressed(throwable);
                    }
                } else {
                    pulsarClient.close();
                }
            }
        }
    }

    @Test
    public void testCloseConnectionOnBrokerTimeout() throws Exception {
        String lookupUrl = this.pulsar.getBrokerServiceUrl();
        try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).maxNumberOfRejectedRequestPerConnection(1).connectionTimeout(2, TimeUnit.SECONDS).operationTimeout(1, TimeUnit.SECONDS).lookupTimeout(10, TimeUnit.SECONDS).build();){
            pulsarClient.newProducer().topic("TIMEOUT:2").create().close();
            Assert.assertEquals((int)this.connectionsCreated.get(), (int)2);
        }
        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).maxNumberOfRejectedRequestPerConnection(100).maxNumberOfRejectedRequestPerConnection(1).connectionTimeout(2, TimeUnit.SECONDS).operationTimeout(1, TimeUnit.SECONDS).lookupTimeout(10, TimeUnit.SECONDS).build();
        var3_3 = null;
        try {
            pulsarClient.newProducer().topic("TIMEOUT:2").create().close();
            pulsarClient.newProducer().topic("TIMEOUT:2").create().close();
            Assert.assertEquals((int)this.connectionsCreated.get(), (int)3);
        }
        catch (Throwable throwable) {
            var3_3 = throwable;
            throw throwable;
        }
        finally {
            if (pulsarClient != null) {
                if (var3_3 != null) {
                    try {
                        pulsarClient.close();
                    }
                    catch (Throwable throwable) {
                        var3_3.addSuppressed(throwable);
                    }
                } else {
                    pulsarClient.close();
                }
            }
        }
    }

    private static class ErrorByTopicServerCnx
    extends ServerCnx {
        private final ConcurrentHashMap<String, Queue<LookupError>> failureMap;

        ErrorByTopicServerCnx(PulsarService pulsar, ConcurrentHashMap<String, Queue<LookupError>> failureMap) {
            super(pulsar);
            this.failureMap = failureMap;
        }

        private Queue<LookupError> errorList(String topicName) {
            return this.failureMap.compute(topicName, (k, v) -> {
                if (v == null) {
                    v = new ArrayBlockingQueue<LookupError>(100);
                    for (String e : k.split(",")) {
                        String[] parts = e.split(":");
                        LookupError error = Enum.valueOf(LookupError.class, parts[0]);
                        for (int i = 0; i < Integer.parseInt(parts[1]); ++i) {
                            v.add(error);
                        }
                    }
                }
                return v;
            });
        }

        protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
            TopicName t = TopicName.get((String)partitionMetadata.getTopic());
            LookupError error = this.errorList(t.getLocalName()).poll();
            if (error == LookupError.TOO_MANY) {
                long requestId = partitionMetadata.getRequestId();
                this.ctx.writeAndFlush((Object)Commands.newPartitionMetadataResponse((ServerError)ServerError.TooManyRequests, (String)"too many", (long)requestId));
            } else if (error != LookupError.TIMEOUT && (error == null || error == LookupError.OK)) {
                super.handlePartitionMetadataRequest(partitionMetadata);
            }
        }

        protected void handleLookup(CommandLookupTopic lookup) {
            TopicName t = TopicName.get((String)lookup.getTopic());
            LookupError error = this.errorList(t.getLocalName()).poll();
            if (error == LookupError.TOO_MANY) {
                long requestId = lookup.getRequestId();
                this.ctx.writeAndFlush((Object)Commands.newLookupErrorResponse((ServerError)ServerError.TooManyRequests, (String)"too many", (long)requestId));
            } else if (error != LookupError.TIMEOUT && (error == null || error == LookupError.OK)) {
                super.handleLookup(lookup);
            }
        }
    }

    static enum LookupError {
        UNKNOWN,
        TOO_MANY,
        TIMEOUT,
        OK;

    }
}

