package co.cask.cdap.mapreduce;

import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.stream.GenericStreamEventData;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

/* loaded from: input_file:co/cask/cdap/mapreduce/AppWithMapReduceUsingStream.class */
public class AppWithMapReduceUsingStream extends AbstractApplication {
    static final Schema SCHEMA = Schema.recordOf("event", new Schema.Field[]{Schema.Field.of("ticker", Schema.of(Schema.Type.STRING)), Schema.Field.of("num_traded", Schema.of(Schema.Type.INT)), Schema.Field.of("price", Schema.of(Schema.Type.FLOAT))});

    /* loaded from: input_file:co/cask/cdap/mapreduce/AppWithMapReduceUsingStream$BodyTracker.class */
    public static final class BodyTracker extends AbstractMapReduce {
        public void initialize() throws Exception {
            MapReduceContext context = getContext();
            Job job = (Job) context.getHadoopJob();
            job.setMapperClass(TickerMapper.class);
            job.setReducerClass(PriceCounter.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FloatWritable.class);
            job.setOutputKeyClass(byte[].class);
            job.setOutputValueClass(byte[].class);
            context.addInput(Input.ofStream("mrStream", 0L, Long.MAX_VALUE, new FormatSpecification("avro", AppWithMapReduceUsingStream.SCHEMA, Collections.emptyMap())));
            context.addOutput(Output.ofDataset("prices"));
        }
    }

    /* loaded from: input_file:co/cask/cdap/mapreduce/AppWithMapReduceUsingStream$PriceCounter.class */
    public static class PriceCounter extends Reducer<Text, FloatWritable, byte[], byte[]> {
        public void reduce(Text text, Iterable<FloatWritable> iterable, Reducer<Text, FloatWritable, byte[], byte[]>.Context context) throws IOException, InterruptedException {
            Float valueOf = Float.valueOf(0.0f);
            Iterator<FloatWritable> it = iterable.iterator();
            while (it.hasNext()) {
                valueOf = Float.valueOf(valueOf.floatValue() + it.next().get());
            }
            context.write(text.getBytes(), Bytes.toBytes(valueOf.floatValue()));
        }

        public /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<FloatWritable>) iterable, (Reducer<Text, FloatWritable, byte[], byte[]>.Context) context);
        }
    }

    /* loaded from: input_file:co/cask/cdap/mapreduce/AppWithMapReduceUsingStream$TickerMapper.class */
    public static class TickerMapper extends Mapper<LongWritable, GenericStreamEventData<StructuredRecord>, Text, FloatWritable> {
        public void map(LongWritable longWritable, GenericStreamEventData<StructuredRecord> genericStreamEventData, Mapper<LongWritable, GenericStreamEventData<StructuredRecord>, Text, FloatWritable>.Context context) throws IOException, InterruptedException {
            StructuredRecord structuredRecord = (StructuredRecord) genericStreamEventData.getBody();
            context.write(new Text(structuredRecord.get("ticker").toString()), new FloatWritable(((Integer) structuredRecord.get("num_traded")).intValue() * ((Float) structuredRecord.get("price")).floatValue()));
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (GenericStreamEventData<StructuredRecord>) obj2, (Mapper<LongWritable, GenericStreamEventData<StructuredRecord>, Text, FloatWritable>.Context) context);
        }
    }

    public void configure() {
        setName("AppWithMapReduceUsingStream");
        setDescription("Application with MapReduce job using stream as input");
        addStream(new Stream("mrStream"));
        createDataset("prices", KeyValueTable.class);
        addMapReduce(new BodyTracker());
    }
}
