package co.cask.cdap.kafka.flow;

import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.flow.AbstractFlow;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.Flowlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/kafka/flow/KafkaConsumingApp.class */
public abstract class KafkaConsumingApp extends AbstractApplication {

    /* loaded from: input_file:co/cask/cdap/kafka/flow/KafkaConsumingApp$DataSink.class */
    /**
     * Flowlet that logs every string it receives and keeps a per-string count in the
     * dataset named {@code counter}.
     */
    public static final class DataSink extends AbstractFlowlet {
        private static final Logger LOG = LoggerFactory.getLogger(DataSink.class);

        /** Table bound to the {@code counter} dataset; rows are keyed by the bytes of the received string. */
        @UseDataSet("counter")
        private KeyValueTable counter;

        /**
         * Logs the incoming value, then increments its row in the counter table by one.
         *
         * @param str the value delivered to this flowlet
         */
        @ProcessInput
        public void process(String str) {
            LOG.info("Received: {}", str);
            final byte[] rowKey = Bytes.toBytes(str);
            counter.increment(rowKey, 1L);
        }
    }

    /* loaded from: input_file:co/cask/cdap/kafka/flow/KafkaConsumingApp$KafkaConsumingFlow.class */
    public static final class KafkaConsumingFlow extends AbstractFlow {
        private final Flowlet sourceFlowlet;

        KafkaConsumingFlow(Flowlet flowlet) {
            this.sourceFlowlet = flowlet;
        }

        public void configure() {
            setName("KafkaConsumingFlow");
            addFlowlet(this.sourceFlowlet);
            addFlowlet(new DataSink());
            connect(this.sourceFlowlet, new DataSink());
        }
    }

    /**
     * Configures the application: sets its name to {@code KafkaConsumingApp}, adds a
     * {@link KafkaConsumingFlow} built around the flowlet returned by
     * {@link #getKafkaSourceFlowlet()}, and creates two {@link KeyValueTable} datasets,
     * {@code kafkaOffsets} and {@code counter}.
     *
     * <p>{@code counter} is the dataset {@link DataSink} increments. {@code kafkaOffsets} is
     * not referenced elsewhere in this class; presumably it is used by the source flowlet
     * supplied by the subclass (to be confirmed against the concrete implementations).
     */
    @Override
    public void configure() {
        setName("KafkaConsumingApp");
        addFlow(new KafkaConsumingFlow(getKafkaSourceFlowlet()));
        createDataset("kafkaOffsets", KeyValueTable.class);
        createDataset("counter", KeyValueTable.class);
    }

    /**
     * Supplies the source flowlet for this application.
     *
     * <p>{@link #configure()} calls this once and passes the result to
     * {@link KafkaConsumingFlow}, which registers it and connects it to a {@link DataSink}.
     * NOTE(review): since {@link DataSink} processes {@code String} input, the returned
     * flowlet presumably needs to emit strings; confirm against the concrete subclasses.
     *
     * @return the flowlet to use as the upstream end of the flow
     */
    protected abstract Flowlet getKafkaSourceFlowlet();
}
