/*
 * Decompiled with CFR 0.152.
 */
package co.cask.cdap.kafka.flow;

import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.flow.AbstractFlow;
import co.cask.cdap.api.flow.Flow;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.Flowlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract application that wires a subclass-provided source flowlet into a
 * counting sink.
 *
 * <p>{@link #configure()} names the application {@code "KafkaConsumingApp"},
 * registers one {@link KafkaConsumingFlow} built around the flowlet returned by
 * {@link #getKafkaSourceFlowlet()}, and creates two {@link KeyValueTable}
 * datasets: {@code "kafkaOffsets"} and {@code "counter"}.
 */
public abstract class KafkaConsumingApp extends AbstractApplication {
    /**
     * Name of the offsets dataset created by {@link #configure()}. Nothing in this
     * file reads it; presumably the source flowlet supplied by the subclass does
     * (verify against the concrete subclass).
     */
    private static final String OFFSETS_DATASET = "kafkaOffsets";

    /**
     * Name of the dataset incremented by {@link DataSink}. Shared between the
     * {@code createDataset} call and the {@code @UseDataSet} binding so the two
     * cannot drift apart.
     */
    private static final String COUNTER_DATASET = "counter";

    /**
     * Declares the application's name, its single flow, and the two key/value
     * datasets it uses.
     */
    public void configure() {
        setName("KafkaConsumingApp");
        addFlow(new KafkaConsumingFlow(getKafkaSourceFlowlet()));
        createDataset(OFFSETS_DATASET, KeyValueTable.class);
        createDataset(COUNTER_DATASET, KeyValueTable.class);
    }

    /**
     * Supplies the flowlet placed at the head of {@link KafkaConsumingFlow}.
     *
     * @return the source flowlet whose output is connected to {@link DataSink}
     */
    protected abstract Flowlet getKafkaSourceFlowlet();

    /**
     * Terminal flowlet: logs every string it receives and increments that
     * string's entry in the {@code "counter"} dataset by one.
     */
    public static final class DataSink extends AbstractFlowlet {
        private static final Logger LOG = LoggerFactory.getLogger(DataSink.class);

        // Never assigned in this class; presumably populated by the framework
        // through the @UseDataSet binding before process() runs.
        @UseDataSet(COUNTER_DATASET)
        private KeyValueTable counter;

        /**
         * Logs the received value and bumps its counter by one.
         *
         * @param string the received value; its byte encoding is used as the counter key
         */
        @ProcessInput
        public void process(String string) {
            LOG.info("Received: {}", string);
            counter.increment(Bytes.toBytes(string), 1L);
        }
    }

    /**
     * Two-stage flow: the supplied source flowlet connected to a {@link DataSink}.
     */
    public static final class KafkaConsumingFlow extends AbstractFlow {
        private final Flowlet sourceFlowlet;

        /**
         * @param sourceFlowlet the flowlet whose output is connected to the sink
         */
        KafkaConsumingFlow(Flowlet sourceFlowlet) {
            this.sourceFlowlet = sourceFlowlet;
        }

        /**
         * Names the flow {@code "KafkaConsumingFlow"}, registers both flowlets and
         * connects the source to the sink.
         */
        public void configure() {
            // Use one sink instance for both registration and connection, so the
            // wiring does not depend on two distinct objects being matched up.
            Flowlet sink = new DataSink();

            setName("KafkaConsumingFlow");
            addFlowlet(sourceFlowlet);
            addFlowlet(sink);
            connect(sourceFlowlet, sink);
        }
    }
}

