/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sinks;

import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.Policy;
import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.policies.data.SinkStatusUtil;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.io.PulsarIOTestRunner;
import org.apache.pulsar.tests.integration.io.sinks.JdbcPostgresSinkTester;
import org.apache.pulsar.tests.integration.io.sinks.SinkTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testng.Assert;
import org.testng.collections.Maps;

public class PulsarIOSinkRunner
extends PulsarIOTestRunner {
    private static final Logger log = LoggerFactory.getLogger(PulsarIOSinkRunner.class);

    /**
     * Creates a sink runner. Both arguments are forwarded unchanged to the
     * {@link PulsarIOTestRunner} constructor.
     *
     * @param cluster             cluster handed to the superclass; presumably the one exposed to this
     *                            class as {@code pulsarCluster} (set in the superclass, not visible here)
     * @param functionRuntimeType runtime type handed to the superclass; presumably the value later read
     *                            as {@code functionRuntimeType} when topic and sink names are built
     */
    public PulsarIOSinkRunner(PulsarCluster cluster, String functionRuntimeType) {
        super(cluster, functionRuntimeType);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drives one sink connector through its full test scenario in the {@code public/default} namespace.
     *
     * <p>Steps, in order: prepare the sink, create the input-topic subscription, submit the connector,
     * check {@code sink get}, wait until {@code sink status} reports a healthy instance, publish
     * messages and validate what the tester finds in the sink, then update, delete and confirm the
     * sink no longer exists. Function logs are dumped whether or not the produce/validate phase
     * succeeds.
     *
     * <p>A {@link JdbcPostgresSinkTester} gets three rounds of 20 messages (insert, update, delete),
     * validated after each round against cumulative counts of 20, 40 and 60. Every other tester gets a
     * single round of 20 messages.
     *
     * @param tester  connector-specific tester supplying configuration, schema and result validation
     * @param builtin forwarded to {@link #getSinkInfoSuccess} to select which field of the
     *                {@code sink get} output is checked
     * @throws Exception if any step fails
     */
    public <T extends GenericContainer> void runSinkTester(SinkTester<T> tester, boolean builtin) throws Exception {
        final String tenant = "public";
        final String namespace = "default";
        final int numMessages = 20;

        final String inputTopicName = "test-sink-connector-" + tester.getSinkType()
                + "-" + this.functionRuntimeType
                + "-input-topic-" + PulsarTestBase.randomName(8);
        final String sinkName = "test-sink-connector-" + tester.getSinkType().name().toLowerCase()
                + "-" + this.functionRuntimeType
                + "-name-" + PulsarTestBase.randomName(8);

        prepareSink(tester);
        ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", sinkName), tester.getInputTopicSchema());
        submitSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
        getSinkInfoSuccess(tester, tenant, namespace, sinkName, builtin);

        try {
            // Retry the status check until the instance reports as running.
            Failsafe.with(this.statusRetryPolicy).run(() -> getSinkStatus(tenant, namespace, sinkName));

            if (!(tester instanceof JdbcPostgresSinkTester)) {
                Map<String, String> produced = produceMessagesToInputTopic(inputTopicName, numMessages, tester);
                Failsafe.with(this.statusRetryPolicy)
                        .run(() -> waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages));
                tester.validateSinkResult(produced);
            } else {
                // The status counters are cumulative, so each round waits for one more batch.
                Map<String, String> inserted = produceSchemaInsertMessagesToInputTopic(
                        inputTopicName, numMessages, AvroSchema.of(JdbcPostgresSinkTester.Foo.class));
                Failsafe.with(this.statusRetryPolicy)
                        .run(() -> waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages));
                tester.validateSinkResult(inserted);

                Map<String, String> updated = produceSchemaUpdateMessagesToInputTopic(
                        inputTopicName, numMessages, AvroSchema.of(JdbcPostgresSinkTester.Foo.class));
                Failsafe.with(this.statusRetryPolicy)
                        .run(() -> waitForProcessingSinkMessages(tenant, namespace, sinkName, 2 * numMessages));
                tester.validateSinkResult(updated);

                Map<String, String> deleted = produceSchemaDeleteMessagesToInputTopic(
                        inputTopicName, numMessages, AvroSchema.of(JdbcPostgresSinkTester.Foo.class));
                Failsafe.with(this.statusRetryPolicy)
                        .run(() -> waitForProcessingSinkMessages(tenant, namespace, sinkName, 3 * numMessages));
                tester.validateSinkResult(deleted);
            }
        } finally {
            this.pulsarCluster.dumpFunctionLogs(sinkName);
        }

        updateSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
        deleteSink(tenant, namespace, sinkName);
        getSinkInfoNotFound(tenant, namespace, sinkName);
    }

    /**
     * Prepares the sink before the connector is submitted by delegating to
     * {@link SinkTester#prepareSink()}.
     *
     * @param tester tester whose {@code prepareSink()} is invoked
     * @throws Exception whatever the tester's preparation throws
     */
    protected void prepareSink(SinkTester tester) throws Exception {
        tester.prepareSink();
    }

    /**
     * Creates the sink through {@code pulsar-admin sink create} on one of the cluster's workers and
     * asserts that the command's stdout contains {@code "Created successfully"} (quotes included).
     *
     * <p>A tester whose sink type is {@code UNDEFINED} is submitted with {@code --archive} and
     * {@code --classname}; any other tester is submitted with {@code --sink-type}.
     *
     * @param tester         supplies sink type, archive, class name and sink configuration
     * @param tenant         value for {@code --tenant}
     * @param namespace      value for {@code --namespace}
     * @param sinkName       value for {@code --name}
     * @param inputTopicName value for {@code --inputs}
     * @throws Exception if the command cannot be executed
     */
    protected void submitSinkConnector(SinkTester tester, String tenant, String namespace, String sinkName, String inputTopicName) throws Exception {
        final String[] commands;
        if (tester.getSinkType() == SinkTester.SinkType.UNDEFINED) {
            commands = new String[]{
                "/pulsar/bin/pulsar-admin", "sink", "create",
                "--tenant", tenant,
                "--namespace", namespace,
                "--name", sinkName,
                "--archive", tester.getSinkArchive(),
                "--classname", tester.getSinkClassName(),
                "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
                "--inputs", inputTopicName
            };
        } else {
            commands = new String[]{
                "/pulsar/bin/pulsar-admin", "sink", "create",
                "--tenant", tenant,
                "--namespace", namespace,
                "--name", sinkName,
                "--sink-type", tester.sinkType().getValue().toLowerCase(),
                "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
                "--inputs", inputTopicName
            };
        }

        log.info("Run command : {}", StringUtils.join(commands, ' '));
        final ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(commands);
        final String stdout = result.getStdout();
        Assert.assertTrue(stdout.contains("\"Created successfully\""), stdout);
    }

    /**
     * Updates the existing sink to a parallelism of 2 through {@code pulsar-admin sink update} on one
     * of the cluster's workers and asserts that the command's stdout contains
     * {@code "Updated successfully"} (quotes included).
     *
     * <p>A tester whose sink type is {@code UNDEFINED} is updated with {@code --archive} and
     * {@code --classname}; any other tester is updated with {@code --sink-type}.
     *
     * @param tester         supplies sink type, archive, class name and sink configuration
     * @param tenant         value for {@code --tenant}
     * @param namespace      value for {@code --namespace}
     * @param sinkName       value for {@code --name}
     * @param inputTopicName value for {@code --inputs}
     * @throws Exception if the command cannot be executed
     */
    protected void updateSinkConnector(SinkTester tester, String tenant, String namespace, String sinkName, String inputTopicName) throws Exception {
        final String[] commands;
        if (tester.getSinkType() != SinkTester.SinkType.UNDEFINED) {
            commands = new String[]{
                "/pulsar/bin/pulsar-admin", "sink", "update",
                "--tenant", tenant,
                "--namespace", namespace,
                "--name", sinkName,
                "--sink-type", tester.sinkType().getValue().toLowerCase(),
                "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
                "--inputs", inputTopicName,
                "--parallelism", "2"
            };
        } else {
            // This branch previously issued "create": that targets a sink which already exists and
            // can never yield the "Updated successfully" output asserted below.
            commands = new String[]{
                "/pulsar/bin/pulsar-admin", "sink", "update",
                "--tenant", tenant,
                "--namespace", namespace,
                "--name", sinkName,
                "--archive", tester.getSinkArchive(),
                "--classname", tester.getSinkClassName(),
                "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
                "--inputs", inputTopicName,
                "--parallelism", "2"
            };
        }

        log.info("Run command : {}", StringUtils.join(commands, ' '));
        final ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(commands);
        Assert.assertTrue(result.getStdout().contains("\"Updated successfully\""), result.getStdout());
    }

    /**
     * Runs {@code pulsar-admin sink get} on one of the cluster's workers and asserts that the output
     * describes the expected sink.
     *
     * @param tester    supplies the sink type value (builtin case) or class name (non-builtin case)
     * @param tenant    value for {@code --tenant}
     * @param namespace value for {@code --namespace}
     * @param sinkName  value for {@code --name}
     * @param builtin   when {@code true}, the output must contain an {@code archive} entry of
     *                  {@code builtin://<lower-cased sink type value>}; otherwise it must contain a
     *                  {@code className} entry equal to the tester's sink class name
     * @throws Exception if the command cannot be executed
     */
    protected void getSinkInfoSuccess(SinkTester tester, String tenant, String namespace, String sinkName, boolean builtin) throws Exception {
        final String[] getCommand = {
            "/pulsar/bin/pulsar-admin", "sink", "get",
            "--tenant", tenant,
            "--namespace", namespace,
            "--name", sinkName
        };
        final ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(getCommand);
        final String stdout = result.getStdout();
        log.info("Get sink info : {}", stdout);

        final String expectedFragment;
        if (builtin) {
            expectedFragment = "\"archive\": \"builtin://" + tester.getSinkType().getValue().toLowerCase() + "\"";
        } else {
            expectedFragment = "\"className\": \"" + tester.getSinkClassName() + "\"";
        }
        Assert.assertTrue(stdout.contains(expectedFragment), stdout);
    }

    /**
     * Runs {@code pulsar-admin sink status} on one of the cluster's workers and asserts that the sink
     * is healthy: exit code 0, exactly one instance which is running, instance id 0, zero restarts
     * and no latest system exceptions.
     *
     * @param tenant    value for {@code --tenant}
     * @param namespace value for {@code --namespace}
     * @param sinkName  value for {@code --name}
     * @throws Exception if the command cannot be executed or its output cannot be decoded
     */
    protected void getSinkStatus(String tenant, String namespace, String sinkName) throws Exception {
        final String[] statusCommand = {
            "/pulsar/bin/pulsar-admin", "sink", "status",
            "--tenant", tenant,
            "--namespace", namespace,
            "--name", sinkName
        };
        final ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(statusCommand);
        final String stdout = result.getStdout();
        log.info("Get sink status : {}", stdout);
        Assert.assertEquals(result.getExitCode(), 0);

        final SinkStatus sinkStatus = SinkStatusUtil.decode(stdout);
        Assert.assertEquals(sinkStatus.getNumInstances(), 1);
        Assert.assertEquals(sinkStatus.getNumRunning(), 1);
        Assert.assertEquals(sinkStatus.getInstances().size(), 1);

        // Fetched only after the size assertion so that an empty list fails as an assertion.
        final SinkStatus.SinkInstanceStatus instance =
                (SinkStatus.SinkInstanceStatus) sinkStatus.getInstances().get(0);
        Assert.assertEquals(instance.getInstanceId(), 0);
        Assert.assertEquals(instance.getStatus().isRunning(), true);
        Assert.assertEquals(instance.getStatus().getNumRestarts(), 0L);
        Assert.assertEquals(instance.getStatus().getLatestSystemExceptions().size(), 0);
    }

    /**
     * Deletes the sink through {@code pulsar-admin sink delete} on one of the cluster's workers,
     * asserting that stdout contains {@code Deleted successfully} and then that the result passes
     * {@code assertNoStderr()}.
     *
     * @param tenant    value for {@code --tenant}
     * @param namespace value for {@code --namespace}
     * @param sinkName  value for {@code --name}
     * @throws Exception if the command cannot be executed
     */
    protected void deleteSink(String tenant, String namespace, String sinkName) throws Exception {
        final String[] deleteCommand = {
            "/pulsar/bin/pulsar-admin", "sink", "delete",
            "--tenant", tenant,
            "--namespace", namespace,
            "--name", sinkName
        };
        final ContainerExecResult outcome = this.pulsarCluster.getAnyWorker().execCmd(deleteCommand);
        final String stdout = outcome.getStdout();
        Assert.assertTrue(stdout.contains("Deleted successfully"), stdout);
        outcome.assertNoStderr();
    }

    /**
     * Verifies that the sink no longer exists: {@code pulsar-admin sink get} must fail with a
     * {@link ContainerExecException} whose stderr contains
     * {@code Reason: Sink <sinkName> doesn't exist}. The test fails if the command completes
     * without that exception.
     *
     * @param tenant    value for {@code --tenant}
     * @param namespace value for {@code --namespace}
     * @param sinkName  value for {@code --name}
     * @throws Exception if the command fails in any way other than {@link ContainerExecException}
     */
    protected void getSinkInfoNotFound(String tenant, String namespace, String sinkName) throws Exception {
        final String[] getCommand = {
            "/pulsar/bin/pulsar-admin", "sink", "get",
            "--tenant", tenant,
            "--namespace", namespace,
            "--name", sinkName
        };

        ContainerExecException failure = null;
        try {
            this.pulsarCluster.getAnyWorker().execCmd(getCommand);
        } catch (ContainerExecException e) {
            failure = e;
        }

        if (failure == null) {
            Assert.fail("Command should have exited with non-zero");
        }
        final String expectedReason = "Reason: Sink " + sinkName + " doesn't exist";
        Assert.assertTrue(failure.getResult().getStderr().contains(expectedReason));
    }

    /**
     * Runs {@code pulsar-admin sink status} once and asserts that the single sink instance is healthy
     * and has processed exactly {@code numMessages} messages.
     *
     * <p>Despite its name this method does not loop or sleep; {@link #runSinkTester} invokes it under
     * a retry policy. Besides the health checks (exit code 0, one running instance with id 0, zero
     * restarts, no latest system exceptions) it requires a positive last-received time and both the
     * read-from-Pulsar and written-to-sink counters to equal {@code numMessages}.
     *
     * @param tenant      value for {@code --tenant}
     * @param namespace   value for {@code --namespace}
     * @param sinkName    value for {@code --name}
     * @param numMessages exact counter value expected for messages read and messages written
     * @throws Exception if the command cannot be executed or its output cannot be decoded
     */
    protected void waitForProcessingSinkMessages(String tenant, String namespace, String sinkName, int numMessages) throws Exception {
        final String[] statusCommand = {
            "/pulsar/bin/pulsar-admin", "sink", "status",
            "--tenant", tenant,
            "--namespace", namespace,
            "--name", sinkName
        };
        final ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(statusCommand);
        final String stdout = result.getStdout();
        log.info("Get sink status : {}", stdout);
        Assert.assertEquals(result.getExitCode(), 0);

        final SinkStatus sinkStatus = SinkStatusUtil.decode(stdout);
        Assert.assertEquals(sinkStatus.getNumInstances(), 1);
        Assert.assertEquals(sinkStatus.getNumRunning(), 1);
        Assert.assertEquals(sinkStatus.getInstances().size(), 1);

        // Fetched only after the size assertion so that an empty list fails as an assertion.
        final SinkStatus.SinkInstanceStatus instance =
                (SinkStatus.SinkInstanceStatus) sinkStatus.getInstances().get(0);
        final long expectedCount = numMessages;
        Assert.assertEquals(instance.getInstanceId(), 0);
        Assert.assertEquals(instance.getStatus().isRunning(), true);
        Assert.assertTrue(instance.getStatus().getLastReceivedTime() > 0L);
        Assert.assertEquals(instance.getStatus().getNumReadFromPulsar(), expectedCount);
        Assert.assertEquals(instance.getStatus().getNumWrittenToSink(), expectedCount);
        Assert.assertEquals(instance.getStatus().getNumRestarts(), 0L);
        Assert.assertEquals(instance.getStatus().getLatestSystemExceptions().size(), 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Publishes {@code numMessages} {@link JdbcPostgresSinkTester.Foo} records to
     * {@code inputTopicName}, each carrying the message property {@code ACTION=INSERT}.
     *
     * <p>Record {@code i} is keyed {@code key-i} and holds {@code field1_insert_i},
     * {@code field2_insert_i} and {@code i}. A new client and producer are created for the call and
     * closed before it returns, producer first.
     *
     * @param inputTopicName topic to publish to
     * @param numMessages    number of records to send; zero or less sends nothing
     * @param schema         schema used both for the producer and to encode the returned values
     * @return insertion-ordered map from each message key to the schema-encoded record decoded as a
     *         UTF-8 string, plus an {@code "ACTION" -> "INSERT"} entry once at least one message
     *         has been sent
     * @throws Exception if the client or producer cannot be created, or a send fails
     */
    protected Map<String, String> produceSchemaInsertMessagesToInputTopic(String inputTopicName, int numMessages, Schema<JdbcPostgresSinkTester.Foo> schema) throws Exception {
        // try-with-resources closes the producer and then the client, and keeps a send failure as
        // the primary exception even if a close() call throws as well.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                     .build();
             Producer<JdbcPostgresSinkTester.Foo> producer = client.newProducer(schema)
                     .topic(inputTopicName)
                     .create()) {
            Map<String, String> kvs = new LinkedHashMap<>();
            for (int i = 0; i < numMessages; ++i) {
                String key = "key-" + i;
                JdbcPostgresSinkTester.Foo obj = new JdbcPostgresSinkTester.Foo();
                obj.setField1("field1_insert_" + i);
                obj.setField2("field2_insert_" + i);
                obj.setField3(i);
                // Explicit charset so the decoded value does not depend on the platform default.
                String value = new String(schema.encode(obj), StandardCharsets.UTF_8);
                Map<String, String> properties = Maps.newHashMap();
                properties.put("ACTION", "INSERT");
                kvs.put(key, value);
                // Kept inside the loop: an empty batch must yield an empty map, and the entry's
                // position in the insertion order stays right after the first key.
                kvs.put("ACTION", "INSERT");
                producer.newMessage().properties(properties).key(key).value(obj).send();
            }
            return kvs;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Publishes {@code numMessages} {@link JdbcPostgresSinkTester.Foo} records to
     * {@code inputTopicName}, each carrying the message property {@code ACTION=UPDATE}.
     *
     * <p>Record {@code i} is keyed {@code key-i} and holds {@code field1_insert_i},
     * {@code field2_update_i} and {@code i}, so only the second field differs from the insert
     * batch. A new client and producer are created for the call and closed before it returns,
     * producer first.
     *
     * @param inputTopicName topic to publish to
     * @param numMessages    number of records to send; zero or less sends nothing
     * @param schema         schema used both for the producer and to encode the returned values
     * @return insertion-ordered map from each message key to the schema-encoded record decoded as a
     *         UTF-8 string, plus an {@code "ACTION" -> "UPDATE"} entry once at least one message
     *         has been sent
     * @throws Exception if the client or producer cannot be created, or a send fails
     */
    protected Map<String, String> produceSchemaUpdateMessagesToInputTopic(String inputTopicName, int numMessages, Schema<JdbcPostgresSinkTester.Foo> schema) throws Exception {
        // try-with-resources closes the producer and then the client, and keeps a send failure as
        // the primary exception even if a close() call throws as well.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                     .build();
             Producer<JdbcPostgresSinkTester.Foo> producer = client.newProducer(schema)
                     .topic(inputTopicName)
                     .create()) {
            Map<String, String> kvs = new LinkedHashMap<>();
            log.info("update start");
            for (int i = 0; i < numMessages; ++i) {
                String key = "key-" + i;
                JdbcPostgresSinkTester.Foo obj = new JdbcPostgresSinkTester.Foo();
                obj.setField1("field1_insert_" + i);
                obj.setField2("field2_update_" + i);
                obj.setField3(i);
                // Explicit charset so the decoded value does not depend on the platform default.
                String value = new String(schema.encode(obj), StandardCharsets.UTF_8);
                Map<String, String> properties = Maps.newHashMap();
                properties.put("ACTION", "UPDATE");
                kvs.put(key, value);
                // Kept inside the loop: an empty batch must yield an empty map, and the entry's
                // position in the insertion order stays right after the first key.
                kvs.put("ACTION", "UPDATE");
                producer.newMessage().properties(properties).key(key).value(obj).send();
            }
            log.info("update end");
            return kvs;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Publishes {@code numMessages} {@link JdbcPostgresSinkTester.Foo} records to
     * {@code inputTopicName}, each carrying the message property {@code ACTION=DELETE}.
     *
     * <p>Record {@code i} is keyed {@code key-i} and holds {@code field1_insert_i},
     * {@code field2_update_i} and {@code i}, the same payload as the update batch. A new client and
     * producer are created for the call and closed before it returns, producer first.
     *
     * @param inputTopicName topic to publish to
     * @param numMessages    number of records to send; zero or less sends nothing
     * @param schema         schema used both for the producer and to encode the returned values
     * @return insertion-ordered map from each message key to the schema-encoded record decoded as a
     *         UTF-8 string, plus an {@code "ACTION" -> "DELETE"} entry once at least one message
     *         has been sent
     * @throws Exception if the client or producer cannot be created, or a send fails
     */
    protected Map<String, String> produceSchemaDeleteMessagesToInputTopic(String inputTopicName, int numMessages, Schema<JdbcPostgresSinkTester.Foo> schema) throws Exception {
        // try-with-resources closes the producer and then the client, and keeps a send failure as
        // the primary exception even if a close() call throws as well.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                     .build();
             Producer<JdbcPostgresSinkTester.Foo> producer = client.newProducer(schema)
                     .topic(inputTopicName)
                     .create()) {
            Map<String, String> kvs = new LinkedHashMap<>();
            for (int i = 0; i < numMessages; ++i) {
                String key = "key-" + i;
                JdbcPostgresSinkTester.Foo obj = new JdbcPostgresSinkTester.Foo();
                obj.setField1("field1_insert_" + i);
                obj.setField2("field2_update_" + i);
                obj.setField3(i);
                // Explicit charset so the decoded value does not depend on the platform default.
                String value = new String(schema.encode(obj), StandardCharsets.UTF_8);
                Map<String, String> properties = Maps.newHashMap();
                properties.put("ACTION", "DELETE");
                kvs.put(key, value);
                // Kept inside the loop: an empty batch must yield an empty map, and the entry's
                // position in the insertion order stays right after the first key.
                kvs.put("ACTION", "DELETE");
                producer.newMessage().properties(properties).key(key).value(obj).send();
            }
            return kvs;
        }
    }
}

