/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FileServer;
import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class PulsarFunctionLocalRunTest {
    // Local ZooKeeper/BookKeeper ensemble; started in setup() and stopped in shutdown().
    LocalBookkeeperEnsemble bkEnsemble;
    // Broker configuration (a Mockito spy) rebuilt in setup() for every test method.
    ServiceConfiguration config;
    // Functions worker configuration produced by createWorkerConfig().
    WorkerConfig workerConfig;
    // URL parsed from the broker's TLS web service address in setup().
    URL urlTls;
    // Broker instance started in setup() and closed in shutdown().
    PulsarService pulsar;
    // TLS-authenticated admin client (a Mockito spy) created in setup().
    PulsarAdmin admin;
    // Client used by the tests to produce to and consume from topics.
    PulsarClient pulsarClient;
    // Obtained from admin.brokerStats() in setup().
    BrokerStats brokerStatsClient;
    // Tenant name; setup() creates a tenant with this same value.
    final String tenant = "external-repl-prop";
    // Namespace handed to WorkerConfig.setPulsarFunctionsNamespace in createWorkerConfig().
    String pulsarFunctionsNamespace = "external-repl-prop/pulsar-function-admin";
    // Set from pulsar.getWebServiceAddress() in setup().
    String primaryHost;
    // Worker id computed in createWorkerConfig() from cluster name, hostname and worker port.
    String workerId;
    // Cluster name applied to the broker config and used as the tenant's allowed cluster.
    private static final String CLUSTER = "local";
    // TLS certificate/key locations, relative to the module directory.
    // NOTE(review): in the visible part of this file the same path values appear inline as
    // literals (decompiler constant inlining) rather than through these fields.
    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
    private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
    // System property naming the location of the data-generator connector NAR.
    private static final String SYSTEM_PROPERTY_NAME_NAR_FILE_PATH = "pulsar-io-data-generator.nar.path";
    // Temporary directory created in createWorkerConfig() and deleted in shutdown().
    private PulsarFunctionTestTemporaryDirectory tempDirectory;
    // System property naming the location of the functions API examples jar.
    private static final String SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_JAR_FILE_PATH = "pulsar-functions-api-examples.jar.path";
    // System property naming the location of the batch data-generator connector NAR.
    private static final String SYSTEM_PROPERTY_NAME_BATCH_NAR_FILE_PATH = "pulsar-io-batch-data-generator.nar.path";
    // Class loader over the examples jar; opened in loadPulsarApiExamples(), closed in closeClassLoader().
    private URLClassLoader pulsarApiExamplesClassLoader;
    // AvroTestObject class loaded reflectively through pulsarApiExamplesClassLoader.
    private Class<?> avroTestObjectClass;
    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionLocalRunTest.class);
    // HTTP file server that serves the NAR and examples jar; started in setup(), stopped in shutdown().
    private FileServer fileServer;

    /**
     * Locates the data-generator connector NAR through the
     * {@code pulsar-io-data-generator.nar.path} system property.
     *
     * @return a {@link File} pointing at the configured location
     * @throws NullPointerException if the system property is not set
     */
    public static File getPulsarIODataGeneratorNar() {
        final String narLocation = System.getProperty(SYSTEM_PROPERTY_NAME_NAR_FILE_PATH);
        Objects.requireNonNull(narLocation, "pulsar-io-data-generator.nar file location must be specified with pulsar-io-data-generator.nar.path system property");
        return new File(narLocation);
    }

    /**
     * Locates the functions API examples jar through the
     * {@code pulsar-functions-api-examples.jar.path} system property.
     *
     * @return a {@link File} pointing at the configured location
     * @throws NullPointerException if the system property is not set
     */
    public static File getPulsarApiExamplesJar() {
        final String jarLocation = Objects.requireNonNull(
                System.getProperty(SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_JAR_FILE_PATH),
                "pulsar-functions-api-examples.jar file location must be specified with pulsar-functions-api-examples.jar.path system property");
        final File examplesJar = new File(jarLocation);
        return examplesJar;
    }

    /**
     * Locates the batch data-generator connector NAR through the
     * {@code pulsar-io-batch-data-generator.nar.path} system property.
     *
     * @return a {@link File} pointing at the configured location
     * @throws NullPointerException if the system property is not set
     */
    public static File getPulsarIOBatchDataGeneratorNar() {
        final String batchNarLocation = System.getProperty(SYSTEM_PROPERTY_NAME_BATCH_NAR_FILE_PATH);
        final String checkedLocation = Objects.requireNonNull(batchNarLocation, "pulsar-io-batch-data-generator.nar file location must be specified with pulsar-io-batch-data-generator.nar.path system property");
        return new File(checkedLocation);
    }

    /**
     * Data provider named {@code validRoleName} that yields two single-argument rows:
     * {@code Boolean.TRUE} followed by {@code Boolean.FALSE}.
     *
     * @return the two parameter rows
     */
    @DataProvider(name="validRoleName")
    public Object[][] validRoleName() {
        final Object[][] rows = new Object[2][];
        rows[0] = new Object[]{Boolean.TRUE};
        rows[1] = new Object[]{Boolean.FALSE};
        return rows;
    }

    /**
     * Opens a class loader over the functions API examples jar (parented by the current
     * thread's context class loader) and loads the {@code AvroTestObject} class from it.
     *
     * @throws MalformedURLException if the jar location cannot be converted to a URL
     * @throws ClassNotFoundException if {@code AvroTestObject} cannot be loaded
     */
    @BeforeClass(alwaysRun=true)
    void loadPulsarApiExamples() throws MalformedURLException, ClassNotFoundException {
        final URL examplesJarUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toURL();
        final ClassLoader parentLoader = Thread.currentThread().getContextClassLoader();
        this.pulsarApiExamplesClassLoader = new URLClassLoader(new URL[]{examplesJarUrl}, parentLoader);
        final String avroPojoName = "org.apache.pulsar.functions.api.examples.pojo.AvroTestObject";
        this.avroTestObjectClass = this.pulsarApiExamplesClassLoader.loadClass(avroPojoName);
    }

    /**
     * Closes the examples class loader, if one was opened, and clears the field.
     *
     * @throws IOException if closing the class loader fails
     */
    @AfterClass(alwaysRun=true)
    void closeClassLoader() throws IOException {
        if (this.pulsarApiExamplesClassLoader == null) {
            // Nothing was opened, so there is nothing to release.
            return;
        }
        this.pulsarApiExamplesClassLoader.close();
        this.pulsarApiExamplesClassLoader = null;
    }

    /**
     * Builds a fresh TLS-secured environment before every test method: a local
     * ZooKeeper/BookKeeper ensemble, a broker with TLS authentication and authorization
     * enabled, an admin client, a Pulsar client, the test tenant, and a file server that
     * serves the connector NAR and the examples jar.
     *
     * <p>When the test method's own {@code @Test} annotation lists the {@code "builtin"}
     * group, the data-generator NAR is copied into the worker's connectors directory
     * before the broker is started.
     *
     * @param method the test method about to run (injected by TestNG)
     * @throws Exception if any component fails to start
     */
    @BeforeMethod(alwaysRun=true)
    void setup(Method method) throws Exception {
        // TLS material shared by the broker, the admin client and the Pulsar client below.
        final String brokerCertPath = "./src/test/resources/authentication/tls/broker-cert.pem";
        final String brokerKeyPath = "./src/test/resources/authentication/tls/broker-key.pem";
        final String clientCertPath = "./src/test/resources/authentication/tls/client-cert.pem";
        final String clientKeyPath = "./src/test/resources/authentication/tls/client-key.pem";
        final String trustCertPath = "./src/test/resources/authentication/tls/cacert.pem";

        log.info("--- Setting up method {} ---", method.getName());

        // Metadata/storage layer.
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();

        // Broker configuration; every port is 0 so the broker binds to free ports.
        this.config = Mockito.spy(ServiceConfiguration.class);
        this.config.setClusterName(CLUSTER);
        Set<String> superUsers = Sets.newHashSet("superUser", "admin");
        this.config.setSuperUserRoles(superUsers);
        this.config.setWebServicePort(Optional.of(0));
        this.config.setWebServicePortTls(Optional.of(0));
        this.config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        this.config.setBrokerShutdownTimeoutMs(0L);
        this.config.setBrokerServicePort(Optional.of(0));
        this.config.setBrokerServicePortTls(Optional.of(0));
        this.config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
        this.config.setTlsAllowInsecureConnection(true);
        this.config.setAdvertisedAddress("localhost");

        // TLS authentication plus authorization on the broker side.
        Set<String> providers = new HashSet<>();
        providers.add(AuthenticationProviderTls.class.getName());
        this.config.setAuthenticationEnabled(true);
        this.config.setAuthenticationProviders(providers);
        this.config.setAuthorizationEnabled(true);
        this.config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
        this.config.setTlsCertificateFilePath(brokerCertPath);
        this.config.setTlsKeyFilePath(brokerKeyPath);
        this.config.setTlsTrustCertsFilePath(trustCertPath);

        // Credentials the broker's internal client uses.
        this.config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        this.config.setBrokerClientAuthenticationParameters("tlsCertFile:" + clientCertPath + ",tlsKeyFile:" + clientKeyPath);
        this.config.setBrokerClientTrustCertsFilePath(trustCertPath);
        this.config.setBrokerClientTlsEnabled(true);
        this.config.setAllowAutoTopicCreationType("non-partitioned");

        this.workerConfig = this.createWorkerConfig(this.config);

        // getAnnotation returns null when the method has no @Test of its own (the class is
        // annotated at class level), so guard before reading the groups.
        Test testAnnotation = method.getAnnotation(Test.class);
        if (testAnnotation != null && Arrays.asList(testAnnotation.groups()).contains("builtin")) {
            File connectorsDir = new File(this.workerConfig.getConnectorsDirectory());
            File narFile = PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar();
            Files.copy(narFile.toPath(), new File(connectorsDir, narFile.getName()).toPath());
        }

        // Broker is started without an embedded functions worker service.
        this.pulsar = new PulsarService(this.config, this.workerConfig, Optional.empty(), exitCode -> {});
        this.pulsar.start();

        String brokerServiceUrl = this.pulsar.getWebServiceAddressTls();
        this.urlTls = new URL(brokerServiceUrl);

        // Admin client authenticated with the client certificate.
        Map<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", clientCertPath);
        authParams.put("tlsKeyFile", clientKeyPath);
        AuthenticationTls authTls = new AuthenticationTls();
        authTls.configure(authParams);
        this.admin = Mockito.spy(PulsarAdmin.builder()
                .serviceHttpUrl(brokerServiceUrl)
                .tlsTrustCertsFilePath(trustCertPath)
                .allowTlsInsecureConnection(true)
                .authentication(authTls)
                .build());
        this.brokerStatsClient = this.admin.brokerStats();
        this.primaryHost = this.pulsar.getWebServiceAddress();

        ClusterData clusterData = ClusterData.builder().serviceUrl(this.urlTls.toString()).build();
        this.admin.clusters().createCluster(this.config.getClusterName(), clusterData);

        // Pulsar client; switches to the TLS service URL when worker auth settings are present.
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl());
        if (StringUtils.isNotBlank(this.workerConfig.getBrokerClientAuthenticationPlugin())
                && StringUtils.isNotBlank(this.workerConfig.getBrokerClientAuthenticationParameters())) {
            clientBuilder.enableTls(this.workerConfig.isUseTls());
            clientBuilder.allowTlsInsecureConnection(this.workerConfig.isTlsAllowInsecureConnection());
            clientBuilder.authentication(this.workerConfig.getBrokerClientAuthenticationPlugin(), this.workerConfig.getBrokerClientAuthenticationParameters());
            clientBuilder.serviceUrl(this.pulsar.getBrokerServiceUrlTls());
        }
        if (this.pulsarClient != null) {
            // Release a client left over from an earlier method before replacing it.
            this.pulsarClient.close();
        }
        this.pulsarClient = clientBuilder.build();

        TenantInfo propAdmin = TenantInfo.builder()
                .adminRoles(Collections.singleton("superUser"))
                .allowedClusters(Sets.newHashSet(Lists.newArrayList(CLUSTER)))
                .build();
        this.admin.tenants().createTenant("external-repl-prop", propAdmin);

        // Serve the packages over HTTP for tests that reference them by URL.
        this.fileServer = new FileServer();
        this.fileServer.serveFile("/pulsar-io-data-generator.nar", PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar());
        this.fileServer.serveFile("/pulsar-functions-api-examples.jar", PulsarFunctionLocalRunTest.getPulsarApiExamplesJar());
        this.fileServer.start();
    }

    /**
     * Tears down whatever {@code setup} managed to create, then deletes the temporary
     * directory.
     *
     * <p>This method runs even when {@code setup} failed part-way, so every resource is
     * checked for {@code null} before it is released; otherwise a
     * {@link NullPointerException} here would hide the original failure and skip the
     * remaining cleanup. Each field is cleared once released so a stale instance is not
     * released a second time by a later invocation.
     *
     * @throws Exception if releasing one of the resources fails
     */
    @AfterMethod(alwaysRun=true)
    void shutdown() throws Exception {
        try {
            log.info("--- Shutting down ---");
            if (this.fileServer != null) {
                this.fileServer.stop();
                this.fileServer = null;
            }
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
                this.pulsarClient = null;
            }
            if (this.admin != null) {
                this.admin.close();
                this.admin = null;
            }
            if (this.pulsar != null) {
                this.pulsar.close();
                this.pulsar = null;
            }
            if (this.bkEnsemble != null) {
                this.bkEnsemble.stop();
                this.bkEnsemble = null;
            }
        }
        finally {
            // Always remove the temporary directory, even if a close above threw.
            if (this.tempDirectory != null) {
                this.tempDirectory.delete();
            }
        }
    }

    /**
     * Creates the functions worker configuration for the given broker configuration.
     *
     * <p>Side effects: sets the {@code pulsar.functions.java.instance.jar} system property,
     * creates a new temporary directory (stored in {@code tempDirectory}) and assigns
     * {@code workerId}.
     *
     * @param config broker configuration providing the TLS ports, cluster name and
     *               advertised address
     * @return the populated worker configuration
     */
    protected WorkerConfig createWorkerConfig(ServiceConfiguration config) {
        final String instanceJarPath = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
        System.setProperty("pulsar.functions.java.instance.jar", instanceJarPath);

        final WorkerConfig fnWorkerConfig = new WorkerConfig();
        this.tempDirectory = PulsarFunctionTestTemporaryDirectory.create(this.getClass().getSimpleName());
        this.tempDirectory.useTemporaryDirectoriesForWorkerConfig(fnWorkerConfig);

        // Scheduling and thread-based runtime.
        fnWorkerConfig.setPulsarFunctionsNamespace(this.pulsarFunctionsNamespace);
        fnWorkerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
        fnWorkerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        final ThreadRuntimeFactoryConfig threadRuntimeConfig = new ThreadRuntimeFactoryConfig().setThreadGroupName(CLUSTER);
        final Map runtimeFactoryConfigs = ObjectMapperFactory.getThreadLocal().convertValue((Object)threadRuntimeConfig, Map.class);
        fnWorkerConfig.setFunctionRuntimeFactoryConfigs(runtimeFactoryConfigs);

        // Service endpoints derived from the broker's configured TLS ports.
        final String brokerUrl = "pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get();
        fnWorkerConfig.setPulsarServiceUrl(brokerUrl);
        final String webUrl = "https://127.0.0.1:" + config.getWebServicePortTls().get();
        fnWorkerConfig.setPulsarWebServiceUrl(webUrl);

        // Check frequencies, replication and internal topic names.
        fnWorkerConfig.setFailureCheckFreqMs(100L);
        fnWorkerConfig.setNumFunctionPackageReplicas(1);
        fnWorkerConfig.setClusterCoordinationTopicName("coordinate");
        fnWorkerConfig.setFunctionAssignmentTopicName("assignment");
        fnWorkerConfig.setFunctionMetadataTopicName("metadata");
        fnWorkerConfig.setInstanceLivenessCheckFreqMs(100L);
        fnWorkerConfig.setWorkerPort(Integer.valueOf(0));
        fnWorkerConfig.setPulsarFunctionsCluster(config.getClusterName());

        // Worker identity: "c-<cluster>-fw-<hostname>-<port>".
        final String workerHost = ServiceConfigurationUtils.getDefaultOrConfiguredAddress((String)config.getAdvertisedAddress());
        this.workerId = "c-" + config.getClusterName() + "-fw-" + workerHost + "-" + fnWorkerConfig.getWorkerPort();
        fnWorkerConfig.setWorkerHostname(workerHost);
        fnWorkerConfig.setWorkerId(this.workerId);

        // TLS client credentials and security switches.
        final String authParameters = String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem");
        fnWorkerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        fnWorkerConfig.setBrokerClientAuthenticationParameters(authParameters);
        fnWorkerConfig.setUseTls(true);
        fnWorkerConfig.setTlsAllowInsecureConnection(true);
        fnWorkerConfig.setTlsTrustCertsFilePath("./src/test/resources/authentication/tls/cacert.pem");
        fnWorkerConfig.setAuthenticationEnabled(true);
        fnWorkerConfig.setAuthorizationEnabled(true);
        return fnWorkerConfig;
    }

    /**
     * Builds a {@link FunctionConfig} for the Java {@code ExclamationFunction} example with
     * parallelism 1, effectively-once processing, auto-ack and subscription cleanup enabled.
     *
     * @param tenant tenant of the function
     * @param namespace namespace of the function
     * @param functionName name of the function
     * @param sourceTopic the single input topic
     * @param sinkTopic the output topic
     * @param subscriptionName subscription name used on the input topic
     * @return the populated function configuration
     */
    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
        final FunctionConfig fnConfig = new FunctionConfig();

        // Identity.
        fnConfig.setTenant(tenant);
        fnConfig.setNamespace(namespace);
        fnConfig.setName(functionName);

        // Implementation.
        fnConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        fnConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");

        // Input and output wiring.
        fnConfig.setInputs(Collections.singleton(sourceTopic));
        fnConfig.setSubName(subscriptionName);
        fnConfig.setOutput(sinkTopic);

        // Processing behaviour.
        fnConfig.setParallelism(1);
        fnConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
        fnConfig.setAutoAck(true);
        fnConfig.setCleanupSubscription(true);
        return fnConfig;
    }

    /**
     * Builds a {@link SourceConfig} with parallelism 1 and at-least-once processing.
     *
     * @param tenant tenant of the source
     * @param namespace namespace of the source
     * @param functionName name of the source
     * @param sinkTopic topic set as the source's topic name
     * @return the populated source configuration
     */
    private static SourceConfig createSourceConfig(String tenant, String namespace, String functionName, String sinkTopic) {
        final SourceConfig srcConfig = new SourceConfig();

        // Identity.
        srcConfig.setTenant(tenant);
        srcConfig.setNamespace(namespace);
        srcConfig.setName(functionName);

        // Destination and processing behaviour.
        srcConfig.setTopicName(sinkTopic);
        srcConfig.setParallelism(1);
        srcConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        return srcConfig;
    }

    /**
     * Builds a {@link SinkConfig} with parallelism 1, at-least-once processing, subscription
     * cleanup enabled and an empty configs map, reading from a single input topic.
     *
     * @param tenant tenant of the sink
     * @param namespace namespace of the sink
     * @param functionName name of the sink
     * @param sourceTopic the single input topic, mapped to a default {@link ConsumerConfig}
     * @param subName subscription name used on the input topic
     * @return the populated sink configuration
     */
    private static SinkConfig createSinkConfig(String tenant, String namespace, String functionName, String sourceTopic, String subName) {
        final SinkConfig sink = new SinkConfig();

        // Identity.
        sink.setTenant(tenant);
        sink.setNamespace(namespace);
        sink.setName(functionName);

        // Input wiring.
        final ConsumerConfig defaultConsumerConfig = ConsumerConfig.builder().build();
        sink.setInputSpecs(Collections.singletonMap(sourceTopic, defaultConsumerConfig));
        sink.setSourceSubscriptionName(subName);

        // Processing behaviour.
        sink.setParallelism(1);
        sink.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        sink.setCleanupSubscription(true);
        sink.setConfigs(new HashMap<>());
        return sink;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * End-to-end check of a function started through {@link LocalRunner}.
     *
     * <p>The test creates the {@code external-repl-prop/io} namespace, starts the
     * exclamation function with the requested parallelism, waits for all of its consumers
     * to attach to the source topic, publishes five messages, verifies that the
     * transformed messages (original value plus {@code "!"}) arrive on the output topic
     * with their properties preserved, checks the per-instance Prometheus metrics, and
     * finally stops the runner and verifies that its consumers and publishers are gone.
     *
     * @param jarFilePathUrl location of the function package, passed to
     *                       {@code FunctionConfig.setJar}
     * @param parallelism number of function instances to run
     * @throws Exception if any step of the scenario fails
     */
    private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = tenantName + "/" + namespacePortion;
        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
        final String sinkTopic = "persistent://" + replNamespace + "/output";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final String functionName = "PulsarFunction-test";
        final String subscriptionName = "test-sub";
        // The "id" metadata value each function consumer is expected to report.
        final String expectedConsumerId = String.format("%s/%s/%s", tenantName, namespacePortion, functionName);

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList(CLUSTER));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();

        FunctionConfig functionConfig = PulsarFunctionLocalRunTest.createFunctionConfig(tenantName, namespacePortion, functionName, sourceTopic, sinkTopic, subscriptionName);
        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        functionConfig.setJar(jarFilePathUrl);
        functionConfig.setParallelism(Integer.valueOf(parallelism));

        int metricsPort = FunctionCommon.findAvailablePort();
        LocalRunner localRunner = LocalRunner.builder()
                .functionConfig(functionConfig)
                .clientAuthPlugin(AuthenticationTls.class.getName())
                .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem"))
                .useTls(true)
                .tlsTrustCertFilePath("./src/test/resources/authentication/tls/cacert.pem")
                .tlsAllowInsecureConnection(true)
                .tlsHostNameVerificationEnabled(false)
                .metricsPortStart(Integer.valueOf(metricsPort))
                .brokerServiceUrl(this.pulsar.getBrokerServiceUrlTls())
                .build();
        try {
            localRunner.start(false);

            // Wait until every function instance has an attached, correctly labelled consumer.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    TopicStats topicStats = this.admin.topics().getStats(sourceTopic);
                    SubscriptionStats functionSub = (SubscriptionStats)topicStats.getSubscriptions().get(subscriptionName);
                    if (functionSub == null || functionSub.getConsumers().isEmpty() || functionSub.getConsumers().size() != parallelism) {
                        return false;
                    }
                    // All consumers must be ready, not merely the last one inspected.
                    for (ConsumerStats consumerStats : functionSub.getConsumers()) {
                        boolean ready = consumerStats.getAvailablePermits() == 1000
                                && consumerStats.getMetadata() != null
                                && expectedConsumerId.equals(consumerStats.getMetadata().get("id"));
                        if (!ready) {
                            return false;
                        }
                    }
                    return true;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L));

            TopicStats statsAfterStart = this.admin.topics().getStats(sourceTopic);
            SubscriptionStats startedSub = (SubscriptionStats)statsAfterStart.getSubscriptions().get(subscriptionName);
            Assert.assertTrue(startedSub != null && !startedSub.getConsumers().isEmpty());

            final int totalMsgs = 5;
            for (int i = 0; i < totalMsgs; ++i) {
                producer.newMessage().property(propertyKey, propertyValue).value("my-message-" + i).send();
            }

            // Best-effort wait for the function to acknowledge everything it consumed.
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    SubscriptionStats subStats = (SubscriptionStats)this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
                    return subStats != null && subStats.getUnackedMessages() == 0L;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);

            for (int i = 0; i < totalMsgs; ++i) {
                Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
                // A null message means the receive timed out; fail with a readable assertion
                // instead of a NullPointerException on the next line.
                Assert.assertNotNull(msg, "Timed out waiting for output message " + i);
                Assert.assertEquals(msg.getProperty(propertyKey), propertyValue);
                Assert.assertEquals(msg.getValue(), "my-message-" + i + "!");
            }

            // Compare as longs: a boxed Long never equals a boxed Integer, which made the
            // previous form of this assertion impossible to fail.
            long unacked = ((SubscriptionStats)this.admin.topics().getStats(sourceTopic).getSubscriptions().values().iterator().next()).getUnackedMessages();
            Assert.assertNotEquals(Long.valueOf(unacked), Long.valueOf(totalMsgs));

            // Collect the "processed successfully" counter of each instance, keyed by instance id.
            String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
            log.info("prometheus metrics: {}", prometheusMetrics);
            Map<String, PulsarFunctionTestUtils.Metric> metricsMap = new HashMap<>();
            for (String line : prometheusMetrics.split("\n")) {
                if (!line.startsWith("pulsar_function_processed_successfully_total")) {
                    continue;
                }
                Map<String, PulsarFunctionTestUtils.Metric> parsed = PulsarFunctionTestUtils.parseMetrics(line);
                Assert.assertFalse(parsed.isEmpty());
                PulsarFunctionTestUtils.Metric processed = parsed.get("pulsar_function_processed_successfully_total");
                if (processed != null) {
                    metricsMap.put(processed.tags.get("instance_id"), processed);
                }
            }
            Assert.assertEquals(metricsMap.size(), parallelism);

            double totalMsgRecv = 0.0;
            for (int i = 0; i < parallelism; ++i) {
                PulsarFunctionTestUtils.Metric m = metricsMap.get(String.valueOf(i));
                Assert.assertNotNull(m);
                Assert.assertEquals(m.tags.get("cluster"), this.config.getClusterName());
                Assert.assertEquals(m.tags.get("instance_id"), String.valueOf(i));
                Assert.assertEquals(m.tags.get("name"), functionName);
                Assert.assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenantName, namespacePortion));
                Assert.assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenantName, namespacePortion, functionName));
                totalMsgRecv += m.value;
            }
            Assert.assertEquals(totalMsgRecv, (double)totalMsgs);

            localRunner.stop();

            // After stopping, the subscription should remain but without any consumers.
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    TopicStats topicStats = this.admin.topics().getStats(sourceTopic);
                    SubscriptionStats functionSub = (SubscriptionStats)topicStats.getSubscriptions().get(subscriptionName);
                    return functionSub != null && functionSub.getConsumers().isEmpty();
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 20, 150L);
            TopicStats statsAfterStop = this.admin.topics().getStats(sourceTopic);
            SubscriptionStats stoppedSub = (SubscriptionStats)statsAfterStop.getSubscriptions().get(subscriptionName);
            Assert.assertTrue(stoppedSub != null && stoppedSub.getConsumers().isEmpty());

            // The output topic should have no publishers left; a 404 is also accepted.
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return this.admin.topics().getStats(sinkTopic).getPublishers().size() == 0;
                }
                catch (PulsarAdminException e) {
                    return e.getStatusCode() == 404;
                }
            }, 10, 150L);
            try {
                Assert.assertEquals(this.admin.topics().getStats(sinkTopic).getPublishers().size(), 0);
            }
            catch (PulsarAdminException e) {
                if (e.getStatusCode() != 404) {
                    Assert.fail("Unexpected admin error while fetching stats for " + sinkTopic, e);
                }
            }
        }
        finally {
            // Always release the runner, including when an assertion above failed.
            localRunner.close();
        }
    }

    /**
     * Runs the end-to-end local-run scenario with a single function instance.
     *
     * @param jarFilePathUrl location of the function package
     * @throws Exception if the scenario fails
     */
    protected void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception {
        final int singleInstance = 1;
        this.testE2EPulsarFunctionLocalRun(jarFilePathUrl, singleInstance);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testAvroFunctionLocalRun(String jarFilePathUrl) throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        String sinkTopic = "persistent://external-repl-prop/io/output";
        String propertyKey = "key";
        String propertyValue = "value";
        String functionName = "PulsarFunction-test";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{CLUSTER}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        Schema schema = Schema.AVRO((SchemaDefinition)SchemaDefinition.builder().withAlwaysAllowNull(true).withJSR310ConversionEnabled(true).withPojo(this.avroTestObjectClass).build());
        this.admin.schemas().createSchema("persistent://external-repl-prop/io/my-topic1", schema.getSchemaInfo());
        Producer producer = this.pulsarClient.newProducer(schema).topic("persistent://external-repl-prop/io/my-topic1").create();
        Consumer consumer = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(new String[]{"persistent://external-repl-prop/io/output"}).subscriptionName("sub").subscribe();
        FunctionConfig functionConfig = PulsarFunctionLocalRunTest.createFunctionConfig("external-repl-prop", "io", "PulsarFunction-test", "persistent://external-repl-prop/io/my-topic1", "persistent://external-repl-prop/io/output", "test-sub");
        HashMap<String, String> schemaInput = new HashMap<String, String>();
        schemaInput.put("persistent://external-repl-prop/io/my-topic1", "{\"schemaType\":\"AVRO\",\"schemaProperties\":{\"__jsr310ConversionEnabled\":\"true\",\"__alwaysAllowNull\":\"true\"}}");
        HashMap<String, String> schemaOutput = new HashMap<String, String>();
        schemaOutput.put("persistent://external-repl-prop/io/output", "{\"schemaType\":\"AVRO\",\"schemaProperties\":{\"__jsr310ConversionEnabled\":\"true\",\"__alwaysAllowNull\":\"true\"}}");
        functionConfig.setCustomSchemaInputs(schemaInput);
        functionConfig.setCustomSchemaOutputs(schemaOutput);
        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        if (jarFilePathUrl == null) {
            functionConfig.setClassName("org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction");
        } else {
            functionConfig.setJar(jarFilePathUrl);
        }
        LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).clientAuthPlugin(AuthenticationTls.class.getName()).clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem")).useTls(true).tlsTrustCertFilePath("./src/test/resources/authentication/tls/cacert.pem").tlsAllowInsecureConnection(true).tlsHostNameVerificationEnabled(false).brokerServiceUrl(this.pulsar.getBrokerServiceUrlTls()).build();
        try {
            int i;
            localRunner.start(false);
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    TopicStats stats = this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1");
                    return stats.getSubscriptions().get("test-sub") != null && !((SubscriptionStats)stats.getSubscriptions().get("test-sub")).getConsumers().isEmpty();
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            int totalMsgs = 5;
            Method setBaseValueMethod = this.avroTestObjectClass.getMethod("setBaseValue", Integer.TYPE);
            for (i = 0; i < totalMsgs; ++i) {
                Object avroTestObject = this.avroTestObjectClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                setBaseValueMethod.invoke(avroTestObject, i);
                producer.newMessage().property("key", "value").value(avroTestObject).send();
            }
            for (i = 0; i < totalMsgs; ++i) {
                Message msg = consumer.receive(5, TimeUnit.SECONDS);
                String receivedPropertyValue = msg.getProperty("key");
                Assert.assertEquals((String)"value", (String)receivedPropertyValue);
                Assert.assertEquals((Object)((GenericRecord)msg.getValue()).getField("baseValue"), (Object)(10 + i));
                consumer.acknowledge(msg);
            }
            Assert.assertNotEquals((Object)((SubscriptionStats)this.admin.topics().getStats("persistent://external-repl-prop/io/output").getSubscriptions().values().iterator().next()).getUnackedMessages(), (Object)0);
            localRunner.stop();
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    TopicStats topicStats = this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1");
                    return topicStats.getSubscriptions().get("test-sub") != null && ((SubscriptionStats)topicStats.getSubscriptions().get("test-sub")).getConsumers().isEmpty();
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 20, 150L);
            schemaInput.put("persistent://external-repl-prop/io/my-topic1", "{\"schemaType\":\"AVRO\",\"schemaProperties\":{\"__jsr310ConversionEnabled\":\"false\",\"__alwaysAllowNull\":\"false\"}}");
            localRunner = LocalRunner.builder().functionConfig(functionConfig).clientAuthPlugin(AuthenticationTls.class.getName()).clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem")).useTls(true).tlsTrustCertFilePath("./src/test/resources/authentication/tls/cacert.pem").tlsAllowInsecureConnection(true).tlsHostNameVerificationEnabled(false).brokerServiceUrl(this.pulsar.getBrokerServiceUrlTls()).build();
            localRunner.start(false);
            producer.newMessage().property("key", "value").value(this.avroTestObjectClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0])).send();
            Message msg = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull((Object)msg);
            producer.close();
            consumer.close();
            localRunner.stop();
        }
        finally {
            if (Collections.singletonList(localRunner).get(0) != null) {
                localRunner.close();
            }
        }
    }

    /**
     * Runs the end-to-end function local-run scenario with a {@code null} jar argument while the
     * Pulsar functions class loader is installed as the thread context class loader.
     *
     * @throws Throwable anything raised by the scenario
     */
    @Test(timeOut=20000L)
    public void testE2EPulsarFunctionLocalRun() throws Throwable {
        Assert.ThrowingRunnable scenario = () -> testE2EPulsarFunctionLocalRun(null);
        runWithPulsarFunctionsClassLoader(scenario);
    }

    /**
     * Runs the Avro function local-run scenario with a {@code null} jar argument while the
     * Pulsar functions class loader is installed as the thread context class loader.
     *
     * @throws Throwable anything raised by the scenario
     */
    @Test(timeOut=30000L)
    public void testAvroFunctionLocalRun() throws Throwable {
        Assert.ThrowingRunnable scenario = () -> testAvroFunctionLocalRun(null);
        runWithPulsarFunctionsClassLoader(scenario);
    }

    /**
     * Runs the end-to-end function local-run scenario against the API examples jar,
     * addressed through the URI string of the local file.
     *
     * @throws Exception anything raised by the scenario
     */
    @Test(timeOut=20000L)
    public void testE2EPulsarFunctionLocalRunWithJar() throws Exception {
        // Resolve the examples jar and hand its URI string straight to the shared scenario.
        testE2EPulsarFunctionLocalRun(
                PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString());
    }

    /**
     * Runs the end-to-end function local-run scenario with the examples jar addressed by the
     * URL that the test file server returns for it.
     *
     * @throws Exception anything raised by the scenario
     */
    @Test(timeOut=40000L)
    public void testE2EPulsarFunctionLocalRunURL() throws Exception {
        final String jarUrl = fileServer.getUrl("/pulsar-functions-api-examples.jar");
        testE2EPulsarFunctionLocalRun(jarUrl);
    }

    /**
     * Runs the end-to-end function local-run scenario with a {@code null} jar argument and
     * {@code 2} as the second argument, under the Pulsar functions class loader.
     *
     * @throws Throwable anything raised by the scenario
     */
    @Test(timeOut=40000L)
    public void testE2EPulsarFunctionLocalRunMultipleInstances() throws Throwable {
        Assert.ThrowingRunnable scenario = () -> testE2EPulsarFunctionLocalRun(null, 2);
        runWithPulsarFunctionsClassLoader(scenario);
    }

    /**
     * Runs a source connector through {@link LocalRunner} and verifies publishers, written
     * entries and Prometheus metrics on the output topic.
     *
     * <p>The scenario starts the runner, waits until the output topic has one publisher per
     * instance, checks that <em>every</em> publisher carries the expected {@code id} metadata,
     * waits for more than four entries to be written, validates the
     * {@code pulsar_source_written_total} metric of each instance, then stops the runner and
     * waits for the publishers to disappear (a 404 from the admin API is also accepted).
     *
     * @param jarFilePathUrl archive set on the source config (may be {@code null}); unless it
     *                       ends with {@code .nar} the data-generator source class name is set
     *                       explicitly
     * @param parallelism    number of source instances; also the expected publisher count
     * @throws Exception if an admin call, the runner or an assertion fails
     */
    private void testPulsarSourceLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = "external-repl-prop/io";
        final String sinkTopic = "persistent://external-repl-prop/io/output";
        final String sourceName = "PulsarSource-test";
        // Every publisher created by the source is expected to be tagged "<tenant>/<namespace>/<name>".
        final String expectedPublisherId = String.format("%s/%s/%s", tenantName, namespacePortion, sourceName);

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList(CLUSTER));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        SourceConfig sourceConfig = PulsarFunctionLocalRunTest.createSourceConfig(tenantName, namespacePortion, sourceName, sinkTopic);
        if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
            sourceConfig.setClassName("org.apache.pulsar.io.datagenerator.DataGeneratorSource");
        }
        sourceConfig.setArchive(jarFilePathUrl);
        sourceConfig.setParallelism(parallelism);

        int metricsPort = FunctionCommon.findAvailablePort();
        LocalRunner localRunner = LocalRunner.builder()
                .sourceConfig(sourceConfig)
                .clientAuthPlugin(AuthenticationTls.class.getName())
                .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem"))
                .useTls(true)
                .tlsTrustCertFilePath("./src/test/resources/authentication/tls/cacert.pem")
                .tlsAllowInsecureConnection(true)
                .tlsHostNameVerificationEnabled(false)
                .brokerServiceUrl(this.pulsar.getBrokerServiceUrlTls())
                .connectorsDirectory(this.workerConfig.getConnectorsDirectory())
                .metricsPortStart(metricsPort)
                .build();
        try {
            localRunner.start(false);

            // One publisher per source instance must show up on the output topic.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return this.admin.topics().getStats(sinkTopic).getPublishers().size() == parallelism;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 10, 150L));

            // All publishers (not just the last one inspected) must carry the expected id metadata.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    TopicStats sourceStats = this.admin.topics().getStats(sinkTopic);
                    if (sourceStats.getPublishers().isEmpty() || sourceStats.getPublishers().size() != parallelism) {
                        return false;
                    }
                    for (PublisherStats publisher : sourceStats.getPublishers()) {
                        if (publisher.getMetadata() == null
                                || !expectedPublisherId.equals(publisher.getMetadata().get("id"))) {
                            return false;
                        }
                    }
                    return true;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L));

            // Wait until the source has written more than four entries.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return this.admin.topics().getStats(sinkTopic).getPublishers().size() == parallelism
                            && this.admin.topics().getInternalStats(sinkTopic, false).numberOfEntries > 4L;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L));
            Assert.assertEquals(this.admin.topics().getStats(sinkTopic).getPublishers().size(), parallelism);

            String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
            log.info("prometheus metrics: {}", prometheusMetrics);

            // Index the "written total" metric by instance id.
            Map<String, PulsarFunctionTestUtils.Metric> metricsByInstance = new HashMap<>();
            for (String line : prometheusMetrics.split("\n")) {
                if (!line.startsWith("pulsar_source_written_total")) {
                    continue;
                }
                Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
                Assert.assertFalse(metrics.isEmpty());
                PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_written_total");
                if (m != null) {
                    metricsByInstance.put(m.tags.get("instance_id"), m);
                }
            }
            Assert.assertEquals(metricsByInstance.size(), parallelism);
            for (int i = 0; i < parallelism; ++i) {
                PulsarFunctionTestUtils.Metric m = metricsByInstance.get(String.valueOf(i));
                Assert.assertNotNull(m);
                Assert.assertEquals(m.tags.get("cluster"), this.config.getClusterName());
                Assert.assertEquals(m.tags.get("instance_id"), String.valueOf(i));
                Assert.assertEquals(m.tags.get("name"), sourceName);
                Assert.assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenantName, namespacePortion));
                Assert.assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenantName, namespacePortion, sourceName));
                Assert.assertTrue(m.value > 0.0);
            }

            localRunner.stop();

            // After stopping, publishers must go away; a 404 for the topic is accepted as well.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    return this.admin.topics().getStats(sinkTopic).getPublishers().isEmpty();
                } catch (PulsarAdminException e) {
                    return e.getStatusCode() == 404;
                }
            }, 10, 150L));
            try {
                Assert.assertEquals(this.admin.topics().getStats(sinkTopic).getPublishers().size(), 0);
            } catch (PulsarAdminException e) {
                if (e.getStatusCode() != 404) {
                    Assert.fail("unexpected admin error while checking publishers of " + sinkTopic, e);
                }
            }
        } finally {
            localRunner.close();
        }
    }

    /**
     * Single-instance convenience overload of the source local-run scenario.
     *
     * @param jarFilePathUrl archive forwarded unchanged to the two-argument overload
     * @throws Exception anything raised by the scenario
     */
    private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
        final int singleInstance = 1;
        testPulsarSourceLocalRun(jarFilePathUrl, singleInstance);
    }

    /**
     * Runs the source local-run scenario against the {@code builtin://data-generator} archive.
     *
     * @throws Exception anything raised by the scenario
     */
    @Test(timeOut=20000L, groups={"builtin"})
    public void testPulsarSourceStatsBuiltin() throws Exception {
        final String builtinArchive = String.format("%s://data-generator", "builtin");
        testPulsarSourceLocalRun(builtinArchive);
    }

    /**
     * Runs the source local-run scenario with a {@code null} archive while the data-generator
     * NAR class loader is the thread context class loader.
     *
     * @throws Throwable anything raised by the scenario
     */
    @Test(timeOut=20000L)
    public void testPulsarSourceLocalRunNoArchive() throws Throwable {
        Assert.ThrowingRunnable scenario = () -> testPulsarSourceLocalRun(null);
        runWithNarClassLoader(scenario);
    }

    /**
     * Runs the source local-run scenario against the data-generator NAR, addressed through the
     * URI string of the local file.
     *
     * @throws Exception anything raised by the scenario
     */
    @Test(timeOut=20000L)
    public void testPulsarSourceLocalRunWithFile() throws Exception {
        // Resolve the NAR and pass its URI string directly to the shared scenario.
        testPulsarSourceLocalRun(
                PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar().toURI().toString());
    }

    /**
     * Runs the source local-run scenario with the data-generator NAR addressed by the URL that
     * the test file server returns for it.
     *
     * @throws Exception anything raised by the scenario
     */
    @Test(timeOut=40000L)
    public void testPulsarSourceLocalRunWithUrl() throws Exception {
        final String narUrl = fileServer.getUrl("/pulsar-io-data-generator.nar");
        testPulsarSourceLocalRun(narUrl);
    }

    /**
     * Runs the source local-run scenario with a {@code null} archive and a parallelism of two,
     * under the data-generator NAR class loader.
     *
     * @throws Throwable anything raised by the scenario
     */
    @Test(timeOut=40000L)
    public void testPulsarSourceLocalRunMultipleInstances() throws Throwable {
        Assert.ThrowingRunnable scenario = () -> testPulsarSourceLocalRun(null, 2);
        runWithNarClassLoader(scenario);
    }

    /**
     * Runs a sink connector through {@link LocalRunner} and verifies consumers, message
     * consumption and Prometheus metrics on the input topic.
     *
     * <p>The scenario starts the runner, waits until the subscription has one consumer per
     * instance and <em>every</em> consumer reports the configured number of available permits,
     * publishes ten messages, waits for them to be consumed, validates the
     * {@code pulsar_sink_written_total} and sink-exception metrics, then stops the runner and
     * checks that the subscription is left without consumers. The producer and the runner are
     * always closed, even when an assertion fails.
     *
     * @param jarFilePathUrl archive set on the sink config (may be {@code null})
     * @param parallelism    number of sink instances; also the expected consumer count
     * @param className      sink class to run, or {@code null} to fall back to the data-generator
     *                       print sink whenever the archive does not end with {@code .nar}
     * @throws Exception if an admin call, the runner or an assertion fails
     */
    private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className) throws Exception {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = "external-repl-prop/io";
        final String sourceTopic = "persistent://external-repl-prop/io/input";
        final String sinkName = "PulsarSink-test";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final String subscriptionName = "test-sub";
        final int receiverQueueSize = 1000;
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList(CLUSTER));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
        LocalRunner localRunner = null;
        try {
            SinkConfig sinkConfig = PulsarFunctionLocalRunTest.createSinkConfig(tenantName, namespacePortion, sinkName, sourceTopic, subscriptionName);
            sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(receiverQueueSize).build()));
            if (className != null) {
                sinkConfig.setClassName(className);
            } else if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
                sinkConfig.setClassName("org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink");
            }
            sinkConfig.setArchive(jarFilePathUrl);
            sinkConfig.setParallelism(parallelism);

            int metricsPort = FunctionCommon.findAvailablePort();
            localRunner = LocalRunner.builder()
                    .sinkConfig(sinkConfig)
                    .clientAuthPlugin(AuthenticationTls.class.getName())
                    .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem"))
                    .useTls(true)
                    .tlsTrustCertFilePath("./src/test/resources/authentication/tls/cacert.pem")
                    .tlsAllowInsecureConnection(true)
                    .tlsHostNameVerificationEnabled(false)
                    .brokerServiceUrl(this.pulsar.getBrokerServiceUrlTls())
                    .connectorsDirectory(this.workerConfig.getConnectorsDirectory())
                    .metricsPortStart(metricsPort)
                    .build();
            localRunner.start(false);

            // One consumer per instance, and each of them must expose the configured queue size.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    TopicStats topicStats = this.admin.topics().getStats(sourceTopic);
                    SubscriptionStats subStats = topicStats.getSubscriptions().get(subscriptionName);
                    if (subStats == null
                            || subStats.getConsumers().isEmpty()
                            || subStats.getConsumers().size() != parallelism) {
                        return false;
                    }
                    for (ConsumerStats consumerStats : subStats.getConsumers()) {
                        if (consumerStats.getAvailablePermits() != receiverQueueSize) {
                            return false;
                        }
                    }
                    return true;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 20, 150L));

            for (int i = 0; i < totalMsgs; ++i) {
                String data = "my-message-" + i;
                producer.newMessage().property(propertyKey, propertyValue).value(data).send();
            }

            // NOTE(review): the throughput comparison below is kept exactly as found; confirm it
            // is really meant to equal the message count.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
                    return subStats.getUnackedMessages() == 0L && subStats.getMsgThroughputOut() == (double) totalMsgs;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L));

            String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
            log.info("prometheus metrics: {}", prometheusMetrics);

            // Index the "written total" metric by instance id and require zero sink exceptions.
            Map<String, PulsarFunctionTestUtils.Metric> metricsByInstance = new HashMap<>();
            for (String line : prometheusMetrics.split("\n")) {
                if (line.startsWith("pulsar_sink_written_total")) {
                    Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
                    Assert.assertFalse(metrics.isEmpty());
                    PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_written_total");
                    if (m != null) {
                        metricsByInstance.put(m.tags.get("instance_id"), m);
                    }
                } else if (line.startsWith("pulsar_sink_sink_exceptions_total")) {
                    Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
                    Assert.assertFalse(metrics.isEmpty());
                    PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_sink_exceptions_total");
                    if (m == null) {
                        m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
                    }
                    // Fail with a readable message instead of a NullPointerException.
                    Assert.assertNotNull(m, "unrecognised sink exception metric line: " + line);
                    Assert.assertEquals((double) m.value, 0.0);
                }
            }
            Assert.assertEquals(metricsByInstance.size(), parallelism);

            double totalNumRecvMsg = 0.0;
            for (int i = 0; i < parallelism; ++i) {
                PulsarFunctionTestUtils.Metric m = metricsByInstance.get(String.valueOf(i));
                Assert.assertNotNull(m);
                Assert.assertEquals(m.tags.get("cluster"), this.config.getClusterName());
                Assert.assertEquals(m.tags.get("instance_id"), String.valueOf(i));
                Assert.assertEquals(m.tags.get("name"), sinkName);
                Assert.assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenantName, namespacePortion));
                Assert.assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenantName, namespacePortion, sinkName));
                totalNumRecvMsg += m.value;
            }
            Assert.assertEquals(totalNumRecvMsg, (double) totalMsgs);

            localRunner.stop();

            // The subscription must survive the stop, but without any consumer attached.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    TopicStats stats = this.admin.topics().getStats(sourceTopic);
                    return stats.getSubscriptions().get(subscriptionName) != null
                            && stats.getSubscriptions().get(subscriptionName).getConsumers().isEmpty();
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 20, 150L));
            TopicStats finalStats = this.admin.topics().getStats(sourceTopic);
            Assert.assertTrue(finalStats.getSubscriptions().get(subscriptionName) != null
                    && finalStats.getSubscriptions().get(subscriptionName).getConsumers().isEmpty());
        } finally {
            try {
                if (localRunner != null) {
                    localRunner.close();
                }
            } finally {
                // Previously leaked: the producer was never closed.
                producer.close();
            }
        }
    }

    /**
     * Single-instance convenience overload of the sink local-run scenario.
     *
     * @param jarFilePathUrl archive forwarded unchanged to the two-argument overload
     * @throws Exception anything raised by the scenario
     */
    private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
        final int singleInstance = 1;
        testPulsarSinkLocalRun(jarFilePathUrl, singleInstance);
    }

    /**
     * Overload of the sink local-run scenario that passes {@code null} as the sink class name.
     *
     * @param jarFilePathUrl archive forwarded unchanged
     * @param parallelism    instance count forwarded unchanged
     * @throws Exception anything raised by the scenario
     */
    private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
        final String noExplicitClassName = null;
        testPulsarSinkLocalRun(jarFilePathUrl, parallelism, noExplicitClassName);
    }

    /**
     * Runs the sink local-run scenario against the {@code builtin://data-generator} archive.
     *
     * @throws Exception anything raised by the scenario
     */
    @Test(timeOut=20000L, groups={"builtin"})
    public void testPulsarSinkStatsBuiltin() throws Exception {
        final String builtinArchive = String.format("%s://data-generator", "builtin");
        testPulsarSinkLocalRun(builtinArchive);
    }

    /**
     * Runs the sink local-run scenario with a {@code null} archive while the data-generator
     * NAR class loader is the thread context class loader.
     *
     * @throws Throwable anything raised by the scenario
     */
    @Test(timeOut=20000L)
    public void testPulsarSinkStatsNoArchive() throws Throwable {
        Assert.ThrowingRunnable scenario = () -> testPulsarSinkLocalRun(null);
        runWithNarClassLoader(scenario);
    }

    /**
     * Runs the sink local-run scenario against the data-generator NAR, addressed through the
     * URI string of the local file.
     *
     * @throws Exception anything raised by the scenario
     */
    @Test(timeOut=20000L)
    public void testPulsarSinkStatsWithFile() throws Exception {
        // Resolve the NAR and pass its URI string directly to the shared scenario.
        testPulsarSinkLocalRun(
                PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar().toURI().toString());
    }

    /**
     * Runs the sink local-run scenario with the data-generator NAR addressed by the URL that
     * the test file server returns for it.
     *
     * @throws Exception anything raised by the scenario
     */
    @Test(timeOut=40000L)
    public void testPulsarSinkStatsWithUrl() throws Exception {
        final String narUrl = fileServer.getUrl("/pulsar-io-data-generator.nar");
        testPulsarSinkLocalRun(narUrl);
    }

    /**
     * Runs the sink local-run scenario with a {@code null} archive and a parallelism of two,
     * under the data-generator NAR class loader.
     *
     * @throws Throwable anything raised by the scenario
     */
    @Test(timeOut=40000L)
    public void testPulsarSinkStatsMultipleInstances() throws Throwable {
        Assert.ThrowingRunnable scenario = () -> testPulsarSinkLocalRun(null, 2);
        runWithNarClassLoader(scenario);
    }

    /**
     * Runs the single-instance sink local-run scenario with a {@code null} archive and
     * {@link StatsNullSink} as the sink class, under the data-generator NAR class loader.
     *
     * @throws Throwable anything raised by the scenario
     */
    @Test
    public void testPulsarSinkStatsByteBufferType() throws Throwable {
        final String sinkClassName = StatsNullSink.class.getName();
        Assert.ThrowingRunnable scenario = () -> testPulsarSinkLocalRun(null, 1, sinkClassName);
        runWithNarClassLoader(scenario);
    }

    /**
     * Verifies that a blocking {@link LocalRunner#start(boolean)} returns when the sink fails
     * and {@code exitOnError} is enabled.
     *
     * <p>Two runners are built from the same builder and sink config: the first after setting
     * {@code throwErrorOpen}, the second after additionally setting {@code throwErrorWrite}.
     * The producer and each runner are closed once they are no longer needed.
     *
     * @throws Throwable if an admin call or a runner operation fails
     */
    @Test(timeOut=20000L)
    public void testExitOnError() throws Throwable {
        final String tenantName = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = "external-repl-prop/io";
        final String sourceTopic = "persistent://external-repl-prop/io/input";
        final String sinkName = "PulsarSink-test";
        final String subscriptionName = "test-sub";

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList(CLUSTER));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
        try {
            SinkConfig sinkConfig = PulsarFunctionLocalRunTest.createSinkConfig(tenantName, namespacePortion, sinkName, sourceTopic, subscriptionName);
            sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
            sinkConfig.setClassName(TestErrorSink.class.getName());

            int metricsPort = FunctionCommon.findAvailablePort();
            LocalRunner.LocalRunnerBuilder localRunnerBuilder = LocalRunner.builder()
                    .clientAuthPlugin(AuthenticationTls.class.getName())
                    .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem"))
                    .useTls(true)
                    .tlsTrustCertFilePath("./src/test/resources/authentication/tls/cacert.pem")
                    .tlsAllowInsecureConnection(true)
                    .tlsHostNameVerificationEnabled(false)
                    .brokerServiceUrl(this.pulsar.getBrokerServiceUrlTls())
                    .connectorsDirectory(this.workerConfig.getConnectorsDirectory())
                    .metricsPortStart(metricsPort)
                    .exitOnError(true);

            // First run: the sink throws from open().
            sinkConfig.getConfigs().put("throwErrorOpen", true);
            localRunnerBuilder.sinkConfig(sinkConfig);
            LocalRunner failingOnOpen = localRunnerBuilder.build();
            try {
                failingOnOpen.start(true);
            } finally {
                failingOnOpen.close();
            }

            // Second run.
            // NOTE(review): "throwErrorOpen" is still present in the shared config map, so
            // TestErrorSink.open() throws again and the write-error path is never reached.
            // Confirm whether that is intended before changing it.
            sinkConfig.getConfigs().put("throwErrorWrite", true);
            localRunnerBuilder.sinkConfig(sinkConfig);
            LocalRunner failingOnWrite = localRunnerBuilder.build();
            try {
                failingOnWrite.start(true);
            } finally {
                failingOnWrite.close();
            }
        } finally {
            // Previously leaked: the producer was never closed.
            producer.close();
        }
    }

    /**
     * Executes the given runnable with a {@link NarClassLoader}, built from the data-generator
     * NAR, installed as the current thread's context class loader.
     *
     * <p>The previous context class loader is restored before the NAR class loader is closed,
     * whether or not the runnable throws.
     *
     * @param throwingRunnable code to execute under the NAR class loader
     * @throws Throwable anything raised by the runnable or by creating/closing the class loader
     */
    private void runWithNarClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousLoader = currentThread.getContextClassLoader();
        final File narFile = PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar();
        try (NarClassLoader narLoader = NarClassLoader.getFromArchive(
                narFile, Collections.emptySet(), previousLoader, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR)) {
            // Inner try/finally keeps the restore ahead of the resource close.
            try {
                currentThread.setContextClassLoader(narLoader);
                throwingRunnable.run();
            } finally {
                currentThread.setContextClassLoader(previousLoader);
            }
        }
    }

    /**
     * Executes the given runnable with {@code pulsarApiExamplesClassLoader} installed as the
     * current thread's context class loader, restoring the previous loader afterwards.
     *
     * @param throwingRunnable code to execute under the examples class loader
     * @throws Throwable anything raised by the runnable
     */
    protected void runWithPulsarFunctionsClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(pulsarApiExamplesClassLoader);
            throwingRunnable.run();
        } finally {
            // Always undo the swap so later tests see the original loader.
            currentThread.setContextClassLoader(previousLoader);
        }
    }

    /**
     * Sink used by the error tests: each lifecycle method throws when the matching key
     * ({@code throwErrorOpen}, {@code throwErrorWrite}, {@code throwErrorClose}) is present in
     * the configuration handed to {@link #open(Map, SinkContext)}.
     */
    public static class TestErrorSink
    implements Sink<byte[]> {
        // Starts empty so write()/close() are safe even if open() was never invoked.
        private Map<String, Object> config = Collections.emptyMap();

        /**
         * Stores the configuration and fails if it contains {@code throwErrorOpen}.
         *
         * @param map         sink configuration; {@code null} is treated as empty
         * @param sinkContext unused
         * @throws Exception when {@code throwErrorOpen} is configured
         */
        public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
            this.config = map != null ? map : Collections.emptyMap();
            if (this.config.containsKey("throwErrorOpen")) {
                throw new Exception("error on open");
            }
        }

        /**
         * Acknowledges the record unless {@code throwErrorWrite} is configured.
         *
         * @param record record to acknowledge
         * @throws Exception when {@code throwErrorWrite} is configured
         */
        public void write(Record<byte[]> record) throws Exception {
            if (this.config.containsKey("throwErrorWrite")) {
                throw new Exception("error on write");
            }
            record.ack();
        }

        /**
         * Does nothing unless {@code throwErrorClose} is configured.
         *
         * @throws Exception when {@code throwErrorClose} is configured
         */
        public void close() throws Exception {
            if (this.config.containsKey("throwErrorClose")) {
                throw new Exception("error on close");
            }
        }
    }

    /**
     * Sink over {@link ByteBuffer} values that discards every record after adding the
     * capacity of its value to a running total and acknowledging it.
     */
    public static class StatsNullSink
    implements Sink<ByteBuffer> {
        // Sum of the capacities of all buffers written so far.
        volatile long bytesTotal = 0L;

        /** No set-up is required. */
        public void open(Map map, SinkContext sinkContext) throws Exception {
        }

        /**
         * Adds the capacity of the record's buffer to {@code bytesTotal}, then acks the record.
         *
         * @param record record whose value capacity is counted
         */
        public void write(Record<ByteBuffer> record) throws Exception {
            final long previousTotal = this.bytesTotal;
            final ByteBuffer payload = record.getValue();
            this.bytesTotal = previousTotal + (long) payload.capacity();
            record.ack();
        }

        /** Nothing to release. */
        public void close() throws Exception {
        }
    }
}

