/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.systopic;

import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class PartitionedSystemTopicTest
extends BrokerTestBase {
    static final int PARTITIONS = 5;

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.resetConfig();
        this.conf.setAllowAutoTopicCreation(false);
        this.conf.setAllowAutoTopicCreationType("partitioned");
        this.conf.setDefaultNumPartitions(5);
        this.conf.setManagedLedgerMaxEntriesPerLedger(1);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(false);
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        super.baseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testAutoCreatedPartitionedSystemTopic() throws Exception {
        String ns = "prop/ns-test";
        this.admin.namespaces().createNamespace("prop/ns-test", 2);
        NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(this.pulsarClient);
        TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory.createTopicPoliciesSystemTopicClient(NamespaceName.get((String)"prop/ns-test"));
        SystemTopicClient.Reader reader = systemTopicClientForNamespace.newReader();
        int partitions = this.admin.topics().getPartitionedTopicMetadata((String)String.format((String)"persistent://%s/%s", (Object[])new Object[]{"prop/ns-test", "__change_events"})).partitions;
        Assert.assertEquals((int)this.admin.topics().getPartitionedTopicList("prop/ns-test").size(), (int)1);
        Assert.assertEquals((int)partitions, (int)5);
        Assert.assertEquals((int)this.admin.topics().getList("prop/ns-test").size(), (int)5);
        reader.close();
    }

    @Test(timeOut=60000L)
    public void testConsumerCreationWhenEnablingTopicPolicy() throws Exception {
        String topic;
        String tenant = "tenant-" + RandomStringUtils.randomAlphabetic((int)4).toLowerCase();
        this.admin.tenants().createTenant(tenant, (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet(), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        int namespaceCount = 30;
        for (int i = 0; i < namespaceCount; ++i) {
            String ns = tenant + "/ns-" + i;
            this.admin.namespaces().createNamespace(ns, 4);
            topic = ns + "/t1";
            this.admin.topics().createPartitionedTopic(topic, 2);
        }
        ArrayList<CompletableFuture> futureList = new ArrayList<CompletableFuture>();
        for (int i = 0; i < namespaceCount; ++i) {
            topic = tenant + "/ns-" + i + "/t1";
            futureList.add(this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("sub").subscribeAsync());
        }
        FutureUtil.waitForAll(futureList).get();
        for (CompletableFuture consumer : futureList) {
            ((Consumer)consumer.join()).close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testProduceAndConsumeUnderSystemNamespace() throws Exception {
        TenantInfo tenantInfo = TenantInfo.builder().adminRoles((Set)Sets.newHashSet((Object[])new String[]{"admin"})).allowedClusters((Set)Sets.newHashSet((Object[])new String[]{"test"})).build();
        this.admin.tenants().createTenant("pulsar", tenantInfo);
        this.admin.namespaces().createNamespace("pulsar/system", 2);
        Producer producer = this.pulsarClient.newProducer().topic("pulsar/system/__topic-1").create();
        try {
            producer.send((Object)"test".getBytes(StandardCharsets.UTF_8));
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"pulsar/system/__topic-1"}).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionName("sub1").subscriptionType(SubscriptionType.Shared).subscribe();
            try {
                Message receive = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull((Object)receive);
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Test
    public void testHealthCheckTopicNotOffload() throws Exception {
        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2((String)this.pulsar.getAdvertisedAddress(), (ServiceConfiguration)this.pulsar.getConfig());
        TopicName topicName = TopicName.get((String)"persistent", (NamespaceName)namespaceName, (String)"healthcheck");
        PersistentTopic persistentTopic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopic(topicName.toString(), true).get()).get();
        ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
        config.setLedgerOffloader((LedgerOffloader)NullLedgerOffloader.INSTANCE);
        this.admin.brokers().healthcheck(TopicVersion.V2);
        this.admin.topics().triggerOffload(topicName.toString(), MessageId.earliest);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((long)persistentTopic.getManagedLedger().getOffloadedSize(), (long)0L));
        LedgerOffloader ledgerOffloader = (LedgerOffloader)Mockito.mock(LedgerOffloader.class);
        config.setLedgerOffloader(ledgerOffloader);
        Assert.assertEquals((Object)config.getLedgerOffloader(), (Object)ledgerOffloader);
        this.admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
        Awaitility.await().pollDelay(5L, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals((Object)persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(), (Object)NullLedgerOffloader.INSTANCE));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    private void testSetBacklogCausedCreatingProducerFailure() throws Exception {
        String ns = "prop/ns-test";
        String topic = "prop/ns-test/topic-1";
        this.admin.namespaces().createNamespace("prop/ns-test", 2);
        this.admin.topics().createPartitionedTopic(String.format("persistent://%s", "prop/ns-test/topic-1"), 1);
        BacklogQuota quota = BacklogQuota.builder().limitTime(2).limitSize(-1L).retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception).build();
        this.admin.namespaces().setBacklogQuota("prop/ns-test", quota, BacklogQuota.BacklogQuotaType.message_age);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("prop/ns-test/topic-1").create();
        try {
            String partition0 = TopicName.get((String)String.format("persistent://%s", "prop/ns-test/topic-1")).getPartition(0).toString();
            Optional topicReference = this.pulsar.getBrokerService().getTopicReference(partition0);
            Assert.assertTrue((boolean)topicReference.isPresent());
            PersistentTopic persistentTopic = (PersistentTopic)topicReference.get();
            ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
            config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
            config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
            persistentTopic.getManagedLedger().setConfig(config);
            Whitebox.invokeMethod((Object)persistentTopic.getManagedLedger(), (String)"updateLastLedgerCreatedTimeAndScheduleRolloverTask", (Object[])new Object[0]);
            String msg1 = "msg-1";
            producer.send((Object)msg1);
            Thread.sleep(3000L);
            Consumer consumer2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"prop/ns-test/topic-1"}).subscriptionName("sub-1").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionType(SubscriptionType.Key_Shared).subscribe();
            Message receive = consumer2.receive();
            consumer2.acknowledge(receive);
            Thread.sleep(3000L);
            try {
                Producer producerN = PulsarClient.builder().maxBackoffInterval(3L, TimeUnit.SECONDS).operationTimeout(5, TimeUnit.SECONDS).serviceUrl(this.lookupUrl.toString()).connectionTimeout(2, TimeUnit.SECONDS).build().newProducer(Schema.STRING).topic("prop/ns-test/topic-1").sendTimeout(3, TimeUnit.SECONDS).create();
                Assert.assertTrue((boolean)producerN.isConnected());
                producerN.close();
            }
            catch (Exception ex) {
                Assert.fail((String)"failed to create producer");
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }
}

