/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sources;

import java.util.Collections;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class PulsarSourcePropertyTest
extends PulsarStandaloneTestSuite {
    private static final Logger log = LoggerFactory.getLogger(PulsarSourcePropertyTest.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"source"})
    public void testSourceProperty() throws Exception {
        String outputTopicName = "test-source-property-input-" + PulsarSourcePropertyTest.randomName(8);
        String sourceName = "test-source-property-" + PulsarSourcePropertyTest.randomName(8);
        this.submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestPropertySource", "/pulsar/examples/java-test-functions.jar");
        this.getSourceInfoSuccess(sourceName);
        this.getSourceStatus(sourceName);
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.container.getHttpServiceUrl()).build();){
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
                Assert.assertEquals((int)status.getInstances().size(), (int)1);
                Assert.assertTrue((((SourceStatus.SourceInstanceStatus)status.getInstances().get((int)0)).getStatus().numWritten > 0L ? 1 : 0) != 0);
            });
        }
        PulsarClient client = PulsarClient.builder().serviceUrl(this.container.getPlainTextServiceUrl()).build();
        try {
            Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{outputTopicName}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("test-sub").subscribe();
            try {
                for (int i = 0; i < 10; ++i) {
                    Message msg = consumer.receive();
                    Assert.assertEquals((String)((String)msg.getValue()), (String)"property");
                    Assert.assertEquals((String)msg.getProperty("hello"), (String)"world");
                    Assert.assertEquals((String)msg.getProperty("foo"), (String)"bar");
                }
                this.deleteSource(sourceName);
                this.getSourceInfoNotFound(sourceName);
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    private void submitSourceConnector(String sourceName, String outputTopicName, String className, String archive) throws Exception {
        Object[] commands = new String[]{"/pulsar/bin/pulsar-admin", "sources", "create", "--name", sourceName, "--destinationTopicName", outputTopicName, "--archive", archive, "--classname", className};
        log.info("Run command : {}", (Object)StringUtils.join((Object[])commands, (char)' '));
        ContainerExecResult result = this.container.execCmd((String[])commands);
        Assert.assertTrue((boolean)result.getStdout().contains("\"Created successfully\""), (String)result.getStdout());
    }

    private void getSourceInfoSuccess(String sourceName) throws Exception {
        ContainerExecResult result = this.container.execCmd("/pulsar/bin/pulsar-admin", "sources", "get", "--tenant", "public", "--namespace", "default", "--name", sourceName);
        Assert.assertTrue((boolean)result.getStdout().contains("\"name\": \"" + sourceName + "\""));
    }

    private void getSourceStatus(String sourceName) throws Exception {
        ContainerExecResult result = this.container.execCmd("/pulsar/bin/pulsar-admin", "sources", "status", "--tenant", "public", "--namespace", "default", "--name", sourceName);
        Assert.assertTrue((boolean)result.getStdout().contains("\"running\" : true"));
    }

    private void deleteSource(String sourceName) throws Exception {
        ContainerExecResult result = this.container.execCmd("/pulsar/bin/pulsar-admin", "sources", "delete", "--tenant", "public", "--namespace", "default", "--name", sourceName);
        Assert.assertTrue((boolean)result.getStdout().contains("Delete source successfully"));
        result.assertNoStderr();
    }

    private void getSourceInfoNotFound(String sourceName) throws Exception {
        try {
            this.container.execCmd("/pulsar/bin/pulsar-admin", "sources", "get", "--tenant", "public", "--namespace", "default", "--name", sourceName);
            Assert.fail((String)"Command should have exited with non-zero");
        }
        catch (ContainerExecException e) {
            Assert.assertTrue((boolean)e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
        }
    }
}

