/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.apache.pulsar.io.AbstractPulsarE2ETest;
import org.apache.pulsar.io.SinkForTest;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker-io"})
public class PulsarSinkE2ETest
extends AbstractPulsarE2ETest {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReadCompactedSink() throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sourceTopic = "persistent://external-repl-prop/io/my-topic2";
        String sinkName = "PulsarFunction-test";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        int messageNum = 20;
        int maxKeys = 10;
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/my-topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"persistent://external-repl-prop/io/my-topic2"}).subscriptionName("test-sub").readCompacted(true).subscribe().close();
            HashMap<String, String> expected = new HashMap<String, String>();
            for (int j = 0; j < 20; ++j) {
                String key = "key" + j % 10;
                String value = "my-message-" + key + j;
                producer.newMessage().key(key).value((Object)value).send();
                expected.put(key, value);
            }
            ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
            try {
                TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(this.config, this.pulsarClient, this.pulsar.getBookKeeperClient(), compactionScheduler);
                twoPhaseCompactor.compact("persistent://external-repl-prop/io/my-topic2").get();
                SinkConfig sinkConfig = PulsarSinkE2ETest.createSinkConfig("external-repl-prop", "io", "PulsarFunction-test", "persistent://external-repl-prop/io/my-topic2", "test-sub");
                sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
                HashMap<String, String> consumerProperties = new HashMap<String, String>();
                consumerProperties.put("readCompacted", "true");
                sinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/my-topic2", ConsumerConfig.builder().consumerProperties(consumerProperties).build()));
                String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar().toURI().toString();
                this.admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
                Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                    String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics((Integer)this.pulsar.getListenPortHTTP().get());
                    Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
                    PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total");
                    Assert.assertEquals((double)m.value, (double)10.0);
                });
            }
            finally {
                if (Collections.singletonList(compactionScheduler).get(0) != null) {
                    compactionScheduler.shutdownNow();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testPulsarSinkDLQ() throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sourceTopic = "persistent://external-repl-prop/io/input";
        String dlqTopic = "persistent://external-repl-prop/io/input-DLQ";
        String sinkName = "PulsarSink-test";
        String propertyKey = "key";
        String propertyValue = "value";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/input").create();
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://external-repl-prop/io/input-DLQ"}).subscriptionName("test-sub").subscribe();
        SinkConfig sinkConfig = PulsarSinkE2ETest.createSinkConfig("external-repl-prop", "io", "PulsarSink-test", "persistent://external-repl-prop/io/input", "test-sub");
        sinkConfig.setNegativeAckRedeliveryDelayMs(Long.valueOf(1001L));
        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        sinkConfig.setMaxMessageRetries(Integer.valueOf(2));
        sinkConfig.setDeadLetterTopic("persistent://external-repl-prop/io/input-DLQ");
        sinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(Integer.valueOf(1000)).build()));
        sinkConfig.setClassName(SinkForTest.class.getName());
        LocalRunner localRunner = LocalRunner.builder().sinkConfig(sinkConfig).clientAuthPlugin(AuthenticationTls.class.getName()).clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem")).useTls(true).tlsTrustCertFilePath("./src/test/resources/authentication/tls/cacert.pem").tlsAllowInsecureConnection(true).tlsHostNameVerificationEnabled(false).brokerServiceUrl(this.pulsar.getBrokerServiceUrlTls()).build();
        try {
            int i;
            localRunner.start(false);
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    TopicStats topicStats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
                    return topicStats.getSubscriptions().containsKey("test-sub") && ((SubscriptionStats)topicStats.getSubscriptions().get("test-sub")).getConsumers().size() == 1 && ((ConsumerStats)((SubscriptionStats)topicStats.getSubscriptions().get("test-sub")).getConsumers().get(0)).getAvailablePermits() == 1000;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            int totalMsgs = 10;
            HashSet<String> remainingMessagesToReceive = new HashSet<String>();
            for (i = 0; i < totalMsgs; ++i) {
                String messageBody = "fail" + i;
                producer.newMessage().property("key", "value").value((Object)messageBody).send();
                remainingMessagesToReceive.add(messageBody);
            }
            for (i = 0; i < totalMsgs; ++i) {
                Message message = consumer.receive(10, TimeUnit.SECONDS);
                Assert.assertNotNull((Object)message);
                remainingMessagesToReceive.remove(message.getValue());
            }
            Assert.assertEquals(remainingMessagesToReceive, Collections.emptySet());
            producer.close();
            consumer.close();
        }
        finally {
            if (Collections.singletonList(localRunner).get(0) != null) {
                localRunner.close();
            }
        }
    }

    private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
        this.testPulsarSinkStats(jarFilePathUrl, null);
    }

    private void testPulsarSinkStats(String jarFilePathUrl, Function<SinkConfig, SinkConfig> override) throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sourceTopic = "persistent://external-repl-prop/io/input";
        String sinkName = "PulsarSink-test";
        String propertyKey = "key";
        String propertyValue = "value";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/input").create();
        SinkConfig sinkConfig = PulsarSinkE2ETest.createSinkConfig("external-repl-prop", "io", "PulsarSink-test", "persistent://external-repl-prop/io/input", "test-sub");
        sinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(Integer.valueOf(1000)).build()));
        if (override != null) {
            sinkConfig = override.apply(sinkConfig);
        }
        if (jarFilePathUrl.startsWith("builtin")) {
            sinkConfig.setArchive(jarFilePathUrl);
            this.admin.sinks().createSink(sinkConfig, null);
        } else {
            this.admin.sinks().createSinkWithUrl(sinkConfig, jarFilePathUrl);
        }
        sinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(Integer.valueOf(523)).build()));
        if (override != null) {
            sinkConfig = override.apply(sinkConfig);
        }
        if (jarFilePathUrl.startsWith("builtin")) {
            sinkConfig.setArchive(jarFilePathUrl);
            this.admin.sinks().updateSink(sinkConfig, null);
        } else {
            this.admin.sinks().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
        }
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                TopicStats topicStats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
                return topicStats.getSubscriptions().containsKey("test-sub") && ((SubscriptionStats)topicStats.getSubscriptions().get("test-sub")).getConsumers().size() == 1 && ((ConsumerStats)((SubscriptionStats)topicStats.getSubscriptions().get("test-sub")).getConsumers().get(0)).getAvailablePermits() == 523;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        TopicStats topicStats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
        Assert.assertEquals((int)topicStats.getSubscriptions().size(), (int)1);
        Assert.assertTrue((boolean)topicStats.getSubscriptions().containsKey("test-sub"));
        Assert.assertEquals((int)((SubscriptionStats)topicStats.getSubscriptions().get("test-sub")).getConsumers().size(), (int)1);
        Assert.assertEquals((int)((ConsumerStats)((SubscriptionStats)topicStats.getSubscriptions().get("test-sub")).getConsumers().get(0)).getAvailablePermits(), (int)523);
        SinkStatus status = this.admin.sinks().getSinkStatus("external-repl-prop", "io", "PulsarSink-test");
        status.getInstances().forEach(sinkInstanceStatus -> Assert.assertEquals((long)sinkInstanceStatus.status.numSinkExceptions, (long)0L));
        status.getInstances().forEach(sinkInstanceStatus -> Assert.assertEquals((long)sinkInstanceStatus.status.numSystemExceptions, (long)0L));
        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics((Integer)this.pulsar.getListenPortHTTP().get());
        log.info("prometheus metrics: {}", (Object)prometheusMetrics);
        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_written_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_written_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_sink_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        int totalMsgs = 10;
        for (int i = 0; i < totalMsgs; ++i) {
            String data = "my-message-" + i;
            producer.newMessage().property("key", "value").value((Object)data).send();
        }
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                SubscriptionStats subStats = (SubscriptionStats)this.admin.topics().getStats("persistent://external-repl-prop/io/input").getSubscriptions().get("test-sub");
                return subStats.getUnackedMessages() == 0L && subStats.getMsgOutCounter() == (long)totalMsgs;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200L);
        SubscriptionStats subStats = (SubscriptionStats)this.admin.topics().getStats("persistent://external-repl-prop/io/input").getSubscriptions().get("test-sub");
        Assert.assertEquals((long)subStats.getUnackedMessages(), (long)0L);
        Assert.assertEquals((long)subStats.getMsgOutCounter(), (long)totalMsgs);
        Assert.assertEquals((long)subStats.getMsgBacklog(), (long)0L);
        status = this.admin.sinks().getSinkStatus("external-repl-prop", "io", "PulsarSink-test");
        status.getInstances().forEach(sinkInstanceStatus -> Assert.assertEquals((long)sinkInstanceStatus.status.numSinkExceptions, (long)0L));
        status.getInstances().forEach(sinkInstanceStatus -> Assert.assertEquals((long)sinkInstanceStatus.status.numSystemExceptions, (long)0L));
        prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics((Integer)this.pulsar.getListenPortHTTP().get());
        log.info("prometheusMetrics: {}", (Object)prometheusMetrics);
        metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
        m = metrics.get("pulsar_sink_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_sink_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_sink_written_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_sink_written_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_sink_sink_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        this.admin.sinks().deleteSink("external-repl-prop", "io", "PulsarSink-test");
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/io/input").getSubscriptions().size() == 0;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals((int)this.admin.topics().getStats("persistent://external-repl-prop/io/input").getSubscriptions().size(), (int)0);
        this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
    }

    @Test(timeOut=20000L, groups={"builtin"})
    public void testPulsarSinkStatsBuiltin() throws Exception {
        String jarFilePathUrl = String.format("%s://data-generator", "builtin");
        this.testPulsarSinkStats(jarFilePathUrl);
    }

    @Test(timeOut=20000L, groups={"builtin"}, expectedExceptions={PulsarAdminException.class}, expectedExceptionsMessageRegExp="Built-in sink is not available")
    public void testPulsarSinkStatsBuiltinDoesNotExist() throws Exception {
        String jarFilePathUrl = String.format("%s://foo", "builtin");
        this.testPulsarSinkStats(jarFilePathUrl);
    }

    @Test(timeOut=20000L)
    public void testPulsarSinkStatsWithFile() throws Exception {
        String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar().toURI().toString();
        this.testPulsarSinkStats(jarFilePathUrl);
    }

    @Test(timeOut=40000L)
    public void testPulsarSinkStatsWithUrl() throws Exception {
        this.testPulsarSinkStats(this.fileServer.getUrl("/pulsar-io-data-generator.nar"));
    }

    @Test(timeOut=20000L)
    public void testPulsarSinkPoolMessages() throws Exception {
        String jarFilePathUrl = PulsarSinkE2ETest.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
        this.testPulsarSinkStats(jarFilePathUrl, sinkConfig -> {
            sinkConfig.setClassName(ByteBufferSink.class.getName());
            sinkConfig.getInputSpecs().values().forEach(consumerConfig -> consumerConfig.setPoolMessages(true));
            return sinkConfig;
        });
    }

    private static SinkConfig createSinkConfig(String tenant, String namespace, String functionName, String sourceTopic, String subName) {
        SinkConfig sinkConfig = new SinkConfig();
        sinkConfig.setTenant(tenant);
        sinkConfig.setNamespace(namespace);
        sinkConfig.setName(functionName);
        sinkConfig.setParallelism(Integer.valueOf(1));
        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().build()));
        sinkConfig.setSourceSubscriptionName(subName);
        sinkConfig.setCleanupSubscription(Boolean.valueOf(true));
        return sinkConfig;
    }

    public static class ByteBufferSink
    implements Sink<ByteBuffer> {
        public void open(Map map, SinkContext sinkContext) throws Exception {
        }

        public void write(Record<ByteBuffer> record) throws Exception {
            Assert.assertTrue((boolean)((ByteBuffer)record.getValue()).isDirect());
            record.ack();
        }

        public void close() throws Exception {
        }
    }
}

