package co.cask.cdap;

import co.cask.cdap.api.annotation.Output;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.Property;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.flow.AbstractFlow;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.Callback;
import co.cask.cdap.api.flow.flowlet.FailurePolicy;
import co.cask.cdap.api.flow.flowlet.FailureReason;
import co.cask.cdap.api.flow.flowlet.InputContext;
import co.cask.cdap.api.flow.flowlet.OutputEmitter;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.service.AbstractService;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Longs;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Map;
import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/WordCountApp.class */
public class WordCountApp extends AbstractApplication {
    /** Logger for this application; the nested {@code CountByField} flowlet writes its messages through it. */
    private static final Logger LOG = LoggerFactory.getLogger(WordCountApp.class);

    /* loaded from: input_file:co/cask/cdap/WordCountApp$CountByField.class */
    /**
     * Flowlet that maintains per-word counters in the {@code "mydataset"} key-value table.
     *
     * <p>Each input is a map that holds the word under {@code wordKey} and, optionally, the name
     * of the field the word came from under {@code fieldKey}. The counter key is
     * {@code "<field>:<word>"} when a field is present and just {@code "<word>"} otherwise.
     */
    public static class CountByField extends AbstractFlowlet implements Callback {

        /** Map key under which the incoming word is looked up. */
        @Property
        private final String wordKey;

        /** Map key under which the optional field name is looked up. */
        @Property
        private final String fieldKey;

        /** Amount added to a word's counter for every processed input. */
        @Property
        private final long increment = 1;

        /**
         * Creates the flowlet.
         *
         * @param wordKey  key of the input map that holds the word to count
         * @param fieldKey key of the input map that holds the name of the originating field
         */
        public CountByField(String wordKey, String fieldKey) {
            this.wordKey = wordKey;
            this.fieldKey = fieldKey;
        }

        /**
         * Increments the counter for the word carried by {@code map} and logs the new value.
         *
         * <p>Inputs without an entry for {@code wordKey} are ignored.
         *
         * @param map token attributes received on the {@code "field"} input
         */
        @ProcessInput({"field"})
        public void process(Map<String, String> map) {
            WordCountApp.LOG.info("process count by field: {}", map);
            String word = map.get(this.wordKey);
            if (word == null) {
                return;
            }
            String field = map.get(this.fieldKey);
            String counterKey = field == null ? word : field + ":" + word;

            KeyValueTable dataset = getContext().getDataset("mydataset");
            // Encode the key once and reuse it for both the increment and the read-back.
            byte[] keyBytes = counterKey.getBytes(Charsets.UTF_8);
            // Use the declared property instead of a duplicated literal so the two cannot drift apart.
            dataset.increment(keyBytes, this.increment);
            WordCountApp.LOG.info("{} {}", counterKey, Longs.fromByteArray(dataset.read(keyBytes)));
        }

        /**
         * Callback invoked after an input was processed successfully; intentionally does nothing.
         *
         * @param obj          the processed input, may be {@code null}
         * @param inputContext context of the processed input, may be {@code null}
         */
        public void onSuccess(@Nullable Object obj, @Nullable InputContext inputContext) {
        }

        /**
         * Callback invoked when processing an input failed.
         *
         * @param obj           the input that failed, may be {@code null}
         * @param inputContext  context of the failed input, may be {@code null}
         * @param failureReason reason reported for the failure
         * @return always {@link FailurePolicy#RETRY}
         */
        public FailurePolicy onFailure(@Nullable Object obj, @Nullable InputContext inputContext, FailureReason failureReason) {
            return FailurePolicy.RETRY;
        }
    }

    /* loaded from: input_file:co/cask/cdap/WordCountApp$MyRecord.class */
    /**
     * Immutable value object made of a title, a text and an "expired" flag.
     *
     * <p>No validation is performed: both strings are stored exactly as given, including
     * {@code null}.
     */
    public static final class MyRecord {
        private final String title;
        private final String text;
        private final boolean expired;

        /**
         * Creates a record.
         *
         * @param title   title of the record
         * @param text    body text of the record
         * @param expired whether the record is flagged as expired
         */
        public MyRecord(String title, String text, boolean expired) {
            this.title = title;
            this.text = text;
            this.expired = expired;
        }

        /** @return the title passed to the constructor */
        public String getTitle() {
            return title;
        }

        /** @return the text passed to the constructor */
        public String getText() {
            return text;
        }

        /** @return the expired flag passed to the constructor */
        public boolean isExpired() {
            return expired;
        }
    }

    /* loaded from: input_file:co/cask/cdap/WordCountApp$StreamSucker.class */
    /**
     * Flowlet that turns stream events originating from {@code "text"} into {@link MyRecord}s.
     */
    public static class StreamSucker extends AbstractFlowlet {
        // Neither field is assigned anywhere in this class; presumably both are injected by the
        // flow runtime before process() is called — confirm against the CDAP flowlet documentation.
        private OutputEmitter<MyRecord> output;
        private Metrics metrics;

        /**
         * Emits one {@link MyRecord} per event whose origin is {@code "text"}; all other events
         * are ignored.
         *
         * <p>The record's title is the event's {@code "title"} header, its text is the UTF-8
         * decoded body ({@code null} when the event has no body) and it is never marked expired.
         *
         * @param streamEvent  the incoming event
         * @param inputContext context used to determine the origin of the event
         * @throws CharacterCodingException if the body is not valid UTF-8
         */
        @ProcessInput
        public void process(StreamEvent streamEvent, InputContext inputContext) throws CharacterCodingException {
            if (!"text".equals(inputContext.getOrigin())) {
                return;
            }
            this.metrics.count("stream.event", 1);

            String title = (String) streamEvent.getHeaders().get("title");
            ByteBuffer body = (ByteBuffer) streamEvent.getBody();
            String text = null;
            if (body != null) {
                // CharsetDecoder.decode() advances the position of the buffer it reads. Decoding a
                // duplicate leaves the event's own buffer untouched, so the body can still be read
                // if the same event object is handed to this method again.
                text = Charsets.UTF_8.newDecoder().decode(body.duplicate()).toString();
            }
            this.output.emit(new MyRecord(title, text, false));
        }
    }

    /* loaded from: input_file:co/cask/cdap/WordCountApp$Tokenizer.class */
    /**
     * Flowlet that breaks the title and the text of a {@link MyRecord} into words and emits one
     * map per word on the {@code "field"} output.
     */
    public static class Tokenizer extends AbstractFlowlet {

        @Output("field")
        private OutputEmitter<Map<String, String>> outputMap;

        /**
         * Tokenizes the record's title (tagged {@code "title"}) and then its text (tagged
         * {@code "text"}).
         *
         * @param myRecord the record to split into words
         */
        @ProcessInput
        public void foo(MyRecord myRecord) {
            String title = myRecord.getTitle();
            tokenize(title, "title");
            String text = myRecord.getText();
            tokenize(text, "text");
        }

        /**
         * Splits {@code content} on spaces, dots and dashes and emits a
         * {@code {"field": fieldName, "word": token}} map for every resulting token.
         *
         * <p>A {@code null} content emits nothing. Tokens are emitted exactly as produced by
         * {@link String#split(String)}, so adjacent delimiters yield empty words.
         *
         * @param content   the string to split, may be {@code null}
         * @param fieldName value stored under the {@code "field"} key of every emitted map
         */
        private void tokenize(String content, String fieldName) {
            if (content != null) {
                String[] words = content.split("[ .-]");
                for (int i = 0; i < words.length; i++) {
                    Map<String, String> token = ImmutableMap.of("field", fieldName, "word", words[i]);
                    this.outputMap.emit(token);
                }
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/WordCountApp$VoidMapReduceJob.class */
    /**
     * MapReduce program whose configuration only sets a description.
     */
    public static class VoidMapReduceJob extends AbstractMapReduce {
        /** Sets the description of this MapReduce program. */
        protected void configure() {
            final String description =
                "Mapreduce that does nothing (and actually doesn't run) - it is here for testing MDS";
            setDescription(description);
        }
    }

    /* loaded from: input_file:co/cask/cdap/WordCountApp$WordCountFlow.class */
    /**
     * Flow that wires the {@code "text"} stream through {@link StreamSucker}, {@link Tokenizer}
     * and {@link CountByField}, in that order.
     */
    public static class WordCountFlow extends AbstractFlow {
        /**
         * Names the flow, registers its three flowlets and connects them.
         *
         * <p>Only the stream reader gets an explicit name; the other two flowlets are added
         * without one and are referred to as {@code "Tokenizer"} and {@code "CountByField"} when
         * connected.
         */
        protected void configureFlow() {
            final String source = "StreamSource";
            final String tokenizer = "Tokenizer";
            final String counter = "CountByField";

            setName("WordCountFlow");
            setDescription("Flow for counting words");

            // Flowlets.
            addFlowlet(source, new StreamSucker());
            addFlowlet(new Tokenizer());
            addFlowlet(new CountByField("word", "field"));

            // Connections: stream -> source -> tokenizer -> counter.
            connectStream("text", source);
            connect(source, tokenizer);
            connect(tokenizer, counter);
        }
    }

    /* loaded from: input_file:co/cask/cdap/WordCountApp$WordFrequencyHandler.class */
    /**
     * HTTP handler that looks up a counter in the {@code "mydataset"} key-value table.
     */
    public static class WordFrequencyHandler extends AbstractHttpServiceHandler {

        @UseDataSet("mydataset")
        private KeyValueTable counters;

        /**
         * Handles {@code GET wordfreq/{word}}.
         *
         * <p>Responds with status 404 when the table has no value for the given key; otherwise
         * responds with a single-entry JSON object that maps the key to the stored value
         * interpreted as a long.
         *
         * @param httpServiceRequest   the incoming request (not inspected)
         * @param httpServiceResponder used to send the response
         * @param str                  the {@code word} path parameter, used verbatim as table key
         */
        @GET
        @Path("wordfreq/{word}")
        public void wordFrequency(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @PathParam("word") String str) {
            byte[] stored = this.counters.read(str);
            if (stored != null) {
                long frequency = Bytes.toLong(stored);
                httpServiceResponder.sendJson(ImmutableMap.of(str, Long.valueOf(frequency)));
                return;
            }
            httpServiceResponder.sendStatus(404);
        }
    }

    /* loaded from: input_file:co/cask/cdap/WordCountApp$WordFrequencyService.class */
    /**
     * Service whose only handler is a {@link WordFrequencyHandler}.
     */
    public static class WordFrequencyService extends AbstractService {
        /** Registers a new {@link WordFrequencyHandler} with this service. */
        protected void configure() {
            WordFrequencyHandler handler = new WordFrequencyHandler();
            addHandler(handler);
        }
    }

    /**
     * Declares the application: its name and description, the {@code "text"} stream, the
     * {@code "mydataset"} key-value table, and one flow, one service and one MapReduce program.
     */
    public void configure() {
        // Identity.
        setName("WordCountApp");
        setDescription("Application for counting words");

        // Data: input stream and counter table.
        Stream textStream = new Stream("text");
        addStream(textStream);
        createDataset("mydataset", KeyValueTable.class);

        // Programs, registered in the same order as before.
        WordCountFlow flow = new WordCountFlow();
        addFlow(flow);
        WordFrequencyService service = new WordFrequencyService();
        addService(service);
        VoidMapReduceJob mapReduce = new VoidMapReduceJob();
        addMapReduce(mapReduce);
    }
}
