/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.apache.pulsar.io.AbstractPulsarE2ETest;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker-io"})
public class PulsarFunctionE2ETest
extends AbstractPulsarE2ETest {
    /**
     * Builds the {@link FunctionConfig} shared by the tests in this class.
     *
     * <p>The returned config runs {@code ExclamationFunction} on the Java runtime with a
     * parallelism of 1, effectively-once processing, auto-ack enabled and subscription cleanup
     * enabled. Callers adjust individual settings afterwards as needed.
     *
     * @param tenant           tenant the function is registered under
     * @param namespace        namespace (without tenant prefix) the function is registered under
     * @param functionName     name of the function
     * @param sourceTopic      topic name or pattern relative to {@code persistent://tenant/namespace/};
     *                         when {@code null} no topics pattern is set
     * @param sinkTopic        output topic of the function
     * @param subscriptionName subscription name used on the input topics
     * @return a freshly populated function configuration
     */
    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
        final FunctionConfig config = new FunctionConfig();

        // Identity of the function.
        config.setTenant(tenant);
        config.setNamespace(namespace);
        config.setName(functionName);

        // Execution settings.
        config.setParallelism(1);
        config.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
        config.setSubName(subscriptionName);

        // The input is optional; callers that pass null configure their inputs themselves.
        if (sourceTopic != null) {
            config.setTopicsPattern("persistent://" + tenant + "/" + namespace + "/" + sourceTopic);
        }

        config.setAutoAck(true);
        config.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
        config.setRuntime(FunctionConfig.Runtime.JAVA);
        config.setOutput(sinkTopic);
        config.setCleanupSubscription(true);
        return config;
    }

    /**
     * Runs the end-to-end function scenario against a function package located at the given URL.
     *
     * <p>Steps performed:
     * <ol>
     *   <li>create the namespace and a producer/consumer on the source and final sink topic;</li>
     *   <li>create the function with at-least-once processing, then update it to parallelism 2
     *       and a different output topic;</li>
     *   <li>wait until two publishers tagged with the function id are attached to the new output
     *       topic and the function subscription exists on the source topic;</li>
     *   <li>publish messages carrying a property and verify the property reaches the sink;</li>
     *   <li>delete the function and verify its subscription and downloaded temp files are gone.</li>
     * </ol>
     *
     * @param jarFilePathUrl URL of the function package handed to the create/update admin calls
     * @throws Exception if any admin or client operation fails
     */
    private void testE2EPulsarFunction(String jarFilePathUrl) throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = tenantName + "/" + namespacePortion;
        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final String functionName = "PulsarFunction-test";
        final String subscriptionName = "test-sub";
        final int totalMsgs = 5;

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        // Creating the producer also creates the source topic at the broker. Both client handles
        // are closed even when an assertion below fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic2).subscriptionName("sub").subscribe()) {

            FunctionConfig functionConfig = PulsarFunctionE2ETest.createFunctionConfig(tenantName, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName);
            functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

            // Update the running function: two instances, publishing to the second sink topic.
            functionConfig.setParallelism(2);
            functionConfig.setOutput(sinkTopic2);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);

            // Both instances must be publishing to the new sink, tagged with the function id.
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                TopicStats topicStats = this.admin.topics().getStats(sinkTopic2);
                Assert.assertEquals(topicStats.getPublishers().size(), 2);
                PublisherStats firstPublisher = topicStats.getPublishers().get(0);
                Assert.assertNotNull(firstPublisher.getMetadata());
                Assert.assertTrue(firstPublisher.getMetadata().containsKey("id"));
                Assert.assertEquals(firstPublisher.getMetadata().get("id"), String.format("%s/%s/%s", tenantName, namespacePortion, functionName));
            });

            // The function must have subscribed to the source topic before we publish.
            Awaitility.await().ignoreExceptions().untilAsserted(() ->
                    Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1));

            for (int i = 0; i < totalMsgs; ++i) {
                producer.newMessage().property(propertyKey, propertyValue).value("my-message-" + i).send();
            }

            // Wait until the function has acknowledged everything it received.
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
                Assert.assertEquals(subStats.getUnackedMessages(), 0L);
            });

            // receive() returns null on timeout; fail with a clear message instead of an NPE.
            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(msg, "no message arrived on " + sinkTopic2 + " within 5 seconds");
            Assert.assertEquals(msg.getProperty(propertyKey), propertyValue);

            // Compare as Long on both sides: a boxed Long never equals a boxed Integer, which
            // would make this check pass unconditionally.
            SubscriptionStats firstSubscription = this.admin.topics().getStats(sourceTopic).getSubscriptions().values().iterator().next();
            Assert.assertNotEquals(Long.valueOf(firstSubscription.getUnackedMessages()), Long.valueOf(totalMsgs));

            // Deleting the function must remove its subscription (cleanupSubscription is enabled).
            this.admin.functions().deleteFunction(tenantName, namespacePortion, functionName);
            Awaitility.await().ignoreExceptions().untilAsserted(() ->
                    Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 0));

            this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
        }
    }

    /**
     * Runs the end-to-end scenario with the function package referenced by the URI string of the
     * jar returned from {@code PulsarFunctionLocalRunTest.getPulsarApiExamplesJar()}.
     */
    @Test(timeOut=20000L)
    public void testE2EPulsarFunctionWithFile() throws Exception {
        this.testE2EPulsarFunction(PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString());
    }

    /**
     * Runs the end-to-end scenario with the function package referenced by the URL that the test
     * file server reports for {@code /pulsar-functions-api-examples.jar}.
     */
    @Test(timeOut=40000L)
    public void testE2EPulsarFunctionWithUrl() throws Exception {
        final String packageUrl = this.fileServer.getUrl("/pulsar-functions-api-examples.jar");
        this.testE2EPulsarFunction(packageUrl);
    }

    /**
     * Verifies that a function whose input consumer is configured with
     * {@code readCompacted=true} receives only the latest value per key from a compacted topic.
     *
     * <p>The test publishes 20 keyed messages over 10 distinct keys, compacts the source topic,
     * deploys the function and then expects exactly one output per key, each being the last value
     * published for that key with {@code "!"} appended.
     *
     * @throws Exception if any admin, client or compaction operation fails
     */
    @Test(timeOut=30000L)
    public void testReadCompactedFunction() throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = tenantName + "/" + namespacePortion;
        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String functionName = "PulsarFunction-test";
        final String subscriptionName = "test-sub";
        final int messageNum = 20;
        final int maxKeys = 10;

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        // The producer is closed even when an assertion below fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create()) {
            // Create, then immediately close, a subscription with the name the function will use.
            // NOTE(review): presumably this makes the subscription exist before any message is
            // published so the function later reads from the start of the topic - confirm.
            this.pulsarClient.newConsumer().topic(sourceTopic).subscriptionName(subscriptionName).readCompacted(true).subscribe().close();

            // Later messages overwrite earlier ones with the same key, so this map ends up holding
            // the value that should survive compaction for each key.
            Map<String, String> expected = new HashMap<>();
            for (int j = 0; j < messageNum; ++j) {
                String key = "key" + j % maxKeys;
                String value = "my-message-" + key + j;
                producer.newMessage().key(key).value(value).send();
                expected.put(key, value);
            }

            ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
            try {
                TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(this.config, this.pulsarClient, this.pulsar.getBookKeeperClient(), compactionScheduler);
                twoPhaseCompactor.compact(sourceTopic).get();

                // No topics pattern here: the input is declared through inputSpecs so that the
                // readCompacted consumer property can be attached to it.
                FunctionConfig functionConfig = PulsarFunctionE2ETest.createFunctionConfig(tenantName, namespacePortion, functionName, null, sinkTopic, subscriptionName);
                Map<String, String> consumerProperties = new HashMap<>();
                consumerProperties.put("readCompacted", "true");
                ConsumerConfig consumerConfig = new ConsumerConfig();
                consumerConfig.setConsumerProperties(consumerProperties);
                Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
                inputSpecs.put(sourceTopic, consumerConfig);
                functionConfig.setInputSpecs(inputSpecs);

                String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
                this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

                try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sink-sub").subscribe()) {
                    int count = 0;
                    Message<String> message;
                    // Drain the sink until it has been idle for 10 seconds (receive returns null).
                    while ((message = consumer.receive(10, TimeUnit.SECONDS)) != null) {
                        consumer.acknowledge(message);
                        ++count;
                        // TestNG argument order is (actual, expected).
                        Assert.assertEquals(message.getValue(), expected.remove(message.getKey()) + "!");
                    }
                    Assert.assertEquals(count, maxKeys);
                    Assert.assertTrue(expected.isEmpty());
                }
            }
            finally {
                compactionScheduler.shutdownNow();
            }
        }
    }

    @Test(timeOut=20000L)
    public void testPulsarFunctionStats() throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        String sinkTopic = "persistent://external-repl-prop/io/output";
        String propertyKey = "key";
        String propertyValue = "value";
        String functionName = "PulsarSink-test";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/my-topic1").create();
        String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
        FunctionConfig functionConfig = PulsarFunctionE2ETest.createFunctionConfig("external-repl-prop", "io", "PulsarSink-test", "my.*", "persistent://external-repl-prop/io/output", "test-sub");
        this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
        this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").getSubscriptions().size() == 1;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals((int)this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").getSubscriptions().size(), (int)1);
        FunctionRuntimeManager functionRuntimeManager = this.functionsWorkerService.getFunctionRuntimeManager();
        FunctionStatsImpl functionStats = functionRuntimeManager.getFunctionStats("external-repl-prop", "io", "PulsarSink-test", null);
        FunctionStatsImpl functionStatsFromAdmin = (FunctionStatsImpl)this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test");
        Assert.assertEquals((Object)functionStats, (Object)functionStatsFromAdmin);
        Assert.assertEquals((long)functionStats.getReceivedTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.getProcessedSuccessfullyTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.getUserExceptionsTotal(), (long)0L);
        Assert.assertNull((Object)functionStats.avgProcessLatency);
        Assert.assertEquals((long)functionStats.oneMin.getReceivedTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.oneMin.getProcessedSuccessfullyTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.oneMin.getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.oneMin.getUserExceptionsTotal(), (long)0L);
        Assert.assertNull((Object)functionStats.oneMin.getAvgProcessLatency());
        Assert.assertEquals((Object)functionStats.getAvgProcessLatency(), (Object)functionStats.oneMin.getAvgProcessLatency());
        Assert.assertNull((Object)functionStats.getLastInvocation());
        Assert.assertEquals((int)functionStats.instances.size(), (int)1);
        Assert.assertEquals((int)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getInstanceId(), (int)0);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getReceivedTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getProcessedSuccessfullyTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getUserExceptionsTotal(), (long)0L);
        Assert.assertNull((Object)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getAvgProcessLatency());
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getReceivedTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getProcessedSuccessfullyTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getUserExceptionsTotal(), (long)0L);
        Assert.assertNull((Object)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getAvgProcessLatency());
        Assert.assertEquals((Object)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), (Object)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getAvgProcessLatency());
        Assert.assertEquals((Object)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), (Object)functionStats.getAvgProcessLatency());
        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics((Integer)this.pulsar.getListenPortHTTP().get());
        log.info("prometheus metrics: {}", (Object)prometheusMetrics);
        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_user_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_user_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_process_latency_ms");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)Double.NaN);
        m = metrics.get("pulsar_function_process_latency_ms_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)Double.NaN);
        m = metrics.get("pulsar_function_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_processed_successfully_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_processed_successfully_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        FunctionInstanceStatsDataImpl functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats("external-repl-prop", "io", "PulsarSink-test", 0, null);
        FunctionInstanceStatsDataImpl functionInstanceStatsAdmin = (FunctionInstanceStatsDataImpl)this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test", 0);
        Assert.assertEquals((Object)functionInstanceStats, (Object)functionInstanceStatsAdmin);
        Assert.assertEquals((Object)functionInstanceStats, (Object)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics());
        int totalMsgs = 10;
        for (int i = 0; i < totalMsgs; ++i) {
            String data = "my-message-" + i;
            producer.newMessage().property("key", "value").value((Object)data).send();
        }
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                SubscriptionStats subStats = (SubscriptionStats)this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").getSubscriptions().get("test-sub");
                return subStats.getUnackedMessages() == 0L && subStats.getMsgThroughputOut() == (double)totalMsgs;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200L);
        functionStats = functionRuntimeManager.getFunctionStats("external-repl-prop", "io", "PulsarSink-test", null);
        functionStatsFromAdmin = (FunctionStatsImpl)this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test");
        Assert.assertEquals((Object)functionStats, (Object)functionStatsFromAdmin);
        Assert.assertEquals((long)functionStats.getReceivedTotal(), (long)totalMsgs);
        Assert.assertEquals((long)functionStats.getProcessedSuccessfullyTotal(), (long)totalMsgs);
        Assert.assertEquals((long)functionStats.getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.getUserExceptionsTotal(), (long)0L);
        Assert.assertTrue((functionStats.avgProcessLatency > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)functionStats.oneMin.getReceivedTotal(), (long)totalMsgs);
        Assert.assertEquals((long)functionStats.oneMin.getProcessedSuccessfullyTotal(), (long)totalMsgs);
        Assert.assertEquals((long)functionStats.oneMin.getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)functionStats.oneMin.getUserExceptionsTotal(), (long)0L);
        Assert.assertTrue((functionStats.oneMin.getAvgProcessLatency() > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((Object)functionStats.getAvgProcessLatency(), (Object)functionStats.oneMin.getAvgProcessLatency());
        Assert.assertTrue((functionStats.getLastInvocation() > 0L ? 1 : 0) != 0);
        Assert.assertEquals((int)functionStats.instances.size(), (int)1);
        Assert.assertEquals((int)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getInstanceId(), (int)0);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getReceivedTotal(), (long)totalMsgs);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getProcessedSuccessfullyTotal(), (long)totalMsgs);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getUserExceptionsTotal(), (long)0L);
        Assert.assertTrue((((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getAvgProcessLatency() > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getReceivedTotal(), (long)totalMsgs);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getProcessedSuccessfullyTotal(), (long)totalMsgs);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getSystemExceptionsTotal(), (long)0L);
        Assert.assertEquals((long)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getUserExceptionsTotal(), (long)0L);
        Assert.assertTrue((((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getAvgProcessLatency() > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((Object)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), (Object)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getOneMin().getAvgProcessLatency());
        Assert.assertEquals((Object)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), (Object)functionStats.getAvgProcessLatency());
        functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats("external-repl-prop", "io", "PulsarSink-test", 0, null);
        functionInstanceStatsAdmin = (FunctionInstanceStatsDataImpl)this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test", 0);
        Assert.assertEquals((Object)functionInstanceStats, (Object)functionInstanceStatsAdmin);
        Assert.assertEquals((Object)functionInstanceStats, (Object)((FunctionInstanceStatsImpl)functionStats.instances.get(0)).getMetrics());
        prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics((Integer)this.pulsar.getListenPortHTTP().get());
        log.info("prometheus metrics: {}", (Object)prometheusMetrics);
        metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
        m = metrics.get("pulsar_function_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_function_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_function_user_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_user_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_process_latency_ms");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        m = metrics.get("pulsar_function_process_latency_ms_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        m = metrics.get("pulsar_function_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_function_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        m = metrics.get("pulsar_function_processed_successfully_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_function_processed_successfully_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        this.admin.functions().deleteFunction("external-repl-prop", "io", "PulsarSink-test");
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").getSubscriptions().size() == 0;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals((int)this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").getSubscriptions().size(), (int)0);
        this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
    }

    /**
     * Checks that the status reported by the functions admin API matches the traffic sent to the function.
     *
     * <p>The test registers (and immediately updates) the function, publishes {@code totalMsgs} messages
     * to the source topic and then expects exactly one instance whose received and successfully-processed
     * counters both equal {@code totalMsgs} and whose owner is this test's worker. Finally the function is
     * deleted and the source topic is expected to end up without subscriptions.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut=20000L)
    public void testPulsarFunctionStatus() throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = tenantName + "/" + namespacePortion;
        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final String functionName = "PulsarSink-test";
        final String subscriptionName = "test-sub";
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        // try-with-resources: the producer is released even when an assertion below fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
            FunctionConfig functionConfig = PulsarFunctionE2ETest.createFunctionConfig(
                    tenantName, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);

            // Wait until the function has subscribed to the source topic.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).getSubscriptions().size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1);

            for (int i = 0; i < totalMsgs; ++i) {
                producer.newMessage().property(propertyKey, propertyValue).value("my-message-" + i).send();
            }

            // Give the function a moment to consume; the outcome is verified through the status below.
            // NOTE(review): the condition compares throughput-out with the message count - confirm intent.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    SubscriptionStats subStats =
                            this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
                    // A missing subscription means "not ready yet", not a test crash.
                    return subStats != null
                            && subStats.getUnackedMessages() == 0L
                            && subStats.getMsgThroughputOut() == (double)totalMsgs;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            FunctionStatus functionStatus =
                    this.admin.functions().getFunctionStatus(tenantName, namespacePortion, functionName);
            Assert.assertEquals(functionStatus.getNumInstances(), 1);
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData status =
                    functionStatus.getInstances().get(0).getStatus();
            Assert.assertEquals((int)status.getNumReceived(), totalMsgs);
            Assert.assertEquals((int)status.getNumSuccessfullyProcessed(), totalMsgs);
            Assert.assertEquals(status.getWorkerId(), this.workerId);

            // Deleting the function is expected to remove its subscription from the source topic.
            this.admin.functions().deleteFunction(tenantName, namespacePortion, functionName);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).getSubscriptions().size() == 0;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 0);
        }
    }

    /**
     * Verifies that function creation is subject to authorization.
     *
     * <p>The tenant is reconfigured so that its only admin role is either {@code "superUser"} or
     * {@code "invalid"}. With the valid role the function must be created through the existing admin
     * client. With the invalid role the admin client is replaced by a freshly built one and the creation
     * request must be rejected with {@link PulsarAdminException.NotAuthorizedException}.
     *
     * @param validRoleName supplied by the {@code validRoleName} data provider; selects which of the two
     *                      scenarios is exercised
     * @throws Exception if any admin call fails for a reason other than authorization
     */
    @Test(dataProvider="validRoleName")
    public void testAuthorization(boolean validRoleName) throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = tenantName + "/" + namespacePortion;
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String functionName = "PulsarSink-test";
        final String subscriptionName = "test-sub";

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        String roleName = validRoleName ? "superUser" : "invalid";
        TenantInfo propAdmin = TenantInfo.builder()
                .adminRoles(Collections.singleton(roleName))
                .allowedClusters(Collections.singleton("use"))
                .build();
        this.admin.tenants().updateTenant(tenantName, propAdmin);

        String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
        FunctionConfig functionConfig = PulsarFunctionE2ETest.createFunctionConfig(
                tenantName, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName);

        if (validRoleName) {
            try {
                this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            }
            catch (PulsarAdminException.NotAuthorizedException ne) {
                // Keep the cause so the failure report shows why the request was rejected.
                Assert.fail("function creation was rejected although the tenant admin role is valid", ne);
            }
            return;
        }

        // Swap in a separately built admin client for the negative scenario.
        this.admin = Mockito.spy(PulsarAdmin.builder()
                .serviceHttpUrl(this.pulsar.getWebServiceAddressTls())
                .tlsTrustCertsFilePath("./src/test/resources/authentication/tls/cacert.pem")
                .allowTlsInsecureConnection(true)
                .build());
        try {
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            // Without this the test would pass even if the authorization check were missing.
            Assert.fail("function creation succeeded although the tenant admin role is invalid");
        }
        catch (PulsarAdminException.NotAuthorizedException expected) {
            // Expected outcome: the request is refused.
        }
    }

    /**
     * Exercises the stop and restart endpoints of the functions admin API.
     *
     * <p>The number of consumers attached to the function's subscription on the source topic is used as
     * the observable signal: one consumer after creation, none after {@code stopFunction}, and one again
     * after {@code restartFunction}.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut=20000L)
    public void testFunctionStopAndRestartApi() throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = tenantName + "/" + namespacePortion;
        final String sourceTopicName = "restartFunction";
        final String sourceTopic = "persistent://" + replNamespace + "/" + sourceTopicName;
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String functionName = "PulsarSink-test";
        final String subscriptionName = "test-sub";

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        // try-with-resources: previously the producer leaked whenever an assertion failed before close().
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
            FunctionConfig functionConfig = PulsarFunctionE2ETest.createFunctionConfig(
                    tenantName, namespacePortion, functionName, sourceTopicName, sinkTopic, subscriptionName);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

            // 1) After creation the function's consumer must be attached.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    SubscriptionStats current =
                            this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
                    return current != null && current.getConsumers().size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            SubscriptionStats subStats =
                    this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
            Assert.assertNotNull(subStats, "subscription missing after function creation");
            Assert.assertEquals(subStats.getConsumers().size(), 1);

            // 2) Stopping the function must detach the consumer while the subscription stays.
            this.admin.functions().stopFunction(tenantName, namespacePortion, functionName);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    SubscriptionStats current =
                            this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
                    return current != null && current.getConsumers().size() == 0;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            subStats = this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
            Assert.assertNotNull(subStats, "subscription missing after function stop");
            Assert.assertEquals(subStats.getConsumers().size(), 0);

            // 3) Restarting must bring the consumer back.
            this.admin.functions().restartFunction(tenantName, namespacePortion, functionName);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    SubscriptionStats current =
                            this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
                    return current != null && current.getConsumers().size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            subStats = this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
            Assert.assertNotNull(subStats, "subscription missing after function restart");
            Assert.assertEquals(subStats.getConsumers().size(), 1);
        }
    }

    /**
     * Verifies how the {@code cleanupSubscription} setting of a function affects its subscription on the
     * source topic when the function is deleted.
     *
     * <p>Scenario, in order:
     * <ol>
     *   <li>Create the function with {@code cleanupSubscription = false} and confirm the stored config
     *       and the presence of one subscription.</li>
     *   <li>Update it to {@code cleanupSubscription = true}, publish {@code totalMsgs} messages and check
     *       the instance status counters.</li>
     *   <li>Delete the function and expect the subscription to disappear.</li>
     *   <li>Re-create it with {@code cleanupSubscription = false}, bump the parallelism to 2 and confirm
     *       the flag is still {@code false} afterwards.</li>
     *   <li>Delete the function again and expect the subscription to remain.</li>
     * </ol>
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut=20000L)
    public void testFunctionAutomaticSubCleanup() throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = tenantName + "/" + namespacePortion;
        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final String functionName = "PulsarFunction-test";
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        // try-with-resources: the producer is released even when an assertion below fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();

            FunctionConfig functionConfig = new FunctionConfig();
            functionConfig.setTenant(tenantName);
            functionConfig.setNamespace(namespacePortion);
            functionConfig.setName(functionName);
            functionConfig.setParallelism(1);
            functionConfig.setInputs(Collections.singleton(sourceTopic));
            functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
            functionConfig.setOutput(sinkTopic);
            functionConfig.setCleanupSubscription(false);
            functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);

            // --- 1) created with cleanupSubscription = false ---
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    FunctionConfig stored =
                            this.admin.functions().getFunction(tenantName, namespacePortion, functionName);
                    return stored != null && stored.getCleanupSubscription() != null;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertFalse(
                    this.admin.functions().getFunction(tenantName, namespacePortion, functionName)
                            .getCleanupSubscription());

            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).getSubscriptions().size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1);

            // --- 2) switch to cleanupSubscription = true and process messages ---
            functionConfig.setCleanupSubscription(true);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    // Boolean.TRUE.equals avoids an unboxing NPE while the flag is not yet populated.
                    return Boolean.TRUE.equals(
                            this.admin.functions().getFunction(tenantName, namespacePortion, functionName)
                                    .getCleanupSubscription());
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertTrue(
                    this.admin.functions().getFunction(tenantName, namespacePortion, functionName)
                            .getCleanupSubscription());

            for (int i = 0; i < totalMsgs; ++i) {
                producer.newMessage().property(propertyKey, propertyValue).value("my-message-" + i).send();
            }

            // Give the function a moment to consume; the outcome is verified through the status below.
            // NOTE(review): the condition compares throughput-out with the message count - confirm intent.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic).getSubscriptions().get(
                            InstanceUtils.getDefaultSubscriptionName(tenantName, namespacePortion, functionName));
                    return subStats != null
                            && subStats.getUnackedMessages() == 0L
                            && subStats.getMsgThroughputOut() == (double)totalMsgs;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            FunctionStatus functionStatus =
                    this.admin.functions().getFunctionStatus(tenantName, namespacePortion, functionName);
            Assert.assertEquals(functionStatus.getNumInstances(), 1);
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData status =
                    functionStatus.getInstances().get(0).getStatus();
            Assert.assertEquals((int)status.getNumReceived(), totalMsgs);
            Assert.assertEquals((int)status.getNumSuccessfullyProcessed(), totalMsgs);
            Assert.assertEquals(status.getWorkerId(), this.workerId);

            // --- 3) delete with cleanup enabled: the subscription must go away ---
            this.admin.functions().deleteFunction(tenantName, namespacePortion, functionName);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).getSubscriptions().size() == 0;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 0);

            // --- 4) re-create with cleanupSubscription = false, then change only the parallelism ---
            functionConfig.setCleanupSubscription(false);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).getSubscriptions().size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1);

            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    FunctionConfig stored =
                            this.admin.functions().getFunction(tenantName, namespacePortion, functionName);
                    return Boolean.FALSE.equals(stored.getCleanupSubscription());
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertFalse(
                    this.admin.functions().getFunction(tenantName, namespacePortion, functionName)
                            .getCleanupSubscription());

            functionConfig.setParallelism(2);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    FunctionConfig stored =
                            this.admin.functions().getFunction(tenantName, namespacePortion, functionName);
                    return Integer.valueOf(2).equals(stored.getParallelism())
                            && Boolean.FALSE.equals(stored.getCleanupSubscription());
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertFalse(
                    this.admin.functions().getFunction(tenantName, namespacePortion, functionName)
                            .getCleanupSubscription());

            // --- 5) delete with cleanup disabled: the subscription is expected to remain ---
            this.admin.functions().deleteFunction(tenantName, namespacePortion, functionName);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).getSubscriptions().size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1);
        }
    }

    /**
     * Runs one function instance that consumes from two source topics and checks that it processes the
     * traffic of both.
     *
     * <p>{@code totalMsgs} messages are published to each topic; the single instance is then expected to
     * report {@code 2 * totalMsgs} received and successfully processed messages. After the function is
     * deleted, both source topics are expected to end up without subscriptions.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut=20000L)
    public void testMultiTopicFunction() throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = tenantName + "/" + namespacePortion;
        final String sourceTopic1 = "persistent://" + replNamespace + "/my-topic1";
        final String sourceTopic2 = "persistent://" + replNamespace + "/my-topic2";
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final String functionName = "PulsarFunction-test";
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        // Resources are closed in reverse declaration order: producer2 first, then producer1.
        try (Producer<String> producer1 = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic1).create();
             Producer<String> producer2 = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic2).create()) {
            String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();

            FunctionConfig functionConfig = new FunctionConfig();
            functionConfig.setTenant(tenantName);
            functionConfig.setNamespace(namespacePortion);
            functionConfig.setName(functionName);
            functionConfig.setParallelism(1);
            functionConfig.setInputs(Lists.newArrayList(sourceTopic1, sourceTopic2));
            functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
            functionConfig.setOutput(sinkTopic);
            functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

            // The function must become retrievable and subscribe to both inputs.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    this.admin.functions().getFunction(tenantName, namespacePortion, functionName);
                    return true;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L));
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic1).getSubscriptions().size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L));
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic2).getSubscriptions().size() == 1;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L));

            for (int i = 0; i < totalMsgs; ++i) {
                String data = "my-message-" + i;
                producer1.newMessage().property(propertyKey, propertyValue).value(data).send();
                producer2.newMessage().property(propertyKey, propertyValue).value(data).send();
            }

            // Both subscriptions must drain.
            // NOTE(review): the condition compares throughput-out with the message count - confirm intent.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic1).getSubscriptions().get(
                            InstanceUtils.getDefaultSubscriptionName(tenantName, namespacePortion, functionName));
                    // A missing subscription now fails the assertion instead of throwing an NPE.
                    return subStats != null
                            && subStats.getUnackedMessages() == 0L
                            && subStats.getMsgThroughputOut() == (double)totalMsgs;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L));
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic2).getSubscriptions().get(
                            InstanceUtils.getDefaultSubscriptionName(tenantName, namespacePortion, functionName));
                    return subStats != null
                            && subStats.getUnackedMessages() == 0L
                            && subStats.getMsgThroughputOut() == (double)totalMsgs;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L));

            FunctionStatus functionStatus =
                    this.admin.functions().getFunctionStatus(tenantName, namespacePortion, functionName);
            Assert.assertEquals(functionStatus.getNumInstances(), 1);
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData status =
                    functionStatus.getInstances().get(0).getStatus();
            // One instance serves both topics, so its counters cover the traffic of both producers.
            Assert.assertEquals((int)status.getNumReceived(), totalMsgs * 2);
            Assert.assertEquals((int)status.getNumSuccessfullyProcessed(), totalMsgs * 2);
            Assert.assertEquals(status.getWorkerId(), this.workerId);

            // Deleting the function is expected to remove its subscription from both source topics.
            this.admin.functions().deleteFunction(tenantName, namespacePortion, functionName);
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic1).getSubscriptions().size() == 0;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L));
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic2).getSubscriptions().size() == 0;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L));
        }
    }

    /**
     * End-to-end test of a Java function whose input topic is consumed with
     * {@code poolMessages(true)}.
     *
     * <p>The test deploys {@link ByteBufferFunction} between a source and a sink topic,
     * publishes a handful of messages carrying a property, and then verifies that:
     * <ul>
     *   <li>the function's producer on the sink topic advertises the function id in its metadata;</li>
     *   <li>the function's subscription on the source topic ends up with no unacked messages;</li>
     *   <li>a message arrives on the sink topic with the original property preserved;</li>
     *   <li>deleting the function removes its subscription from the source topic and leaves
     *       no downloaded temp files behind.</li>
     * </ul>
     *
     * @throws Exception if any admin/client call fails or an awaited condition is not met
     */
    @Test(timeOut=20000L)
    public void testE2EPulsarFunctionMessagePooled() throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = tenantName + "/" + namespacePortion;
        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final String functionName = "PulsarFunction-test";
        final String subscriptionName = "test-sub";
        final int totalMsgs = 5;

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet("use");
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        // try-with-resources guarantees both client handles are released even when an
        // assertion below fails; previously they were never closed by this test.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(sourceTopic)
                     .create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(sinkTopic)
                     .subscriptionName("sub")
                     .subscribe()) {

            FunctionConfig functionConfig = new FunctionConfig();
            functionConfig.setTenant(tenantName);
            functionConfig.setNamespace(namespacePortion);
            functionConfig.setName(functionName);
            functionConfig.setParallelism(1);
            functionConfig.setSubName(subscriptionName);
            // Pooled messages are the feature under test.
            functionConfig.setInputSpecs(Collections.singletonMap(
                    sourceTopic, ConsumerConfig.builder().poolMessages(true).build()));
            functionConfig.setAutoAck(true);
            functionConfig.setClassName(ByteBufferFunction.class.getName());
            functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
            functionConfig.setOutput(sinkTopic);
            functionConfig.setCleanupSubscription(true);
            functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);

            // Deploy from the code source that contains this test class (and ByteBufferFunction).
            String packageUrl = PulsarFunctionE2ETest.class.getProtectionDomain()
                    .getCodeSource().getLocation().toURI().toString();
            this.admin.functions().createFunctionWithUrl(functionConfig, packageUrl);

            // Wait until the function's producer is attached to the sink topic and tagged with its id.
            final String expectedProducerId =
                    String.format("%s/%s/%s", tenantName, namespacePortion, functionName);
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                TopicStats topicStats = this.admin.topics().getStats(sinkTopic);
                Assert.assertEquals(topicStats.getPublishers().size(), 1);
                PublisherStats publisher = topicStats.getPublishers().get(0);
                Assert.assertNotNull(publisher.getMetadata());
                Assert.assertTrue(publisher.getMetadata().containsKey("id"));
                Assert.assertEquals(publisher.getMetadata().get("id"), expectedProducerId);
            });

            // Wait until the function's consumer has subscribed to the source topic.
            Awaitility.await().ignoreExceptions().untilAsserted(() ->
                    Assert.assertEquals(
                            this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1));

            for (int i = 0; i < totalMsgs; ++i) {
                String data = "my-message-" + i;
                producer.newMessage().property(propertyKey, propertyValue).value(data).send();
            }

            // All published messages must eventually be acknowledged by the function.
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic)
                        .getSubscriptions().get(subscriptionName);
                Assert.assertEquals(subStats.getUnackedMessages(), 0L);
            });

            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(msg, "Should have gotten a message");
            Assert.assertEquals(msg.getProperty(propertyKey), propertyValue);

            // Compare as long vs long: boxing a long against an int yields Long vs Integer,
            // which are never equal, so the previous form of this check could not fail.
            long unackedOnSource = this.admin.topics().getStats(sourceTopic)
                    .getSubscriptions().values().iterator().next().getUnackedMessages();
            Assert.assertNotEquals(unackedOnSource, (long) totalMsgs);

            this.admin.functions().deleteFunction(tenantName, namespacePortion, functionName);

            // cleanupSubscription=true: the function's subscription must disappear after deletion.
            Awaitility.await().ignoreExceptions().untilAsserted(() ->
                    Assert.assertEquals(
                            this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 0));

            this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
        }
    }

    public static class ByteBufferFunction
    implements Function<ByteBuffer, ByteBuffer> {
        public ByteBuffer process(ByteBuffer input, Context context) throws Exception {
            Assert.assertTrue((boolean)input.isDirect());
            return input;
        }
    }
}

