package cascading.pipe.assembly;

import cascading.PlatformTestCase;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.operation.AssertionLevel;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.Identity;
import cascading.operation.assertion.AssertExpression;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.AverageBy;
import cascading.pipe.assembly.CountBy;
import cascading.pipe.assembly.CountByLocally;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Hasher;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import data.InputData;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.junit.Test;
import org.junit.Test;

/* loaded from: input_file:cascading/pipe/assembly/AssemblyHelpersPlatformTest.class */
public class AssemblyHelpersPlatformTest extends PlatformTestCase {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: cascading.pipe.assembly.AssemblyHelpersPlatformTest$1CustomHasher, reason: invalid class name */
    /* loaded from: input_file:cascading/pipe/assembly/AssemblyHelpersPlatformTest$1CustomHasher.class */
    public class C1CustomHasher implements Hasher<Object>, Comparator<Comparable>, Serializable {
        C1CustomHasher() {
        }

        public int hashCode(Object obj) {
            if (obj == null) {
                return 0;
            }
            long j = 2166136261L;
            for (int i = 0; i < obj.toString().getBytes().length; i++) {
                j = (j ^ r0[i]) * 16777619;
            }
            return (int) j;
        }

        @Override // java.util.Comparator
        public int compare(Comparable comparable, Comparable comparable2) {
            return comparable.compareTo(comparable2);
        }
    }

    /* loaded from: input_file:cascading/pipe/assembly/AssemblyHelpersPlatformTest$NullInsert.class */
    class NullInsert extends BaseOperation implements Function {
        private final int number;

        public NullInsert(int i, Fields fields) {
            super(2, fields);
            this.number = i;
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            int integer = arguments.getInteger(0);
            String string = arguments.getString(1);
            functionCall.getOutputCollector().add(integer == this.number ? new Tuple(new Object[]{null, string}) : new Tuple(new Object[]{Integer.valueOf(integer), string}));
        }
    }

    /** Splits each line into "num"/"char" and coerces "num" to {@link Integer}. */
    @Test
    public void testCoerce() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Pipe split = new Each(new Pipe("coerce"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Pipe coerced = new Coerce(split, new Fields("num"), new Class[]{Integer.class});

        Tap source = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("line"), new Fields("num", "char"), getOutputPath("coerce"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, coerced);
        flow.complete();

        validateLength(flow, 5, 1, Pattern.compile("^\\d+\\s\\w+$"));
    }

    /**
     * Coerces "num" via typed {@link Fields}: first to {@link Integer}, then back to
     * {@link String}, asserting the runtime type after each step.
     */
    @Test
    public void testCoerceFields() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        // Split "line" into two String-typed fields.
        Fields splitFields = new Fields("num", "char").applyTypes(String.class, String.class);
        Pipe pipe = new Each(new Pipe("coerce"), new Fields("line"), new RegexSplitter(splitFields, " "));

        // String -> Integer, then assert.
        pipe = new Coerce(pipe, new Fields("num").applyTypes(Integer.class));
        pipe = new Each(pipe, new Fields("num"), AssertionLevel.STRICT, new AssertExpression("num instanceof Integer", Object.class));

        // Integer -> String, then assert.
        pipe = new Coerce(pipe, new Fields("num").applyTypes(String.class));
        pipe = new Each(pipe, new Fields("num"), AssertionLevel.STRICT, new AssertExpression("num instanceof String", Object.class));

        Tap source = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("line"), new Fields("num", "char"), getOutputPath("coercefields"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow, 5, 1, Pattern.compile("^\\d+\\s\\w+$"));
    }

    /** Keeps only the "num" field with {@link Retain}. */
    @Test
    public void testRetainNarrow() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Pipe split = new Each(new Pipe("shape"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Pipe retained = new Retain(split, new Fields("num"));

        Tap source = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("num"), new Fields("num"), getOutputPath("retainnarrow"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, retained);
        flow.complete();

        validateLength(flow, 5, 1, Pattern.compile("^\\d+$"));
    }

    /** Copies "num"/"char" into new fields "item"/"element" with {@link Copy}. */
    @Test
    public void testCopyNamed() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Pipe split = new Each(new Pipe("shape"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Pipe copied = new Copy(split, new Fields("num", "char"), new Fields("item", "element"));

        Tap source = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("line"), new Fields("num", "char", "item", "element"), getOutputPath("copy"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, copied);
        flow.complete();

        validateLength(flow, 5, 1, Pattern.compile("^\\d+\\s\\w+\\s\\d+\\s\\w+$"));
    }

    /** Drops the "char" field with {@link Discard}, leaving only "num". */
    @Test
    public void testDiscardNarrow() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Pipe split = new Each(new Pipe("shape"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Pipe discarded = new Discard(split, new Fields("char"));

        Tap source = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("num"), new Fields("num"), getOutputPath("discardnarrow"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, discarded);
        flow.complete();

        validateLength(flow, 5, 1, Pattern.compile("^\\d+$"));
    }

    /** Renames "num"/"char" to "item"/"element" by name with {@link Rename}. */
    @Test
    public void testRenameNamed() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Pipe split = new Each(new Pipe("shape"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Pipe renamed = new Rename(split, new Fields("num", "char"), new Fields("item", "element"));

        Tap source = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("line"), new Fields("item", "element"), getOutputPath("rename"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, renamed);
        flow.complete();

        validateLength(flow, 5, 1, Pattern.compile("^\\d+\\s\\w+$"));
    }

    /** Renames every field ({@link Fields#ALL}) to "item"/"element". */
    @Test
    public void testRenameAll() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Pipe split = new Each(new Pipe("shape"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Pipe renamed = new Rename(split, Fields.ALL, new Fields("item", "element"));

        Tap source = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("line"), new Fields("item", "element"), getOutputPath("renameall"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, renamed);
        flow.complete();

        validateLength(flow, 5, 1, Pattern.compile("^\\d+\\s\\w+$"));
    }

    /** Renames only "num" to "item"; the sink writes "char" then "item". */
    @Test
    public void testRenameNarrow() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Pipe split = new Each(new Pipe("shape"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Pipe renamed = new Rename(split, new Fields("num"), new Fields("item"));

        Tap source = getPlatform().getTextFile(InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("item"), new Fields("char", "item"), getOutputPath("renamenarrow"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, renamed);
        flow.complete();

        validateLength(flow, 5, 1, Pattern.compile("^\\w+\\s\\d+$"));
    }

    /** Keeps one tuple per distinct "num" with {@link Unique}. */
    @Test
    public void testUnique() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Pipe split = new Each(new Pipe("shape"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Pipe unique = new Unique(split, new Fields("num"));

        Tap source = getPlatform().getTextFile(InputData.inputFileLhs);
        Tap sink = getPlatform().getTextFile(new Fields("item"), new Fields("num", "char"), getOutputPath("unique"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, unique);
        flow.complete();

        validateLength(flow, 5, 1, Pattern.compile("^\\d+\\s\\w+$"));
    }

    /** Applies {@link Unique} on "num" across two input branches ("lhs" and "rhs"). */
    @Test
    public void testUniqueMerge() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap lhsSource = getPlatform().getTextFile(InputData.inputFileLhs);
        Tap rhsSource = getPlatform().getTextFile(InputData.inputFileRhs);
        Tap sink = getPlatform().getTextFile(new Fields("item"), new Fields("num", "char"), getOutputPath("uniquemerge-nondeterministic"), SinkMode.REPLACE);

        // Both branches share a single splitter instance.
        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe lhs = new Each(new Pipe("lhs"), new Fields("line"), splitter);
        Pipe rhs = new Each(new Pipe("rhs"), new Fields("line"), splitter);

        Pipe unique = new Unique(Pipe.pipes(lhs, rhs), new Fields("num"));
        Flow flow = getPlatform().getFlowConnector().connect(Cascades.tapsMap(Pipe.pipes(lhs, rhs), Tap.taps(lhsSource, rhsSource)), sink, unique);
        flow.complete();

        validateLength(flow, 5, 1, Pattern.compile("^\\d+\\s\\w+$"));
    }

    /** Counts tuples per "char" with {@link CountBy} and checks the exact sink contents. */
    @Test
    public void testCount() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "count"), "\t", new Class[]{String.class, Integer.TYPE}, getOutputPath("count"), SinkMode.REPLACE);
        Pipe assembly = new CountBy(new Pipe("count"), new Fields("char"), new Fields("count"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 2),
            new Tuple("b", 4),
            new Tuple("c", 4),
            new Tuple("d", 2),
            new Tuple("e", 1)
        };

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /** Counts all tuples ({@link Fields#NONE} grouping) and checks the single total. */
    @Test
    public void testCountAll() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("count"), "\t", new Class[]{Integer.TYPE}, getOutputPath("countall"), SinkMode.REPLACE);
        Pipe assembly = new AggregateBy(new Pipe("count"), Fields.NONE, 2, new AggregateBy[]{new CountBy(new Fields("char"), new Fields("count"))});

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 1, 1, Pattern.compile("^\\d+$"));

        Tuple[] expected = {new Tuple(13)};

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /**
     * Replaces "c" in "char" with null, then counts non-null and null values in one pass
     * using {@link CountBy.Include#NO_NULLS} and {@link CountBy.Include#ONLY_NULLS}.
     */
    @Test
    public void testCountNullNotNull() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("notnull", "null"), "\t", new Class[]{Integer.TYPE, Integer.TYPE}, getOutputPath("countnullnotnull"), SinkMode.REPLACE);

        Pipe pipe = new Each(new Pipe("count"), new Fields("char"), new ExpressionFunction(Fields.ARGS, "\"c\".equals($0) ? null : $0", String.class), Fields.REPLACE);
        AggregateBy[] counters = {
            new CountBy(new Fields("char"), new Fields("notnull"), CountBy.Include.NO_NULLS),
            new CountBy(new Fields("char"), new Fields("null"), CountBy.Include.ONLY_NULLS)
        };
        Pipe assembly = new AggregateBy(pipe, Fields.NONE, 2, counters);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 1, 2, Pattern.compile("^\\d+\t\\d+$"));

        Tuple[] expected = {new Tuple(9, 4)};

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /** Counts per "char" across two branches with {@link CountBy} over multiple pipes. */
    @Test
    public void testCountMerge() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap lhsSource = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap rhsSource = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileRhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "count"), "\t", new Class[]{String.class, Integer.TYPE}, getOutputPath("mergecount"), SinkMode.REPLACE);

        Pipe lhs = new Pipe("count-lhs");
        // The rhs branch lower-cases "char" before aggregation.
        Pipe rhs = new Each(new Pipe("count-rhs"), new Fields("char"), new ExpressionFunction(Fields.ARGS, "$0.toLowerCase()", String.class), Fields.REPLACE);
        Pipe assembly = new CountBy(Pipe.pipes(lhs, rhs), new Fields("char"), new Fields("count"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(Cascades.tapsMap(Pipe.pipes(lhs, rhs), Tap.taps(lhsSource, rhsSource)), sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 4),
            new Tuple("b", 8),
            new Tuple("c", 8),
            new Tuple("d", 4),
            new Tuple("e", 2)
        };

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /** Sums "num" per "char" with {@link SumBy} and checks the exact sink contents. */
    @Test
    public void testSumBy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "sum"), "\t", new Class[]{String.class, Integer.TYPE}, getOutputPath("sum"), SinkMode.REPLACE);
        Pipe assembly = new SumBy(new Pipe("sum"), new Fields("char"), new Fields("num"), new Fields("sum"), Long.TYPE, 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 6),
            new Tuple("b", 12),
            new Tuple("c", 10),
            new Tuple("d", 6),
            new Tuple("e", 5)
        };

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /**
     * Replaces "num" values equal to 5 with null, then sums per "char" into an
     * {@link Integer} result. The expected output includes a null sum for "e".
     */
    @Test
    public void testSumByNulls() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "sum"), "\t", new Class[]{String.class, Integer.class}, getOutputPath("sumnulls"), SinkMode.REPLACE);

        Pipe pipe = new Each(new Pipe("sum"), new Fields("num"), new ExpressionFunction(Fields.ARGS, "5 == $0 ? null : $0", Integer.class), Fields.REPLACE);
        Pipe assembly = new SumBy(pipe, new Fields("char"), new Fields("num"), new Fields("sum"), Integer.class, 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s(\\d+|null)$"));

        Tuple[] expected = {
            new Tuple("a", 1),
            new Tuple("b", 7),
            new Tuple("c", 10),
            new Tuple("d", 6),
            new Tuple("e", null)
        };

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /** Sums "num" per "char" across two branches with {@link SumBy} over multiple pipes. */
    @Test
    public void testSumMerge() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap lhsSource = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap rhsSource = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileRhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "sum"), "\t", new Class[]{String.class, Integer.TYPE}, getOutputPath("mergesum"), SinkMode.REPLACE);

        Pipe lhs = new Pipe("sum-lhs");
        // The rhs branch lower-cases "char" before aggregation.
        Pipe rhs = new Each(new Pipe("sum-rhs"), new Fields("char"), new ExpressionFunction(Fields.ARGS, "$0.toLowerCase()", String.class), Fields.REPLACE);
        Pipe assembly = new SumBy(Pipe.pipes(lhs, rhs), new Fields("char"), new Fields("num"), new Fields("sum"), Long.TYPE, 2);

        Flow flow = getPlatform().getFlowConnector().connect(Cascades.tapsMap(Pipe.pipes(lhs, rhs), Tap.taps(lhsSource, rhsSource)), sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 12),
            new Tuple("b", 24),
            new Tuple("c", 20),
            new Tuple("d", 12),
            new Tuple("e", 10)
        };

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /** Averages "num" per "char" with {@link AverageBy} and checks the exact sink contents. */
    @Test
    public void testAverageBy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "average"), "\t", new Class[]{String.class, Double.TYPE}, getOutputPath("average"), SinkMode.REPLACE);
        Pipe assembly = new AverageBy(new Pipe("average"), new Fields("char"), new Fields("num"), new Fields("average"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s[\\d.]+$"));

        Tuple[] expected = {
            new Tuple("a", 3.0d),
            new Tuple("b", 3.0d),
            new Tuple("c", 2.5d),
            new Tuple("d", 3.0d),
            new Tuple("e", 5.0d)
        };

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /**
     * Replaces "num" values equal to 3 with null, then averages per "char" using
     * {@link AverageBy.Include#NO_NULLS}.
     */
    @Test
    public void testAverageByNull() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "average"), "\t", new Class[]{String.class, Double.TYPE}, getOutputPath("averagenull"), SinkMode.REPLACE);

        Pipe pipe = new Each(new Pipe("average"), new Fields("num"), new ExpressionFunction(Fields.ARGS, "3 == $0 ? null : $0", Integer.class), Fields.REPLACE);
        Pipe assembly = new AverageBy(pipe, new Fields("char"), new Fields("num"), new Fields("average"), AverageBy.Include.NO_NULLS, 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s[\\d.]+$"));

        Tuple[] expected = {
            new Tuple("a", 3.0d),
            new Tuple("b", 3.0d),
            new Tuple("c", 2.3333333333333335d),
            new Tuple("d", 3.0d),
            new Tuple("e", 5.0d)
        };

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /** Averages "num" per "char" across two branches with {@link AverageBy} over multiple pipes. */
    @Test
    public void testAverageMerge() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap lhsSource = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap rhsSource = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileRhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "average"), "\t", new Class[]{String.class, Double.TYPE}, getOutputPath("mergeaverage"), SinkMode.REPLACE);

        Pipe lhs = new Pipe("average-lhs");
        // The rhs branch lower-cases "char" before aggregation.
        Pipe rhs = new Each(new Pipe("average-rhs"), new Fields("char"), new ExpressionFunction(Fields.ARGS, "$0.toLowerCase()", String.class), Fields.REPLACE);
        Pipe assembly = new AverageBy(Pipe.pipes(lhs, rhs), new Fields("char"), new Fields("num"), new Fields("average"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(Cascades.tapsMap(Pipe.pipes(lhs, rhs), Tap.taps(lhsSource, rhsSource)), sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s[\\d.]+$"));

        Tuple[] expected = {
            new Tuple("a", 3.0d),
            new Tuple("b", 3.0d),
            new Tuple("c", 2.5d),
            new Tuple("d", 3.0d),
            new Tuple("e", 5.0d)
        };

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /**
     * Selects the first "lower"/"upper" pair per "num" with {@link FirstBy}. A reverse-order
     * comparator is attached to "lower".
     */
    @Test
    public void testFirstBy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileCross);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "lower", "upper"), " ", InputData.inputFileCross);
        Tap sink = getPlatform().getDelimitedFile(new Fields("num", "lower", "upper"), "\t", new Class[]{Integer.TYPE, String.class, String.class}, getOutputPath("firstnfields"), SinkMode.REPLACE);

        Fields firstFields = new Fields("lower", "upper");
        firstFields.setComparator("lower", Collections.reverseOrder());
        Pipe assembly = new FirstBy(new Pipe("first"), new Fields("num"), firstFields, 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        Tuple[] expected = {
            new Tuple(1, "c", "A"),
            new Tuple(2, "d", "B"),
            new Tuple(3, "c", "C"),
            new Tuple(4, "d", "B"),
            new Tuple(5, "e", "A")
        };

        // No validateLength() here, so the explicit count check below is the only guard
        // against missing results; the iterator is closed even when an assertion fails.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /** Selects the first "lower"/"upper" pair per "num" with {@link FirstBy} and no custom comparator. */
    @Test
    public void testFirstByWithoutComparator() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileCrossRev);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "lower", "upper"), " ", InputData.inputFileCrossRev);
        Tap sink = getPlatform().getDelimitedFile(new Fields("num", "lower", "upper"), "\t", new Class[]{Integer.TYPE, String.class, String.class}, getOutputPath("firstnfieldswithoutcomparator"), SinkMode.REPLACE);
        Pipe assembly = new FirstBy(new Pipe("first"), new Fields("num"), new Fields("lower", "upper"), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        Tuple[] expected = {
            new Tuple(1, "c", "C"),
            new Tuple(2, "d", "D"),
            new Tuple(3, "c", "C"),
            new Tuple(4, "d", "D"),
            new Tuple(5, "e", "E")
        };

        // No validateLength() here, so the explicit count check below is the only guard
        // against missing results; the iterator is closed even when an assertion fails.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /**
     * Runs SumBy, CountBy, two AverageBy and FirstBy in a single {@link AggregateBy} grouped on
     * "char". The argument field "num" carries a reverse-order comparator.
     */
    @Test
    public void testParallelAggregates() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields("char", "sum", "count", "average", "average2", "first"), "\t", new Class[]{String.class, Integer.TYPE, Integer.TYPE, Double.TYPE, Double.TYPE, Integer.TYPE}, getOutputPath("multi"), SinkMode.REPLACE);

        Fields numField = new Fields("num");
        numField.setComparator("num", Collections.reverseOrder());

        AggregateBy[] aggregators = {
            new SumBy(numField, new Fields("sum"), Long.TYPE),
            new CountBy(new Fields("count")),
            new AverageBy(numField, new Fields("average")),
            new AverageBy(numField, new Fields("average2")),
            new FirstBy(numField, new Fields("first"))
        };
        Pipe assembly = new AggregateBy("name", Pipe.pipes(new Pipe("multi")), new Fields("char"), 2, aggregators);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 6, Pattern.compile("^\\w+\\s\\d+\\s\\d+\\s[\\d.]+\\s[\\d.]+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple("a", 6, 2, 3.0d, 3.0d, 5),
            new Tuple("b", 12, 4, 3.0d, 3.0d, 5),
            new Tuple("c", 10, 4, 2.5d, 2.5d, 4),
            new Tuple("d", 6, 2, 3.0d, 3.0d, 4),
            new Tuple("e", 5, 1, 5.0d, 5.0d, 5)
        };

        // Close the iterator even when an assertion fails, and require an exact result count.
        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertTrue("unexpected extra result at index " + index, index < expected.length);
                assertEquals(expected[index], ((TupleEntry) iterator.next()).getTuple());
                index++;
            }
            assertEquals("number of results", expected.length, index);
        } finally {
            iterator.close();
        }
    }

    /**
     * Multi-pipe parallel aggregation without a prior Merge. The
     * "uselegacyhash" partitioner property is set to "true".
     */
    @Test
    public void testParallelAggregatesMergeLegacyHash() throws IOException {
        String legacyHashKey = "cascading.tuple.hadoop.util.hasherpartitioner.uselegacyhash";
        Map<Object, Object> legacyProperties = getProperties();
        legacyProperties.put(legacyHashKey, "true");

        boolean priorMerge = false;
        performParallelAggregatesMerge(priorMerge, legacyProperties);
    }

    /**
     * Parallel aggregation over a prior {@code Merge} of both branches. The
     * "uselegacyhash" partitioner property is set to "true".
     */
    @Test
    public void testParallelAggregatesPriorMergeLegacyHash() throws IOException {
        String legacyHashKey = "cascading.tuple.hadoop.util.hasherpartitioner.uselegacyhash";
        Map<Object, Object> legacyProperties = getProperties();
        legacyProperties.put(legacyHashKey, "true");

        boolean priorMerge = true;
        performParallelAggregatesMerge(priorMerge, legacyProperties);
    }

    /** Multi-pipe parallel aggregation without a prior Merge, using the default properties. */
    @Test
    public void testParallelAggregatesMerge() throws IOException {
        Map<Object, Object> defaults = getProperties();
        boolean priorMerge = false;
        performParallelAggregatesMerge(priorMerge, defaults);
    }

    /** Parallel aggregation over a prior {@code Merge} of both branches, using the default properties. */
    @Test
    public void testParallelAggregatesPriorMerge() throws IOException {
        Map<Object, Object> defaults = getProperties();
        boolean priorMerge = true;
        performParallelAggregatesMerge(priorMerge, defaults);
    }

    /**
     * Aggregates two sources with a single {@link AggregateBy} that combines {@link SumBy},
     * {@link CountBy} and {@link AverageBy}, grouped on {@code char}, and checks the results.
     * <p>
     * The right-hand branch lower-cases its {@code char} values before aggregation. The grouping
     * field and every result field are given a {@code C1CustomHasher} comparator.
     *
     * @param priorMerge when {@code true} the two branches are first combined with a {@link Merge};
     *                   otherwise both pipes are handed to {@link AggregateBy} directly
     * @param properties properties passed to the flow connector
     * @throws IOException if copying the inputs or reading the sink fails
     */
    private void performParallelAggregatesMerge(boolean priorMerge, Map<Object, Object> properties) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap lhsSource = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap rhsSource = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileRhs);
        Tap sink = getPlatform().getDelimitedFile(
            new Fields(new Comparable[]{"char", "sum", "count", "average"}), "\t",
            new Class[]{String.class, Integer.TYPE, Integer.TYPE, Double.TYPE},
            getOutputPath("multimerge+" + priorMerge + String.valueOf(properties.size())), SinkMode.REPLACE);

        Pipe lhs = new Pipe("multi-lhs");
        // lower-case the rhs "char" values in place before they reach the aggregation
        Pipe rhs = new Each(new Pipe("multi-rhs"), new Fields(new Comparable[]{"char"}),
            new ExpressionFunction(Fields.ARGS, "$0.toLowerCase()", String.class), Fields.REPLACE);

        Fields sumFields = new Fields(new Comparable[]{"sum"});
        sumFields.setComparator("sum", new C1CustomHasher());
        AggregateBy sumBy = new SumBy(new Fields(new Comparable[]{"num"}), sumFields, Long.TYPE);

        Fields countFields = new Fields(new Comparable[]{"count"});
        countFields.setComparator("count", new C1CustomHasher());
        AggregateBy countBy = new CountBy(countFields);

        Fields averageFields = new Fields(new Comparable[]{"average"});
        averageFields.setComparator("average", new C1CustomHasher());
        AggregateBy averageBy = new AverageBy(new Fields(new Comparable[]{"num"}), averageFields);

        Fields groupingFields = new Fields(new Comparable[]{"char"});
        groupingFields.setComparator("char", new C1CustomHasher());

        AggregateBy[] aggregators = new AggregateBy[]{sumBy, countBy, averageBy};
        Pipe assembly;
        if (priorMerge) {
            assembly = new AggregateBy("name", new Merge(Pipe.pipes(new Pipe[]{lhs, rhs})), groupingFields, 2, aggregators);
        } else {
            assembly = new AggregateBy("name", Pipe.pipes(new Pipe[]{lhs, rhs}), groupingFields, 2, aggregators);
        }

        Flow flow = getPlatform().getFlowConnector(properties).connect(
            Cascades.tapsMap(Pipe.pipes(new Pipe[]{lhs, rhs}), Tap.taps(new Tap[]{lhsSource, rhsSource})), sink, assembly);
        flow.complete();

        validateLength(flow, 5, 4, Pattern.compile("^\\w+\\s\\d+\\s\\d+\\s[\\d.]+$"));

        Tuple[] expected = {
            new Tuple(new Object[]{"a", 12, 4, Double.valueOf(3.0d)}),
            new Tuple(new Object[]{"b", 24, 8, Double.valueOf(3.0d)}),
            new Tuple(new Object[]{"c", 20, 8, Double.valueOf(2.5d)}),
            new Tuple(new Object[]{"d", 12, 4, Double.valueOf(3.0d)}),
            new Tuple(new Object[]{"e", 10, 2, Double.valueOf(5.0d)})
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Chains two {@link CountBy} assemblies. The first counts rows per {@code char} into
     * {@code count}. The second counts how many {@code char} groups share each {@code count}
     * value, writing the result to {@code count2}.
     */
    @Test
    public void testCountCount() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"count", "count2"}), "\t",
            new Class[]{Integer.TYPE, Integer.TYPE}, getOutputPath("countcount"), SinkMode.REPLACE);

        Pipe countPerChar = new CountBy(new Pipe("count"), new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"count"}), 2);
        Pipe countPerCount = new CountBy(countPerChar, new Fields(new Comparable[]{"count"}), new Fields(new Comparable[]{"count2"}), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, countPerCount);
        flow.complete();

        validateLength(flow, 3, 2, Pattern.compile("^\\d+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple(new Object[]{1, 1}),
            new Tuple(new Object[]{2, 2}),
            new Tuple(new Object[]{4, 2})
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Splits one source into two branches (one lower-casing {@code char}), merges them and counts
     * per {@code char}. It then splits that result again through {@link Identity} branches,
     * merges, and counts once more.
     * <p>
     * On MapReduce platforms the planner is expected to produce exactly two steps.
     */
    @Test
    public void testSameSourceMerge() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap sourceTap = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sinkTap = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "count"}), "\t",
            new Class[]{String.class, Integer.TYPE}, getOutputPath("samesourcemergecount"), SinkMode.REPLACE);

        Pipe source = new Pipe("source");

        // first merge: the raw branch plus a branch with "char" lower-cased
        Pipe countLhs = new Pipe("count-lhs", source);
        Pipe countRhs = new Each(new Pipe("count-rhs", source), new Fields(new Comparable[]{"char"}),
            new ExpressionFunction(Fields.ARGS, "$0.toLowerCase()", String.class), Fields.REPLACE);
        CountBy firstCount = new CountBy(new Merge("first", new Pipe[]{countLhs, countRhs}),
            new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"count"}), 2);

        // second merge: two pass-through branches of the first count
        Pipe lhs = new Each(new Pipe("lhs", firstCount), new Identity());
        Pipe rhs = new Each(new Pipe("rhs", firstCount), new Identity());
        Pipe secondCount = new CountBy(new Merge("second", new Pipe[]{lhs, rhs}),
            new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"count"}), 2);

        Flow flow = getPlatform().getFlowConnector().connect(Cascades.tapsMap(source, sourceTap), sinkTap, secondCount);

        List<?> flowSteps = flow.getFlowSteps();
        if (getPlatform().isMapReduce()) {
            assertEquals("not equal: steps.size()", 2, flowSteps.size());
        }

        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple(new Object[]{"a", 2}),
            new Tuple(new Object[]{"b", 2}),
            new Tuple(new Object[]{"c", 2}),
            new Tuple(new Object[]{"d", 2}),
            new Tuple(new Object[]{"e", 2})
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Counts per {@code char}, then joins that count three ways via two nested {@link CoGroup}s.
     * <ul>
     *   <li>The inner CoGroup joins the count with a recount of itself ({@code count-rhs}).</li>
     *   <li>The outer CoGroup joins a second recount ({@code count-lhs}) with the inner result.</li>
     * </ul>
     * The sink keeps only {@code char} and {@code count}.
     */
    @Test
    public void testSameSourceMergeThreeWay() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap sourceTap = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sinkTap = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "count"}), "\t",
            new Class[]{String.class, Integer.TYPE}, getOutputPath("samesourcemergethreeway"), SinkMode.REPLACE);

        CountBy countBy = new CountBy(new Pipe("source"), new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"count"}), 2);

        Pipe lhsCount = new CountBy(new Pipe("count-lhs", countBy),
            new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"count"}), 2);
        Pipe rhsCount = new CountBy(new Pipe("count-rhs", countBy),
            new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"count"}), 2);

        Pipe innerJoin = new CoGroup(countBy, new Fields(new Comparable[]{"char"}),
            rhsCount, new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"char", "num", "char2", "count"}));
        Pipe outerJoin = new CoGroup(lhsCount, new Fields(new Comparable[]{"char"}),
            innerJoin, new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"char", "count", "char2", "num2", "char3", "count3"}));

        Flow flow = getPlatform().getFlowConnector().connect(Cascades.tapsMap(countBy, sourceTap), sinkTap, outerJoin);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple(new Object[]{"a", 1}),
            new Tuple(new Object[]{"b", 1}),
            new Tuple(new Object[]{"c", 1}),
            new Tuple(new Object[]{"d", 1}),
            new Tuple(new Object[]{"e", 1})
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Computes the minimum {@code num} per {@code char} with {@link MinBy}. The sink tuples are
     * compared, in order, against the expected minimums.
     */
    @Test
    public void testMinBy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "min"}), "\t",
            new Class[]{String.class, Integer.TYPE}, getOutputPath("minby"), SinkMode.REPLACE);

        Pipe assembly = new MinBy(new Pipe("min"), new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"min"}), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple(new Object[]{"a", 1}),
            new Tuple(new Object[]{"b", 1}),
            new Tuple(new Object[]{"c", 1}),
            new Tuple(new Object[]{"d", 2}),
            new Tuple(new Object[]{"e", 5})
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Same aggregation as {@code testMinBy}, but the input first passes through
     * {@code NullInsert(3, ...)}, which is declared elsewhere in this class.
     * <p>
     * The expected minimums are identical to {@code testMinBy}, so whatever NullInsert produces
     * must not change the result.
     */
    @Test
    public void testMinByNullSafety() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "min"}), "\t",
            new Class[]{String.class, Integer.TYPE}, getOutputPath("minbynullsafety"), SinkMode.REPLACE);

        Pipe withNulls = new Each(new Pipe("min"), new NullInsert(3, new Fields(new Comparable[]{"num", "char"})), Fields.RESULTS);
        Pipe assembly = new MinBy(withNulls, new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"min"}), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple(new Object[]{"a", 1}),
            new Tuple(new Object[]{"b", 1}),
            new Tuple(new Object[]{"c", 1}),
            new Tuple(new Object[]{"d", 2}),
            new Tuple(new Object[]{"e", 5})
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Computes the minimum {@code char} string per {@code num} with {@link MinBy}. The sink tuples
     * are compared, in order, against the expected values.
     */
    @Test
    public void testMinByString() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "min"}), "\t",
            new Class[]{Integer.TYPE, String.class}, getOutputPath("minbystring"), SinkMode.REPLACE);

        // head pipe renamed from the copy-pasted "max" to "min" to match this MinBy test
        Pipe assembly = new MinBy(new Pipe("min"), new Fields(new Comparable[]{"num"}),
            new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"min"}), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\d+\\s\\w+$"));

        Tuple[] expected = {
            new Tuple(new Object[]{1, "a"}),
            new Tuple(new Object[]{2, "b"}),
            new Tuple(new Object[]{3, "c"}),
            new Tuple(new Object[]{4, "b"}),
            new Tuple(new Object[]{5, "a"})
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Computes the maximum {@code num} per {@code char} with {@link MaxBy}. The sink tuples are
     * compared, in order, against the expected maximums.
     */
    @Test
    public void testMaxBy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "max"}), "\t",
            new Class[]{String.class, Integer.TYPE}, getOutputPath("maxby"), SinkMode.REPLACE);

        Pipe assembly = new MaxBy(new Pipe("max"), new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"max"}), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple(new Object[]{"a", 5}),
            new Tuple(new Object[]{"b", 5}),
            new Tuple(new Object[]{"c", 4}),
            new Tuple(new Object[]{"d", 4}),
            new Tuple(new Object[]{"e", 5})
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Same aggregation as {@code testMaxBy}, but the input first passes through
     * {@code NullInsert(1, ...)}, which is declared elsewhere in this class.
     * <p>
     * The expected maximums are identical to {@code testMaxBy}, so whatever NullInsert produces
     * must not change the result.
     */
    @Test
    public void testMaxByNullSafety() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "max"}), "\t",
            new Class[]{String.class, Integer.TYPE}, getOutputPath("maxbynullsafety"), SinkMode.REPLACE);

        Pipe withNulls = new Each(new Pipe("max"), new NullInsert(1, new Fields(new Comparable[]{"num", "char"})), Fields.RESULTS);
        Pipe assembly = new MaxBy(withNulls, new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"max"}), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        Tuple[] expected = {
            new Tuple(new Object[]{"a", 5}),
            new Tuple(new Object[]{"b", 5}),
            new Tuple(new Object[]{"c", 4}),
            new Tuple(new Object[]{"d", 4}),
            new Tuple(new Object[]{"e", 5})
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Computes the maximum {@code char} string per {@code num} with {@link MaxBy}. The sink tuples
     * are compared, in order, against the expected values.
     */
    @Test
    public void testMaxByString() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "max"}), "\t",
            new Class[]{Integer.TYPE, String.class}, getOutputPath("maxbystring"), SinkMode.REPLACE);

        Pipe assembly = new MaxBy(new Pipe("max"), new Fields(new Comparable[]{"num"}),
            new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"max"}), 2);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\d+\\s\\w+$"));

        Tuple[] expected = {
            new Tuple(new Object[]{1, "c"}),
            new Tuple(new Object[]{2, "d"}),
            new Tuple(new Object[]{3, "c"}),
            new Tuple(new Object[]{4, "d"}),
            new Tuple(new Object[]{5, "e"})
        };

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Sums {@code num} per {@code char} with {@link SumByLocally} (threshold 500).
     * <p>
     * Output order is not asserted. Each sink tuple must match, and consume, one expected tuple,
     * and every expected tuple must be seen.
     */
    @Test
    public void testSumByLocally() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "sum"}), "\t",
            new Class[]{String.class, Integer.TYPE}, getOutputPath("sum"), SinkMode.REPLACE);

        Pipe assembly = new SumByLocally(new Pipe("sum"), new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"sum"}), Long.TYPE, 500);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        HashSet<Tuple> expected = new HashSet<Tuple>();
        Collections.addAll(expected,
            new Tuple(new Object[]{"a", 6}),
            new Tuple(new Object[]{"b", 12}),
            new Tuple(new Object[]{"c", 10}),
            new Tuple(new Object[]{"d", 6}),
            new Tuple(new Object[]{"e", 5}));

        TupleEntryIterator iterator = flow.openSink();
        try {
            while (iterator.hasNext()) {
                // remove() rather than contains() so a duplicated tuple cannot mask a missing one
                assertTrue(expected.remove(iterator.next().getTuple()));
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
        assertTrue(expected.isEmpty());
    }

    /**
     * Counts rows per {@code char} with {@link CountByLocally} (threshold 1000).
     * <p>
     * Output order is not asserted. Each sink tuple must match, and consume, one expected tuple,
     * and every expected tuple must be seen.
     */
    @Test
    public void testCountByLocally() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "count"}), "\t",
            new Class[]{String.class, Integer.TYPE}, getOutputPath("count"), SinkMode.REPLACE);

        Pipe assembly = new CountByLocally(new Pipe("count"), new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"count"}), 1000);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        HashSet<Tuple> expected = new HashSet<Tuple>();
        Collections.addAll(expected,
            new Tuple(new Object[]{"a", 2}),
            new Tuple(new Object[]{"b", 4}),
            new Tuple(new Object[]{"c", 4}),
            new Tuple(new Object[]{"d", 2}),
            new Tuple(new Object[]{"e", 1}));

        TupleEntryIterator iterator = flow.openSink();
        try {
            while (iterator.hasNext()) {
                // remove() rather than contains() so a duplicated tuple cannot mask a missing one
                assertTrue(expected.remove(iterator.next().getTuple()));
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
        assertTrue(expected.isEmpty());
    }

    /**
     * Counts {@code char} values over the whole input: {@link AggregateByLocally} groups on
     * {@link Fields#NONE} and wraps a single {@link CountByLocally}. Exactly one tuple,
     * {@code 13}, is expected.
     */
    @Test
    public void testCountByLocallyAll() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"count"}), "\t",
            new Class[]{Integer.TYPE}, getOutputPath("countall"), SinkMode.REPLACE);

        AggregateByLocally[] aggregators = new AggregateByLocally[]{
            new CountByLocally(new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"count"}))
        };
        Pipe assembly = new AggregateByLocally(new Pipe("count"), Fields.NONE, 1000, aggregators);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 1, 1, Pattern.compile("^\\d+$"));

        Tuple[] expected = {new Tuple(new Object[]{13})};

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Replaces every {@code "c"} value of {@code char} with {@code null}, then counts over the
     * whole input (no grouping).
     * <ul>
     *   <li>{@link CountByLocally.Include#NO_NULLS} writes {@code notnull} (expected 9).</li>
     *   <li>{@link CountByLocally.Include#ONLY_NULLS} writes {@code null} (expected 4).</li>
     * </ul>
     */
    @Test
    public void testCountByLocallyNullNotNull() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"notnull", "null"}), "\t",
            new Class[]{Integer.TYPE, Integer.TYPE}, getOutputPath("countnullnotnull"), SinkMode.REPLACE);

        // turn every "c" into null so the two counters below see a known number of nulls
        Pipe withNulls = new Each(new Pipe("count"), new Fields(new Comparable[]{"char"}),
            new ExpressionFunction(Fields.ARGS, "\"c\".equals($0) ? null : $0", String.class), Fields.REPLACE);

        AggregateByLocally[] aggregators = new AggregateByLocally[]{
            new CountByLocally(new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"notnull"}), CountByLocally.Include.NO_NULLS),
            new CountByLocally(new Fields(new Comparable[]{"char"}), new Fields(new Comparable[]{"null"}), CountByLocally.Include.ONLY_NULLS)
        };
        Pipe assembly = new AggregateByLocally(withNulls, Fields.NONE, 1000, aggregators);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 1, 2, Pattern.compile("^\\d+\t\\d+$"));

        Tuple[] expected = {new Tuple(new Object[]{9, 4})};

        TupleEntryIterator iterator = flow.openSink();
        try {
            int index = 0;
            while (iterator.hasNext()) {
                assertEquals(expected[index++], iterator.next().getTuple());
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
    }

    /**
     * Computes the minimum {@code num} per {@code char} with {@link MinByLocally} (threshold 1000).
     * The input first passes through {@code NullInsert(3, ...)}, which is declared elsewhere in
     * this class.
     * <p>
     * Output order is not asserted. Each sink tuple must consume one expected tuple, and every
     * expected tuple must be seen.
     */
    @Test
    public void testMinByLocallyNullSafety() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        // distinct output path: "minbynullsafety" is already used by testMinByNullSafety
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "min"}), "\t",
            new Class[]{String.class, Integer.TYPE}, getOutputPath("minbylocallynullsafety"), SinkMode.REPLACE);

        Pipe withNulls = new Each(new Pipe("min"), new NullInsert(3, new Fields(new Comparable[]{"num", "char"})), Fields.RESULTS);
        Pipe assembly = new MinByLocally(withNulls, new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"min"}), 1000);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        HashSet<Tuple> expected = new HashSet<Tuple>();
        Collections.addAll(expected,
            new Tuple(new Object[]{"a", 1}),
            new Tuple(new Object[]{"b", 1}),
            new Tuple(new Object[]{"c", 1}),
            new Tuple(new Object[]{"d", 2}),
            new Tuple(new Object[]{"e", 5}));

        TupleEntryIterator iterator = flow.openSink();
        try {
            while (iterator.hasNext()) {
                assertTrue(expected.remove(iterator.next().getTuple()));
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
        assertTrue(expected.isEmpty());
    }

    /**
     * Computes the maximum {@code num} per {@code char} with {@link MaxByLocally} (threshold 1000).
     * The input first passes through {@code NullInsert(1, ...)}, which is declared elsewhere in
     * this class.
     * <p>
     * Output order is not asserted. Each sink tuple must consume one expected tuple, and every
     * expected tuple must be seen.
     */
    @Test
    public void testMaxByLocallyNullSafety() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "max"}), "\t",
            new Class[]{String.class, Integer.TYPE}, getOutputPath("maxbylocallynullsafety"), SinkMode.REPLACE);

        Pipe withNulls = new Each(new Pipe("max"), new NullInsert(1, new Fields(new Comparable[]{"num", "char"})), Fields.RESULTS);
        Pipe assembly = new MaxByLocally(withNulls, new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"max"}), 1000);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s\\d+$"));

        HashSet<Tuple> expected = new HashSet<Tuple>();
        Collections.addAll(expected,
            new Tuple(new Object[]{"a", 5}),
            new Tuple(new Object[]{"b", 5}),
            new Tuple(new Object[]{"c", 4}),
            new Tuple(new Object[]{"d", 4}),
            new Tuple(new Object[]{"e", 5}));

        TupleEntryIterator iterator = flow.openSink();
        try {
            while (iterator.hasNext()) {
                assertTrue(expected.remove(iterator.next().getTuple()));
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
        assertTrue(expected.isEmpty());
    }

    /**
     * Averages {@code num} per {@code char} with {@link AverageByLocally} (threshold 1000).
     * <p>
     * Output order is not asserted. Each sink tuple must consume one expected tuple, and every
     * expected tuple must be seen.
     */
    @Test
    public void testAverageByLocally() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLhs);
        Tap sink = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"char", "average"}), "\t",
            new Class[]{String.class, Double.TYPE}, getOutputPath("average"), SinkMode.REPLACE);

        Pipe assembly = new AverageByLocally(new Pipe("average"), new Fields(new Comparable[]{"char"}),
            new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"average"}), 1000);

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 5, 2, Pattern.compile("^\\w+\\s[\\d.]+$"));

        HashSet<Tuple> expected = new HashSet<Tuple>();
        Collections.addAll(expected,
            new Tuple(new Object[]{"a", Double.valueOf(3.0d)}),
            new Tuple(new Object[]{"b", Double.valueOf(3.0d)}),
            new Tuple(new Object[]{"c", Double.valueOf(2.5d)}),
            new Tuple(new Object[]{"d", Double.valueOf(3.0d)}),
            new Tuple(new Object[]{"e", Double.valueOf(5.0d)}));

        TupleEntryIterator iterator = flow.openSink();
        try {
            while (iterator.hasNext()) {
                assertTrue(expected.remove(iterator.next().getTuple()));
            }
        } finally {
            // close even when an assertion fails so the sink iterator is not leaked
            iterator.close();
        }
        assertTrue(expected.isEmpty());
    }
}
