package cascading;

import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.operation.Debug;
import cascading.operation.Filter;
import cascading.operation.Identity;
import cascading.operation.Insert;
import cascading.operation.NoOp;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.First;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.filter.And;
import cascading.operation.function.UnGroup;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.tap.MultiSourceTap;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Hasher;
import cascading.tuple.Tuple;
import data.InputData;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
import org.junit.Test;

/* loaded from: input_file:cascading/FieldedPipesPlatformTest.class */
public class FieldedPipesPlatformTest extends PlatformTestCase {

    /* loaded from: input_file:cascading/FieldedPipesPlatformTest$LowerComparator.class */
    public static class LowerComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable {
        @Override // java.util.Comparator
        public int compare(Comparable comparable, Comparable comparable2) {
            return comparable.toString().toLowerCase().compareTo(comparable2.toString().toLowerCase());
        }

        public int hashCode(Comparable comparable) {
            if (comparable == null) {
                return 0;
            }
            return comparable.toString().toLowerCase().hashCode();
        }
    }

    public FieldedPipesPlatformTest() {
        super(true, 5, 3);
    }

    @Test
    public void testSimpleGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache), getPlatform().getTextFile(getOutputPath("simple"), SinkMode.REPLACE), new Every(new GroupBy(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"})), new Count(), new Fields(new Comparable[]{"ip", "count"})));
        connect.complete();
        validateLength(connect.openSource(), 10);
        validateLength(connect, 8, null);
    }

    @Test
    public void testSimpleChain() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache), getPlatform().getTabDelimitedFile(Fields.ALL, getOutputPath("simplechain"), SinkMode.REPLACE), new Every(new Every(new Every(new Every(new GroupBy(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"})), new Count(new Fields(new Comparable[]{"count1"}))), new Count(new Fields(new Comparable[]{"count2"}))), new Count(new Fields(new Comparable[]{"count3"}))), new Count(new Fields(new Comparable[]{"count4"}))));
        connect.complete();
        validateLength(connect, 8, 5);
    }

    @Test
    public void testChainEndingWithEach() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache), getPlatform().getTextFile(getOutputPath("chaineach"), SinkMode.REPLACE), new Each(new Every(new Every(new GroupBy(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"})), new Count(new Fields(new Comparable[]{"count1"}))), new Count(new Fields(new Comparable[]{"count2"}))), new Fields(new Comparable[]{"count1", "count2"}), new ExpressionFunction(new Fields(new Comparable[]{"sum"}), "count1 + count2", Integer.TYPE), Fields.ALL));
        connect.complete();
        validateLength(connect, 8, null);
    }

    @Test
    public void testNoGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileApache), getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("nogroup"), SinkMode.REPLACE), new Each(new Pipe("test"), new RegexSplitter("\\s+"), new Fields(new Comparable[]{1})));
        connect.complete();
        validateLength(connect, 10, null);
        assertTrue(getSinkAsList(connect).contains(new Tuple(new Object[]{"75.185.76.245"})));
    }

    @Test
    public void testCopy() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileApache), getPlatform().getTextFile(getOutputPath("copy"), SinkMode.REPLACE), new Pipe("test"));
        connect.complete();
        validateLength(connect, 10, null);
    }

    @Test
    public void testSimpleMerge() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("simplemerge"), SinkMode.REPLACE), new GroupBy("merge", Pipe.pipes(new Pipe[]{new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter)}), new Fields(new Comparable[]{"num"}), (Fields) null, false));
        connect.complete();
        validateLength(connect, 10);
        List sinkAsList = getSinkAsList(connect);
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"1\ta"})));
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"1\tA"})));
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"2\tb"})));
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"2\tB"})));
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"3\tc"})));
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"3\tC"})));
    }

    @Test
    public void testSimpleMergeThree() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap textFile3 = getPlatform().getTextFile(InputData.inputFileLowerOffset);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        hashMap.put("offset", textFile3);
        Tap delimitedFile = getPlatform().getDelimitedFile(Fields.ALL, "\t", getOutputPath("simplemergethree"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, delimitedFile, new Each(new Every(new GroupBy("merge", Pipe.pipes(new Pipe[]{new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("offset"), new Fields(new Comparable[]{"line"}), regexSplitter)}), new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"char"})), new Fields(new Comparable[]{"char"}), new First(new Fields(new Comparable[]{"first"}))), new Fields(new Comparable[]{"num", "first"}), new Identity()));
        connect.complete();
        validateLength(connect, 6);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1", "A"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2", "B"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"3", "C"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"4", "D"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"5", "E"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"6", "c"})));
    }

    @Test
    public void testSameSourceMerge() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", delimitedFile);
        hashMap.put("upper", delimitedFile);
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath(), SinkMode.REPLACE), new GroupBy("merge", Pipe.pipes(new Pipe[]{new Pipe("lower"), new Pipe("upper")}), new Fields(new Comparable[]{"num"}), (Fields) null, false));
        connect.complete();
        validateLength(connect, 10);
        List sinkAsList = getSinkAsList(connect);
        assertEquals("missing value", 2, Collections.frequency(sinkAsList, new Tuple(new Object[]{"1\ta"})));
        assertEquals("missing value", 2, Collections.frequency(sinkAsList, new Tuple(new Object[]{"2\tb"})));
        assertEquals("missing value", 2, Collections.frequency(sinkAsList, new Tuple(new Object[]{"3\tc"})));
    }

    @Test
    public void testSameSourceMergeThreeChainGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("split", textFile);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("samemergethreechaingroup"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Pipe pipe = new Pipe("split");
        Pipe each = new Each(new Pipe("lower", pipe), new Fields(new Comparable[]{"line"}), regexSplitter);
        Pipe each2 = new Each(new Pipe("upper", pipe), new Fields(new Comparable[]{"line"}), regexSplitter);
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile2, new GroupBy(Pipe.pipes(new Pipe[]{new GroupBy(Pipe.pipes(new Pipe[]{each, each2}), new Fields(new Comparable[]{"num"})), new Each(new Pipe("offset", pipe), new Fields(new Comparable[]{"line"}), regexSplitter)}), new Fields(new Comparable[]{"num"})));
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 2, connect.getFlowSteps().size());
        }
        connect.complete();
        validateLength(connect, 15);
    }

    @Test
    public void testUnGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileJoined);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(InputData.inputFileJoined), getPlatform().getTextFile(getOutputPath("ungrouped"), SinkMode.REPLACE), new Each(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num", "lower", "upper"}))), new UnGroup(new Fields(new Comparable[]{"num", "char"}), new Fields(new Comparable[]{"num"}), Fields.fields(new Fields[]{new Fields(new Comparable[]{"lower"}), new Fields(new Comparable[]{"upper"})}))));
        connect.complete();
        validateLength(connect, 10);
    }

    @Test
    public void testUnGroupAnon() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileJoined);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(InputData.inputFileJoined), getPlatform().getTextFile(getOutputPath("ungroupedanon"), SinkMode.REPLACE), new Each(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num", "lower", "upper"}))), new UnGroup(new Fields(new Comparable[]{"num"}), Fields.fields(new Fields[]{new Fields(new Comparable[]{"lower"}), new Fields(new Comparable[]{"upper"})}))));
        connect.complete();
        validateLength(connect, 10);
    }

    @Test
    public void testUnGroupBySize() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileJoinedExtra);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileJoinedExtra);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("ungrouped_size"), SinkMode.REPLACE);
        Flow connect = getPlatform().getFlowConnector().connect(textFile, textFile2, new Each(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "num2", "lower", "upper"}))), new UnGroup(new Fields(new Comparable[]{"num1", "num2", "char"}), new Fields(new Comparable[]{"num1", "num2"}), 1)));
        connect.complete();
        List asList = asList(connect, textFile2);
        assertEquals(10, asList.size());
        ArrayList arrayList = new ArrayList();
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            arrayList.add(((Tuple) it.next()).getObject(1));
        }
        assertTrue(arrayList.contains("1\t1\ta"));
        assertTrue(arrayList.contains("1\t1\tA"));
        assertTrue(arrayList.contains("2\t2\tb"));
        assertTrue(arrayList.contains("2\t2\tB"));
        assertTrue(arrayList.contains("3\t3\tc"));
        assertTrue(arrayList.contains("3\t3\tC"));
        assertTrue(arrayList.contains("4\t4\td"));
        assertTrue(arrayList.contains("4\t4\tD"));
        assertTrue(arrayList.contains("5\t5\te"));
        assertTrue(arrayList.contains("5\t5\tE"));
    }

    @Test
    public void testFilter() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(InputData.inputFileApache), getPlatform().getTextFile(getOutputPath("filter"), SinkMode.REPLACE), new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*")));
        connect.complete();
        validateLength(connect, 3);
    }

    @Test
    public void testLogicFilter() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(InputData.inputFileApache), getPlatform().getTextFile(getOutputPath("logicfilter"), SinkMode.REPLACE), new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new And(new Filter[]{new RegexFilter("^68.*$"), new RegexFilter("^1000.*$")})));
        connect.complete();
        validateLength(connect, 3);
    }

    @Test
    public void testFilterComplex() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache), getPlatform().getTextFile(getOutputPath("filtercomplex"), SinkMode.REPLACE), new Every(new GroupBy(new Each(new Each(new Each(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), TestConstants.APACHE_COMMON_PARSER), new Fields(new Comparable[]{"method"}), new RegexFilter("^POST")), new Fields(new Comparable[]{"method"}), new RegexFilter("^POST")), new Fields(new Comparable[]{"method"}), new Identity(new Fields(new Comparable[]{"value"})), Fields.ALL), new Fields(new Comparable[]{"value"})), new Count(), new Fields(new Comparable[]{"value", "count"})));
        connect.complete();
        validateLength(connect, 1, null);
    }

    @Test
    public void testFilterAll() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache), getPlatform().getTextFile(getOutputPath("filterall"), SinkMode.REPLACE), new Every(new GroupBy(new Each(new GroupBy(new Each(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip", "time", "method", "event", "status", "size"}), "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$", new int[]{1, 2, 3, 4, 5, 6})), new Fields(new Comparable[]{"method"}), new RegexFilter("^fobar")), new Fields(new Comparable[]{"method"})), new Fields(new Comparable[]{"method"}), new Identity(new Fields(new Comparable[]{"value"})), Fields.ALL), new Fields(new Comparable[]{"value"})), new Count(), new Fields(new Comparable[]{"value", "count"})));
        connect.complete();
        validateLength(connect, 0, null);
    }

    @Test
    public void testSplit() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileApache);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("split1"), SinkMode.REPLACE);
        Tap textFile3 = getPlatform().getTextFile(getOutputPath("split2"), SinkMode.REPLACE);
        Each each = new Each(new Pipe("split"), new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*"));
        Pipe each2 = new Each(new Pipe("left", each), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));
        Pipe each3 = new Each(new Pipe("right", each), new Fields(new Comparable[]{"line"}), new RegexFilter(".*102.*"));
        HashMap hashMap = new HashMap();
        hashMap.put("split", textFile);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("left", textFile2);
        hashMap2.put("right", textFile3);
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, hashMap2, new Pipe[]{each2, each3});
        connect.complete();
        validateLength(connect, 1, "left");
        validateLength(connect, 2, "right");
    }

    @Test
    public void testSplitNonSafe() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("nonsafesplit1"), SinkMode.REPLACE);
        Tap textFile3 = getPlatform().getTextFile(getOutputPath("nonsafesplit2"), SinkMode.REPLACE);
        Each each = new Each(new Each(new Pipe("split"), new TestFunction(new Fields(new Comparable[]{"ignore"}), new Tuple(new Object[]{1}), false), new Fields(new Comparable[]{"line"})), new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*"));
        Pipe each2 = new Each(new Pipe("left", each), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*"));
        Pipe each3 = new Each(new Pipe("right", each), new Fields(new Comparable[]{"line"}), new RegexFilter(".*102.*"));
        HashMap hashMap = new HashMap();
        hashMap.put("split", textFile);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("left", textFile2);
        hashMap2.put("right", textFile3);
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, hashMap2, new Pipe[]{each2, each3});
        connect.complete();
        validateLength(connect, 1, "left");
        validateLength(connect, 2, "right");
    }

    @Test
    public void testSplitSameSourceMerged() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("splitsourcemerged"), SinkMode.REPLACE);
        Each each = new Each(new Pipe("split"), new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*"));
        Flow connect = getPlatform().getFlowConnector().connect(textFile, textFile2, new GroupBy("merged", Pipe.pipes(new Pipe[]{new Each(new Pipe("left", each), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*")), new Each(new Pipe("right", each), new Fields(new Comparable[]{"line"}), new RegexFilter(".*102.*"))}), new Fields(new Comparable[]{"line"})));
        connect.complete();
        validateLength(connect, 3);
    }

    @Test
    public void testSplitOut() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"num", "line"}), InputData.inputFileApache);
        HashMap hashMap = new HashMap();
        hashMap.put("lower1", textFile);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("splitout1"), SinkMode.REPLACE);
        Tap textFile3 = getPlatform().getTextFile(getOutputPath("splitout2"), SinkMode.REPLACE);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("output1", textFile2);
        hashMap2.put("output2", textFile3);
        Pipe groupBy = new GroupBy("output1", new Pipe("lower1"), new Fields(new Comparable[]{0}));
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, hashMap2, Pipe.pipes(new Pipe[]{groupBy, new GroupBy("output2", groupBy, new Fields(new Comparable[]{0}))}));
        connect.complete();
        validateLength(connect, 10, "output1");
        validateLength(connect, 10, "output2");
        assertEquals(10, asSet(connect, textFile2).size());
        assertEquals(10, asSet(connect, textFile3).size());
    }

    @Test
    public void testSplitComplex() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("splitcomp1"), SinkMode.REPLACE);
        Tap textFile3 = getPlatform().getTextFile(getOutputPath("splitcomp2"), SinkMode.REPLACE);
        Each each = new Each(new Every(new GroupBy(new Each(new Pipe("split"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"}), new Count(), new Fields(new Comparable[]{"ip", "count"})), new Fields(new Comparable[]{"ip"}), new RegexFilter("^68.*"));
        Pipe each2 = new Each(new Pipe("left", each), new Fields(new Comparable[]{"ip"}), new RegexFilter(".*46.*"));
        Pipe each3 = new Each(new Pipe("right", each), new Fields(new Comparable[]{"ip"}), new RegexFilter(".*102.*"));
        Flow connect = getPlatform().getFlowConnector().connect(Cascades.tapsMap("split", textFile), Cascades.tapsMap(Pipe.pipes(new Pipe[]{each2, each3}), Tap.taps(new Tap[]{textFile2, textFile3})), new Pipe[]{each2, each3});
        connect.complete();
        validateLength(connect, 1, "left");
        validateLength(connect, 1, "right");
    }

    @Test
    public void testSplitMultiple() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("left"), SinkMode.REPLACE);
        Tap textFile3 = getPlatform().getTextFile(getOutputPath("rightleft"), SinkMode.REPLACE);
        Tap textFile4 = getPlatform().getTextFile(getOutputPath("rightright"), SinkMode.REPLACE);
        Each each = new Each(new Every(new GroupBy(new Each(new Pipe("split"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"}), new Count(), new Fields(new Comparable[]{"ip", "count"})), new Fields(new Comparable[]{"ip"}), new RegexFilter("^68.*"));
        Pipe each2 = new Each(new Pipe("left", each), new Fields(new Comparable[]{"ip"}), new RegexFilter(".*46.*"));
        GroupBy groupBy = new GroupBy(new Each(new Pipe("right", each), new Fields(new Comparable[]{"ip"}), new RegexFilter(".*102.*")), new Fields(new Comparable[]{"ip"}));
        Pipe each3 = new Each(new Pipe("rightLeft", groupBy), new Fields(new Comparable[]{"ip"}), new Identity());
        Pipe each4 = new Each(new Pipe("rightRight", groupBy), new Fields(new Comparable[]{"ip"}), new Identity());
        Flow connect = getPlatform().getFlowConnector().connect(Cascades.tapsMap("split", textFile), Cascades.tapsMap(Pipe.pipes(new Pipe[]{each2, each3, each4}), Tap.taps(new Tap[]{textFile2, textFile3, textFile4})), new Pipe[]{each2, each3, each4});
        connect.complete();
        validateLength(connect, 1, "left");
        validateLength(connect, 1, "rightLeft");
        validateLength(connect, 1, "rightRight");
    }

    @Test
    public void testConcatenation() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Flow connect = getPlatform().getFlowConnector().connect(new MultiSourceTap(new Tap[]{getPlatform().getTextFile(InputData.inputFileLower), getPlatform().getTextFile(InputData.inputFileUpper)}), getPlatform().getTextFile(getOutputPath("complexconcat"), SinkMode.REPLACE), new GroupBy(new Each(new Pipe("concat"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ")), new Fields(new Comparable[]{"num"})));
        connect.complete();
        validateLength(connect, 10, null);
    }

    @Test
    public void testGeneratorAggregator() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache), getPlatform().getTextFile(getOutputPath("generatoraggregator"), SinkMode.REPLACE), new Every(new Every(new GroupBy(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"})), new TestAggregator(new Fields(new Comparable[]{"count1"}), new Fields(new Comparable[]{"ip"}), new Tuple[]{new Tuple(new Object[]{"first1"}), new Tuple(new Object[]{"first2"})})), new TestAggregator(new Fields(new Comparable[]{"count2"}), new Fields(new Comparable[]{"ip"}), new Tuple[]{new Tuple(new Object[]{"second"}), new Tuple(new Object[]{"second2"}), new Tuple(new Object[]{"second3"})})));
        connect.complete();
        validateLength(connect, 48, null);
    }

    @Test
    public void testReplace() throws Exception {
        Flow connect = getPlatform().getFlowConnector(disableDebug()).connect(getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache), getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), new Fields(new Comparable[]{"offset", "line"}), getOutputPath("replace"), SinkMode.REPLACE), new Each(new Each(new Each(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{0}), "^[^ ]*"), Fields.REPLACE), new Fields(new Comparable[]{"line"}), new Identity(Fields.ARGS), Fields.REPLACE), new Fields(new Comparable[]{"line"}), new Identity(new Fields(new Comparable[]{"line"})), Fields.REPLACE), new Debug(true)));
        connect.complete();
        validateLength(connect, 10, 2, Pattern.compile("^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$"));
    }

    @Test
    public void testSwap() throws Exception {
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache), getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), new Fields(new Comparable[]{"count", "ipaddress"}), getOutputPath("swap"), SinkMode.REPLACE), new Each(new Every(new GroupBy(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), Fields.SWAP), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"}), new Count(new Fields(new Comparable[]{"count"}))), new Fields(new Comparable[]{"ip"}), new Identity(new Fields(new Comparable[]{"ipaddress"})), Fields.SWAP));
        connect.complete();
        validateLength(connect, 8, 2, Pattern.compile("^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$"));
    }

    @Test
    public void testNone() throws Exception {
        Flow connect = getPlatform().getFlowConnector().connect(getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache), getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), new Fields(new Comparable[]{"count", "ip"}), getOutputPath("none"), SinkMode.REPLACE), new Each(new Every(new GroupBy(new Each(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), Fields.ALL), new Fields(new Comparable[]{"line"}), new NoOp(), Fields.SWAP), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"}), new Count(new Fields(new Comparable[]{"count"}))), Fields.NONE, new Insert(new Fields(new Comparable[]{"ipaddress"}), new Object[]{"1.2.3.4"}), Fields.ALL));
        connect.complete();
        validateLength(connect, 8, 2, Pattern.compile("^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$"));
    }

    @Test
    public void testSplitSameSourceMergedSameName() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("splitsourcemergedsamename"), SinkMode.REPLACE);
        Each each = new Each(new Pipe("split"), new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*"));
        Flow connect = getPlatform().getFlowConnector().connect(textFile, textFile2, new GroupBy("merged", Pipe.pipes(new Pipe[]{new Each(each, new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*")), new Each(each, new Fields(new Comparable[]{"line"}), new RegexFilter(".*102.*"))}), new Fields(new Comparable[]{"line"})));
        connect.complete();
        validateLength(connect, 3);
    }

    @Test
    public void testGroupGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        GroupBy groupBy = new GroupBy(new Every(new GroupBy(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields("ip", String.class), "^[^ ]*"), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"})), new Count(), new Fields(new Comparable[]{"ip", "count"})), new Fields(new Comparable[]{"ip"}), new Fields(new Comparable[]{"count"}));
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("groupgroup"), SinkMode.REPLACE);
        Map<Object, Object> properties = getProperties();
        properties.put("cascading.serialization.types.required", "true");
        Flow connect = getPlatform().getFlowConnector(properties).connect(textFile, textFile2, groupBy);
        connect.complete();
        validateLength(connect, 8, null);
    }

    @Test
    public void testGroupByInsensitive() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap delimitedFile2 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", delimitedFile);
        hashMap.put("upper", delimitedFile2);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("insensitivegrouping-nondeterministic"), SinkMode.REPLACE);
        Merge merge = new Merge(new Pipe[]{new Pipe("lower"), new Pipe("upper")});
        Fields fields = new Fields(new Comparable[]{"char"});
        fields.setComparator("char", new LowerComparator());
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile, new Every(new GroupBy("groupby", merge, fields), new Fields(new Comparable[]{"char"}), new Count()));
        connect.complete();
        validateLength(connect, 5, 1, Pattern.compile("^\\w+\\s2$"));
    }
}
