/*
 * Decompiled with CFR 0.152.
 */
package co.cask.cdap.kafka.flow;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.metrics.RuntimeMetrics;
import co.cask.cdap.kafka.flow.KafkaConsumingApp;
import co.cask.cdap.proto.Id;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.DataSetManager;
import co.cask.cdap.test.FlowManager;
import co.cask.cdap.test.TestBase;
import co.cask.cdap.test.TestConfiguration;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class KafkaConsumerFlowletTestBase
extends TestBase {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerFlowletTestBase.class);
    private static final int PARTITIONS = 6;
    @ClassRule
    public static final TestConfiguration CONFIG = new TestConfiguration(new Object[]{"data.tx.timeout", "2"});
    static InMemoryZKServer zkServer;
    static int kafkaPort;

    @BeforeClass
    public static void init() throws Exception {
        zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
        zkServer.startAndWait();
    }

    @AfterClass
    public static void cleanup() {
        zkServer.stopAndWait();
    }

    @After
    public void cleanUpMetrics() throws Exception {
        this.getMetricsManager().resetAll();
        KafkaConsumerFlowletTestBase.clear();
    }

    protected abstract Class<? extends KafkaConsumingApp> getApplication();

    protected abstract void sendMessage(String var1, Map<String, String> var2);

    protected abstract boolean supportBrokerList();

    private Map<String, String> getRuntimeArgs(String topic, int partitions, boolean preferZK) {
        HashMap args = Maps.newHashMap();
        args.put("kafka.topic", topic);
        if (!this.supportBrokerList() || preferZK) {
            args.put("kafka.zookeeper", zkServer.getConnectionStr());
        } else {
            args.put("kafka.brokers", "localhost:" + kafkaPort);
        }
        args.put("kafka.partitions", Integer.toString(partitions));
        return args;
    }

    protected Map<String, String> getRuntimeArgs(String topic, int partitions, boolean preferZK, long startOffset) {
        Map<String, String> args = this.getRuntimeArgs(topic, partitions, preferZK);
        args.put("kafka.default.offset", Long.toString(startOffset));
        return args;
    }

    @Test
    public final void testFlowlet() throws Exception {
        String topic = "testTopic";
        ApplicationManager appManager = KafkaConsumerFlowletTestBase.deployApplication(this.getApplication(), (File[])new File[0]);
        FlowManager flowManager = (FlowManager)appManager.getFlowManager("KafkaConsumingFlow").start(this.getRuntimeArgs(topic, 6, false));
        int msgCount = 5;
        HashMap messages = Maps.newHashMap();
        for (int i = 0; i < msgCount; ++i) {
            messages.put(Integer.toString(i), "Message " + i);
        }
        this.sendMessage(topic, messages);
        RuntimeMetrics sinkMetrics = this.getMetricsManager().getFlowletMetrics(Id.Namespace.DEFAULT.getId(), "KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        sinkMetrics.waitForProcessed((long)msgCount, 10L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
        Assert.assertEquals((long)msgCount, (long)sinkMetrics.getProcessed());
        flowManager.stop();
        messages.clear();
        messages.put(Integer.toString(msgCount), "Message " + msgCount++);
        messages.put("Failure", "Failure");
        messages.put(Integer.toString(msgCount), "Message " + msgCount++);
        this.sendMessage(topic, messages);
        this.getMetricsManager().resetAll();
        flowManager = this.startFlowWithRetry(appManager, "KafkaConsumingFlow", this.getRuntimeArgs(topic, 6, true), 5);
        sinkMetrics.waitForProcessed(2L, 10L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
        Assert.assertEquals((long)2L, (long)sinkMetrics.getProcessed());
        flowManager.stop();
        this.assertDatasetCount(msgCount);
    }

    @Test
    public void testChangeInstances() throws Exception {
        String topic = "testChangeInstances";
        ApplicationManager appManager = KafkaConsumerFlowletTestBase.deployApplication(this.getApplication(), (File[])new File[0]);
        FlowManager flowManager = (FlowManager)appManager.getFlowManager("KafkaConsumingFlow").start(this.getRuntimeArgs(topic, 6, false));
        int msgCount = 100;
        for (int i = 0; i < msgCount; ++i) {
            this.sendMessage(topic, (Map<String, String>)ImmutableMap.of((Object)Integer.toString(i), (Object)("TestInstances " + i)));
        }
        RuntimeMetrics sinkMetrics = this.getMetricsManager().getFlowletMetrics(Id.Namespace.DEFAULT.getId(), "KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        sinkMetrics.waitForProcessed((long)msgCount, 10L, TimeUnit.SECONDS);
        flowManager.setFlowletInstances("KafkaSource", 3);
        for (int i = 0; i < msgCount; ++i) {
            this.sendMessage(topic, (Map<String, String>)ImmutableMap.of((Object)Integer.toString(i + msgCount), (Object)("TestInstances " + (i + msgCount))));
        }
        sinkMetrics.waitForProcessed((long)(msgCount *= 2), 10L, TimeUnit.SECONDS);
        flowManager.stop();
        this.assertDatasetCount(msgCount);
    }

    @Test
    public final void testStartOffset() throws Exception {
        String topic = "testStartOffset";
        int msgCount = 5;
        HashMap messages = Maps.newHashMap();
        for (int i = 0; i < msgCount; ++i) {
            messages.put(Integer.toString(i), "Message " + i);
        }
        this.sendMessage(topic, messages);
        ApplicationManager appManager = KafkaConsumerFlowletTestBase.deployApplication(this.getApplication(), (File[])new File[0]);
        FlowManager flowManager = (FlowManager)appManager.getFlowManager("KafkaConsumingFlow").start(this.getRuntimeArgs(topic, 6, false, -1L));
        TimeUnit.SECONDS.sleep(2L);
        messages = Maps.newHashMap();
        for (int i = msgCount; i < msgCount + 5; ++i) {
            messages.put(Integer.toString(i), "Message " + i);
        }
        this.sendMessage(topic, messages);
        RuntimeMetrics sinkMetrics = this.getMetricsManager().getFlowletMetrics(Id.Namespace.DEFAULT.getId(), "KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        sinkMetrics.waitForProcessed(5L, 10L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
        Assert.assertEquals((long)msgCount, (long)sinkMetrics.getProcessed());
        flowManager.stop();
        this.assertDatasetCount(msgCount);
    }

    @Test
    public final void testInvalidStartOffsetLarger() throws Exception {
        String topic = "testInvalidStartOffsetLarger";
        int msgCount = 5;
        HashMap messages = Maps.newHashMap();
        for (int i = 0; i < msgCount; ++i) {
            messages.put(Integer.toString(i), "Message " + i);
        }
        this.sendMessage(topic, messages);
        ApplicationManager appManager = KafkaConsumerFlowletTestBase.deployApplication(this.getApplication(), (File[])new File[0]);
        long invalidStartOffset = 12345678901234L;
        FlowManager flowManager = (FlowManager)appManager.getFlowManager("KafkaConsumingFlow").start(this.getRuntimeArgs(topic, 6, false, invalidStartOffset));
        TimeUnit.SECONDS.sleep(2L);
        messages = Maps.newHashMap();
        for (int i = msgCount; i < msgCount + 5; ++i) {
            messages.put(Integer.toString(i), "Message " + i);
        }
        this.sendMessage(topic, messages);
        RuntimeMetrics sinkMetrics = this.getMetricsManager().getFlowletMetrics(Id.Namespace.DEFAULT.getId(), "KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        sinkMetrics.waitForProcessed(5L, 10L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
        Assert.assertEquals((long)msgCount, (long)sinkMetrics.getProcessed());
        flowManager.stop();
        this.assertDatasetCount(msgCount);
    }

    @Test
    public final void testInvalidStartOffsetSmaller() throws Exception {
        int i;
        String topic = "testInvalidStartOffsetSmaller";
        int msgCount = 500;
        HashMap messages = Maps.newHashMap();
        for (i = 0; i < msgCount; ++i) {
            messages.put(Integer.toString(i), "Test Invalid Start Offset Message " + i);
        }
        this.sendMessage(topic, messages);
        messages.clear();
        for (i = msgCount; i < 2 * msgCount; ++i) {
            messages.put(Integer.toString(i), "Test Invalid Start Offset Message " + i);
        }
        this.sendMessage(topic, messages);
        TimeUnit.SECONDS.sleep(80L);
        ApplicationManager appManager = KafkaConsumerFlowletTestBase.deployApplication(this.getApplication(), (File[])new File[0]);
        FlowManager flowManager = (FlowManager)appManager.getFlowManager("KafkaConsumingFlow").start(this.getRuntimeArgs(topic, 6, false, -2L));
        TimeUnit.SECONDS.sleep(2L);
        RuntimeMetrics sinkMetrics = this.getMetricsManager().getFlowletMetrics(Id.Namespace.DEFAULT.getId(), "KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        sinkMetrics.waitForProcessed(10L, 30L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(10L);
        flowManager.stop();
        long expectedCount = sinkMetrics.getProcessed();
        LOG.info("Fetched {} messages from Kafka", (Object)expectedCount);
        Assert.assertTrue((expectedCount > 1L ? 1 : 0) != 0);
        Assert.assertTrue((expectedCount < (long)(2 * msgCount) ? 1 : 0) != 0);
        KafkaConsumerFlowletTestBase.clear();
        appManager = KafkaConsumerFlowletTestBase.deployApplication(this.getApplication(), (File[])new File[0]);
        long invalidStartOffset = 0L;
        flowManager = (FlowManager)appManager.getFlowManager("KafkaConsumingFlow").start(this.getRuntimeArgs(topic, 6, false, invalidStartOffset));
        TimeUnit.SECONDS.sleep(2L);
        sinkMetrics = this.getMetricsManager().getFlowletMetrics(Id.Namespace.DEFAULT.getId(), "KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
        sinkMetrics.waitForProcessed(expectedCount, 30L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
        flowManager.stop();
        Assert.assertEquals((long)expectedCount, (long)sinkMetrics.getProcessed());
        this.assertDatasetCount(expectedCount);
    }

    @Test
    public void testProcessingLimits() throws Exception {
        String topic = "testProcessingLimits";
        ApplicationManager appManager = KafkaConsumerFlowletTestBase.deployApplication(this.getApplication(), (File[])new File[0]);
        FlowManager flowManager = appManager.getFlowManager("KafkaConsumingFlow");
        Map<String, String> runtimeArgs = this.getRuntimeArgs(topic, 6, false);
        runtimeArgs.put("batch.size", "10");
        this.runFlow(flowManager, runtimeArgs, topic);
        runtimeArgs = this.getRuntimeArgs(topic, 6, false);
        runtimeArgs.put("max.process.millis", "2000");
        this.runFlow(flowManager, runtimeArgs, topic);
        runtimeArgs = this.getRuntimeArgs(topic, 6, false);
        try {
            this.runFlow(flowManager, runtimeArgs, topic);
            Assert.fail();
        }
        catch (TimeoutException e) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runFlow(FlowManager flowManager, Map<String, String> args, String topic) throws Exception {
        int msgCount = 100;
        HashMap messages = Maps.newHashMap();
        for (int i = 0; i < msgCount; ++i) {
            messages.put(Integer.toString(i), "sleep:100");
        }
        this.sendMessage(topic, messages);
        flowManager.start(args);
        try {
            RuntimeMetrics sinkMetrics = this.getMetricsManager().getFlowletMetrics(Id.Namespace.DEFAULT.getId(), "KafkaConsumingApp", "KafkaConsumingFlow", "DataSink");
            sinkMetrics.waitForProcessed((long)msgCount, 15L, TimeUnit.SECONDS);
            RuntimeMetrics sourceMetrics = this.getMetricsManager().getFlowletMetrics(Id.Namespace.DEFAULT.getId(), "KafkaConsumingApp", "KafkaConsumingFlow", "KafkaSource");
            Assert.assertEquals((long)0L, (long)sourceMetrics.getException());
        }
        finally {
            flowManager.stop();
            this.getMetricsManager().resetAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void assertDatasetCount(long expectedMsgCount) throws Exception {
        DataSetManager datasetManager = this.getDataset("counter");
        KeyValueTable counter = (KeyValueTable)datasetManager.get();
        try (CloseableIterator scanner = counter.scan(null, null);){
            int size = 0;
            while (scanner.hasNext()) {
                KeyValue keyValue = (KeyValue)scanner.next();
                Assert.assertEquals((long)1L, (long)Bytes.toLong((byte[])((byte[])keyValue.getValue())));
                ++size;
            }
            Assert.assertEquals((long)expectedMsgCount, (long)size);
        }
    }

    private FlowManager startFlowWithRetry(ApplicationManager appManager, String flowId, Map<String, String> args, int trials) {
        Throwable failure = null;
        while (true) {
            try {
                if (failure != null) {
                    TimeUnit.SECONDS.sleep(1L);
                }
                return (FlowManager)appManager.getFlowManager(flowId).start(args);
            }
            catch (InterruptedException e) {
                throw Throwables.propagate((Throwable)e);
            }
            catch (Throwable t) {
                failure = t;
                if (--trials > 0) continue;
                throw Throwables.propagate((Throwable)failure);
            }
            break;
        }
    }

    protected static Properties generateKafkaConfig(String zkConnectStr, int port, File logDir) {
        Properties prop = new Properties();
        prop.setProperty("log.dir", logDir.getAbsolutePath());
        prop.setProperty("port", Integer.toString(port));
        prop.setProperty("broker.id", "1");
        prop.setProperty("socket.send.buffer.bytes", "1048576");
        prop.setProperty("socket.receive.buffer.bytes", "1048576");
        prop.setProperty("socket.request.max.bytes", "104857600");
        prop.setProperty("num.partitions", Integer.toString(6));
        prop.setProperty("log.retention.hours", "24");
        prop.setProperty("log.flush.interval.messages", "10");
        prop.setProperty("log.flush.interval.ms", "1000");
        prop.setProperty("log.segment.bytes", "100");
        prop.setProperty("zookeeper.connect", zkConnectStr);
        prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
        prop.setProperty("default.replication.factor", "1");
        prop.setProperty("log.retention.bytes", "1000");
        prop.setProperty("log.retention.check.interval.ms", "60000");
        prop.setProperty("brokerid", "1");
        prop.setProperty("zk.connect", zkConnectStr);
        prop.setProperty("zk.connectiontimeout.ms", "1000000");
        prop.setProperty("log.retention.size", "1000");
        prop.setProperty("log.cleanup.interval.mins", "1");
        prop.setProperty("log.file.size", "1000");
        return prop;
    }
}

