package co.cask.cdap.examples.profiles;

import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.table.Get;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.flow.AbstractFlow;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.OutputEmitter;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/examples/profiles/ActivityFlow.class */
public class ActivityFlow extends AbstractFlow {
    private static final Logger LOG = LoggerFactory.getLogger(ActivityFlow.class);

    /* loaded from: input_file:co/cask/cdap/examples/profiles/ActivityFlow$Counter.class */
    private class Counter extends AbstractFlowlet {

        @UseDataSet("counters")
        private KeyValueTable counters;

        private Counter() {
        }

        @ProcessInput
        public void process(Event event) {
            this.counters.increment(Bytes.toBytes(event.getUrl()), 1L);
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/profiles/ActivityFlow$EventReader.class */
    private class EventReader extends AbstractFlowlet {
        private OutputEmitter<Event> out;

        private EventReader() {
        }

        @ProcessInput
        public void process(StreamEvent streamEvent) {
            try {
                this.out.emit(Event.fromJson((ByteBuffer) streamEvent.getBody()));
            } catch (Exception e) {
                ActivityFlow.LOG.debug("Problem decoding event: {}", Bytes.toString((ByteBuffer) streamEvent.getBody()), e);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/profiles/ActivityFlow$Updater.class */
    private class Updater extends AbstractFlowlet {

        @UseDataSet("profiles")
        private Table profiles;

        private Updater() {
        }

        @ProcessInput
        public void process(Event event) {
            String userId = event.getUserId();
            if (this.profiles.get(new Get(userId, new String[]{"id"})).isEmpty()) {
                ActivityFlow.LOG.debug("Received event for unknown user id {}.", userId);
            } else {
                this.profiles.put(new Put(userId, "active", event.getTime()));
            }
        }
    }

    protected void configureFlow() {
        setName("ActivityFlow");
        setDescription("Reads click events from a stream, counts clicks per URL, and records user activity.");
        addFlowlet("reader", new EventReader());
        addFlowlet("counter", new Counter());
        addFlowlet("updater", new Updater());
        connectStream("events", "reader");
        connect("reader", "counter");
        connect("reader", "updater");
    }
}
