package co.cask.cdap.mapreduce;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.test.ApplicationManager;
import co.cask.cdap.test.StreamManager;
import co.cask.cdap.test.base.TestFrameworkTestBase;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/mapreduce/MapReduceStreamInputTestRun.class */
public class MapReduceStreamInputTestRun extends TestFrameworkTestBase {
    @Test
    public void test() throws Exception {
        ApplicationManager deployApplication = deployApplication(AppWithMapReduceUsingStream.class, new File[0]);
        Schema parse = new Schema.Parser().parse(AppWithMapReduceUsingStream.SCHEMA.toString());
        StreamManager streamManager = getStreamManager("mrStream");
        streamManager.send(createEvent(parse, "YHOO", 100, 10.0f));
        streamManager.send(createEvent(parse, "YHOO", 10, 10.1f));
        streamManager.send(createEvent(parse, "YHOO", 13, 9.9f));
        streamManager.send(createEvent(parse, "AAPL", 5, 300.0f));
        streamManager.send(createEvent(parse, "AAPL", 3, 298.34f));
        streamManager.send(createEvent(parse, "AAPL", 50, 305.23f));
        streamManager.send(createEvent(parse, "AAPL", 1000, 284.13f));
        deployApplication.getMapReduceManager("BodyTracker").start().waitForFinish(180L, TimeUnit.SECONDS);
        KeyValueTable keyValueTable = (KeyValueTable) getDataset("prices").get();
        float f = Bytes.toFloat(keyValueTable.read(Bytes.toBytes("YHOO")));
        float f2 = Bytes.toFloat(keyValueTable.read(Bytes.toBytes("AAPL")));
        Assert.assertTrue(((double) Math.abs(1229.7f - f)) < 1.0E-7d);
        Assert.assertTrue(((double) Math.abs(301786.53f - f2)) < 1.0E-7d);
    }

    private byte[] createEvent(Schema schema, String str, int i, float f) throws IOException {
        GenericData.Record build = new GenericRecordBuilder(schema).set("ticker", str).set("num_traded", Integer.valueOf(i)).set("price", Float.valueOf(f)).build();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, (BinaryEncoder) null);
        new GenericDatumWriter(schema).write(build, binaryEncoder);
        binaryEncoder.flush();
        byteArrayOutputStream.close();
        return byteArrayOutputStream.toByteArray();
    }
}
