/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for broker-side throttling of concurrent lookup requests, which is controlled through the
 * {@code maxConcurrentLookupRequest} dynamic configuration.
 *
 * <p>Every test method runs against a broker started by {@link #setup()} and torn down by
 * {@link #cleanup()}.
 */
@Test(groups = {"broker"})
public class BrokerServiceThrottlingTest extends BrokerTestBase {

    /** Name of the dynamic configuration entry that limits concurrent lookup requests. */
    private static final String MAX_CONCURRENT_LOOKUP_REQUEST = "maxConcurrentLookupRequest";

    /** Starts the test broker by delegating to {@code baseSetup()} of the base class. */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /** Tears the test broker down by delegating to {@code internalCleanup()} of the base class. */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Verifies that setting {@code maxConcurrentLookupRequest} to 0 leaves the broker's lookup
     * semaphore with no available permits.
     */
    @Test
    public void testThrottlingLookupRequestSemaphore() throws Exception {
        BrokerService service = pulsar.getBrokerService();
        Assert.assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
        admin.brokers().updateDynamicConfiguration(MAX_CONCURRENT_LOOKUP_REQUEST, Integer.toString(0));
        // NOTE(review): fixed pause, presumably to let the broker apply the dynamic update.
        Thread.sleep(1000L);
        Assert.assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
    }

    /**
     * Verifies that, once the lookup permits are set to 0, subscribing a new consumer fails with
     * {@link PulsarClientException.TooManyRequestsException}.
     */
    @Test
    public void testLookupThrottlingForClientByBroker0Permit() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        try (PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(pulsar.getBrokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            // With the initial permits a subscription succeeds.
            Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("mysub")
                    .subscribe();
            consumer.close();

            updateLookupPermitsAndAwait(0);

            try {
                consumer = pulsarClient.newConsumer()
                        .topic(topicName)
                        .subscriptionName("mysub")
                        .subscribe();
                consumer.close();
                Assert.fail("It should fail as throttling should not receive any request");
            } catch (PulsarClientException.TooManyRequestsException expected) {
                // Expected: the broker rejects the lookup because no permits are available.
            }
        }
    }

    /**
     * Verifies that, with a single lookup permit, a burst of concurrent partition-metadata requests
     * and a burst of concurrent topic lookups each see at least one request rejected with
     * {@link PulsarClientException.TooManyRequestsException}.
     */
    @Test
    public void testLookupThrottlingForClientByBroker() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic";
        try (PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(pulsar.getBrokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS)
                .ioThreads(20)
                .connectionsPerBroker(20)
                .build()) {
            updateLookupPermitsAndAwait(1);

            PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
            resolver.updateServiceUrl(pulsar.getBrokerServiceUrl());
            ClientConfigurationData conf = new ClientConfigurationData();
            conf.setConnectionsPerBroker(20);
            EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
                    new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon()));
            ExecutorService executor = Executors.newFixedThreadPool(10);
            try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
                final int totalConsumers = 20;
                List<Future<?>> futures = new ArrayList<>();

                // Round 1: concurrent partition-metadata requests.
                for (int i = 0; i < totalConsumers; ++i) {
                    // Must be a fresh variable per iteration so the lambda can capture it.
                    long reqId = 0xdeadbeef + i;
                    Future<?> f = executor.submit(() -> {
                        ByteBuf request = Commands.newPartitionMetadataRequest(topicName, reqId);
                        pool.getConnection(resolver.resolveHost())
                                .thenCompose(clientCnx -> clientCnx.newLookup(request, reqId))
                                .get();
                        return null;
                    });
                    futures.add(f);
                }
                Assert.assertTrue(countThrottledRequests(futures) > 0);

                // Round 2: concurrent topic lookups.
                futures.clear();
                for (int i = 0; i < totalConsumers; ++i) {
                    long reqId = 0xdeadfeef + i;
                    Future<?> f = executor.submit(() -> {
                        ByteBuf request = Commands.newLookup(topicName, true, reqId);
                        pool.getConnection(resolver.resolveHost())
                                .thenCompose(clientCnx -> clientCnx.newLookup(request, reqId))
                                .get();
                        return null;
                    });
                    futures.add(f);
                }
                Assert.assertTrue(countThrottledRequests(futures) > 0);
            } finally {
                executor.shutdownNow();
                eventLoop.shutdownNow();
            }
        }
    }

    /**
     * Verifies that consumers subscribed while plenty of lookup permits are available are all
     * connected again after the permits are reduced to 1 and the topic is unloaded.
     */
    @Test
    public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exception {
        final String topicName = "persistent://prop/ns-abc/newTopic-" + UUID.randomUUID().toString();
        try (PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(pulsar.getBrokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS)
                .ioThreads(20)
                .connectionsPerBroker(20)
                .build()) {
            upsertLookupPermits(100);
            List<Consumer<byte[]>> consumers = Collections.synchronizedList(Lists.newArrayList());
            // Failures raised on worker threads are collected here and asserted on the test thread;
            // an assertion thrown inside the executor would never reach TestNG.
            List<Throwable> unexpectedFailures = Collections.synchronizedList(new ArrayList<>());
            ExecutorService executor = Executors.newFixedThreadPool(10);
            try {
                final int totalConsumers = 8;
                CountDownLatch latch = new CountDownLatch(totalConsumers);
                for (int i = 0; i < totalConsumers; ++i) {
                    executor.execute(() -> {
                        try {
                            consumers.add(pulsarClient.newConsumer()
                                    .topic(topicName)
                                    .subscriptionName("mysub")
                                    .subscriptionType(SubscriptionType.Shared)
                                    .subscribe());
                        } catch (PulsarClientException.TooManyRequestsException ignored) {
                            // A throttled subscribe is tolerated here; the final count check
                            // below still requires every consumer to be connected.
                        } catch (Exception e) {
                            unexpectedFailures.add(e);
                        } finally {
                            // Always count down so latch.await() cannot hang on a failure.
                            latch.countDown();
                        }
                    });
                }
                latch.await();
                Assert.assertTrue(unexpectedFailures.isEmpty(),
                        "subscribe should not fail, but got: " + unexpectedFailures);

                admin.brokers().updateDynamicConfiguration(MAX_CONCURRENT_LOOKUP_REQUEST, "1");
                admin.topics().unload(topicName);

                retryStrategically(test -> areAllConsumersConnected(consumers), 5, 500L);

                int totalConnectedConsumers = 0;
                for (Consumer<byte[]> consumer : consumers) {
                    if (consumer.isConnected()) {
                        ++totalConnectedConsumers;
                    }
                    consumer.close();
                }
                Assert.assertEquals(totalConnectedConsumers, totalConsumers);
            } finally {
                executor.shutdownNow();
            }
        }
    }

    /**
     * Updates {@code maxConcurrentLookupRequest} through the admin API and polls the broker
     * configuration up to five times, with a short and slightly growing sleep, until it reports
     * the new value. Returns without failing if the value is still not visible after the last poll.
     *
     * @param newPermits the new number of concurrent lookup permits
     */
    private void updateLookupPermitsAndAwait(int newPermits) throws Exception {
        admin.brokers().updateDynamicConfiguration(MAX_CONCURRENT_LOOKUP_REQUEST, Integer.toString(newPermits));
        for (int i = 0; i < 5 && pulsar.getConfiguration().getMaxConcurrentLookupRequest() != newPermits; ++i) {
            Thread.sleep(100 + i * 10);
        }
    }

    /**
     * Waits for every future and counts how many failed with
     * {@link PulsarClientException.TooManyRequestsException} as their root cause.
     *
     * @param futures the submitted lookup tasks
     * @return the number of tasks rejected by throttling
     * @throws ExecutionException if a task failed for any other reason
     * @throws InterruptedException if interrupted while waiting for a task
     */
    private static int countThrottledRequests(List<Future<?>> futures)
            throws InterruptedException, ExecutionException {
        int rejects = 0;
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                // Unwrap nested ExecutionExceptions (the task itself blocks on another future).
                Throwable rootCause = e;
                while (rootCause instanceof ExecutionException) {
                    rootCause = rootCause.getCause();
                }
                if (!(rootCause instanceof PulsarClientException.TooManyRequestsException)) {
                    throw e;
                }
                ++rejects;
            }
        }
        return rejects;
    }

    /**
     * Reports whether every consumer in the list is currently connected.
     *
     * @param consumers the consumers to inspect
     * @return {@code true} if all consumers are connected (or the list is empty)
     */
    private boolean areAllConsumersConnected(List<Consumer<byte[]>> consumers) {
        for (Consumer<byte[]> consumer : consumers) {
            if (!consumer.isConnected()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Writes {@code maxConcurrentLookupRequest} directly into the dynamic configuration resource,
     * starting from an empty map when no dynamic configuration exists yet.
     *
     * @param permits the number of concurrent lookup permits to store
     */
    private void upsertLookupPermits(int permits) throws Exception {
        pulsar.getPulsarResources().getDynamicConfigResources().setDynamicConfigurationWithCreate(optMap -> {
            Map<String, String> map = optMap.orElse(new TreeMap<>());
            map.put(MAX_CONCURRENT_LOOKUP_REQUEST, Integer.toString(permits));
            return map;
        });
    }
}

