package cascading;

import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowDef;
import cascading.flow.FlowProps;
import cascading.operation.Identity;
import cascading.operation.Insert;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.First;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.pipe.joiner.InnerJoin;
import cascading.pipe.joiner.Joiner;
import cascading.pipe.joiner.LeftJoin;
import cascading.pipe.joiner.MixedJoin;
import cascading.pipe.joiner.OuterJoin;
import cascading.pipe.joiner.RightJoin;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.util.NullNotEquivalentComparator;
import data.InputData;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Test;

/* loaded from: input_file:cascading/CoGroupFieldedPipesPlatformTest.class */
public class CoGroupFieldedPipesPlatformTest extends PlatformTestCase {
    public CoGroupFieldedPipesPlatformTest() {
        super(true, 4, 1);
    }

    @Test
    public void testCross() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);
        HashMap hashMap = new HashMap();
        hashMap.put("lhs", getPlatform().getTextFile(InputData.inputFileLhs));
        hashMap.put("rhs", getPlatform().getTextFile(InputData.inputFileRhs));
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("cross"), SinkMode.REPLACE), new CoGroup(new Each("lhs", new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"numLHS", "charLHS"}), " ")), new Fields(new Comparable[]{"numLHS"}), new Each("rhs", new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"numRHS", "charRHS"}), " ")), new Fields(new Comparable[]{"numRHS"}), new InnerJoin()));
        connect.complete();
        validateLength(connect, 37, null);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tB"})));
    }

    @Test
    public void testCoGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("cogroup"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        CoGroup coGroup = new CoGroup(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), new InnerJoin(Fields.size(4)));
        Map<Object, Object> properties = getProperties();
        FlowProps.setDefaultTupleElementComparator(properties, getPlatform().getStringComparator(false).getClass().getCanonicalName());
        Flow connect = getPlatform().getFlowConnector(properties).connect(hashMap, textFile3, coGroup);
        connect.complete();
        validateLength(connect, 5);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupSamePipeName() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("renamedpipes"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Pipe pipe = new Pipe("lower");
        Pipe pipe2 = new Pipe("upper");
        Pipe pipe3 = new Pipe("same", pipe);
        Pipe pipe4 = new Pipe("same", pipe2);
        Each each = new Each(pipe3, new Fields(new Comparable[]{"line"}), regexSplitter);
        Each each2 = new Each(pipe4, new Fields(new Comparable[]{"line"}), regexSplitter);
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile3, new Pipe("tail", new Pipe("splice", new CoGroup(new Pipe("left", each), new Fields(new Comparable[]{"num"}), new Pipe("right", each2), new Fields(new Comparable[]{"num"}), Fields.size(4)))));
        connect.complete();
        validateLength(connect, 5);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupWithUnknowns() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("unknown"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(Fields.UNKNOWN, " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile3, new CoGroup(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{0}), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{0}), Fields.size(4)));
        connect.complete();
        validateLength(connect, 5);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupFilteredBranch() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("cogroupfilteredbranch"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile3, new CoGroup(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), new GroupBy(new Each(new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), new RegexFilter("^fobar")), new Fields(new Comparable[]{"num"})), new Fields(new Comparable[]{"num"}), Fields.size(4), new OuterJoin()));
        connect.complete();
        validateLength(connect, 5);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\tnull\tnull"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\tnull\tnull"})));
    }

    @Test
    public void testCoGroupSelf() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("cogroupself"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile3, new CoGroup(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), Fields.size(4)));
        connect.complete();
        validateLength(connect, 5);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testSplitCoGroupSelf() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("lowerLhs", textFile);
        hashMap.put("upperLhs", textFile);
        hashMap.put("lowerRhs", textFile);
        hashMap.put("upperRhs", textFile);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("splitcogroupself/lhs"), SinkMode.REPLACE);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("splitcogroupself/rhs"), SinkMode.REPLACE);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("lhs", textFile2);
        hashMap2.put("rhs", textFile3);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, hashMap2, new Pipe[]{new CoGroup("lhs", new Each(new Pipe("lowerLhs"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), new Each(new Pipe("upperLhs"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), Fields.size(4)), new CoGroup("rhs", new Each(new Pipe("lowerRhs"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), new Each(new Pipe("upperRhs"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), Fields.size(4))});
        connect.complete();
        List asList = asList(connect, textFile2);
        assertEquals(5, asList.size());
        assertTrue(asList.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        assertTrue(asList.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
        List asList2 = asList(connect, textFile3);
        assertEquals(5, asList2.size());
        assertTrue(asList2.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        assertTrue(asList2.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testCoGroupAfterEvery() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}).applyTypes(new Type[]{Long.TYPE, String.class}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}).applyTypes(new Type[]{Long.TYPE, String.class}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("afterevery"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}).applyTypes(new Type[]{String.class, String.class}), " ");
        CoGroup coGroup = new CoGroup(new Every(new GroupBy(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"})), new Fields(new Comparable[]{"char"}), new First(), Fields.ALL), new Fields(new Comparable[]{"num"}), new Every(new GroupBy(new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"})), new Fields(new Comparable[]{"char"}), new First(), Fields.ALL), new Fields(new Comparable[]{"num"}), Fields.size(4));
        Map<Object, Object> properties = getPlatform().getProperties();
        properties.put("cascading.serialization.types.required", "true");
        Flow connect = getPlatform().getFlowConnector(properties).connect(hashMap, textFile3, coGroup);
        connect.complete();
        validateLength(connect, 5, null);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupAfterEveryNoDeclared() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("aftereverynodeclared"), SinkMode.REPLACE), new CoGroup(new Every(new GroupBy(new Each(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "char1"}), " ")), new Insert(new Fields(new Comparable[]{"one", "two", "three", "four"}), new Object[]{"one", "two", "three", "four"}), Fields.ALL), new Fields(new Comparable[]{"num1"})), new Fields(new Comparable[]{"char1"}), new First(), Fields.ALL), new Fields(new Comparable[]{"num1"}), new Every(new GroupBy(new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num2", "char2"}), " ")), new Fields(new Comparable[]{"num2"})), new Fields(new Comparable[]{"char2"}), new First(), Fields.ALL), new Fields(new Comparable[]{"num2"})));
        connect.complete();
        validateLength(connect, 5, null);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupInnerSingleField() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLowerOffset);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("cogroupinnersingle"), SinkMode.REPLACE), new Every(new CoGroup(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "char"}), " "), new Fields(new Comparable[]{"num1"})), new Fields(new Comparable[]{"num1"}), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num2", "char"}), " "), new Fields(new Comparable[]{"num2"})), new Fields(new Comparable[]{"num2"})), new Count()));
        connect.complete();
        validateLength(connect, 2, null);
        HashSet hashSet = new HashSet();
        hashSet.add(new Tuple(new Object[]{"1\t1\t1"}));
        hashSet.add(new Tuple(new Object[]{"5\t5\t2"}));
        hashSet.removeAll(getSinkAsList(connect));
        assertEquals(0, hashSet.size());
    }

    @Test
    public void testCoGroupInner() throws Exception {
        HashSet hashSet = new HashSet();
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        hashSet.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        handleJoins("cogroupinner", new InnerJoin(), hashSet, 8, false, null);
        handleJoins("cogroupinner-resultgroup", new InnerJoin(), hashSet, 8, true, null);
    }

    @Test
    public void testCoGroupInnerNull() throws Exception {
        HashSet hashSet = new HashSet();
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        hashSet.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        handleJoins("cogroupinnernull", new InnerJoin(), hashSet, 9, false, new NullNotEquivalentComparator());
        handleJoins("cogroupinnernull-resultgroup", new InnerJoin(), hashSet, 9, true, new NullNotEquivalentComparator());
    }

    @Test
    public void testCoGroupOuter() throws Exception {
        HashSet hashSet = new HashSet();
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        hashSet.add(new Tuple(new Object[]{"3", "c1", null, null}));
        hashSet.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"5", "e1", null, null}));
        hashSet.add(new Tuple(new Object[]{"5", "e2", null, null}));
        hashSet.add(new Tuple(new Object[]{"5", "e3", null, null}));
        hashSet.add(new Tuple(new Object[]{null, null, "6", "F1"}));
        hashSet.add(new Tuple(new Object[]{null, null, "6", "F2"}));
        hashSet.add(new Tuple(new Object[]{"7", "g1", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g2", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g3", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g4", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g5", null, null}));
        hashSet.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        handleJoins("cogroupouter", new OuterJoin(), hashSet, 8, false, null);
        handleJoins("cogroupouter-resultgroup", new OuterJoin(), hashSet, 8, true, null);
    }

    @Test
    public void testCoGroupOuterNull() throws Exception {
        HashSet hashSet = new HashSet();
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        hashSet.add(new Tuple(new Object[]{"3", "c1", null, null}));
        hashSet.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"5", "e1", null, null}));
        hashSet.add(new Tuple(new Object[]{"5", "e2", null, null}));
        hashSet.add(new Tuple(new Object[]{"5", "e3", null, null}));
        hashSet.add(new Tuple(new Object[]{null, null, "6", "F1"}));
        hashSet.add(new Tuple(new Object[]{null, null, "6", "F2"}));
        hashSet.add(new Tuple(new Object[]{"7", "g1", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g2", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g3", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g4", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g5", null, null}));
        hashSet.add(new Tuple(new Object[]{null, "h1", null, null}));
        hashSet.add(new Tuple(new Object[]{null, null, null, "H1"}));
        handleJoins("cogroupouternull", new OuterJoin(), hashSet, 9, false, new NullNotEquivalentComparator());
        handleJoins("cogroupouternull-resultgroup", new OuterJoin(), hashSet, 9, true, new NullNotEquivalentComparator());
    }

    @Test
    public void testCoGroupInnerOuter() throws Exception {
        HashSet hashSet = new HashSet();
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        hashSet.add(new Tuple(new Object[]{"3", "c1", null, null}));
        hashSet.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"5", "e1", null, null}));
        hashSet.add(new Tuple(new Object[]{"5", "e2", null, null}));
        hashSet.add(new Tuple(new Object[]{"5", "e3", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g1", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g2", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g3", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g4", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g5", null, null}));
        hashSet.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        handleJoins("cogroupinnerouter", new LeftJoin(), hashSet, 8, false, null);
        handleJoins("cogroupinnerouter-resultgroup", new LeftJoin(), hashSet, 8, true, null);
    }

    @Test
    public void testCoGroupInnerOuterNull() throws Exception {
        HashSet hashSet = new HashSet();
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        hashSet.add(new Tuple(new Object[]{"3", "c1", null, null}));
        hashSet.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"5", "e1", null, null}));
        hashSet.add(new Tuple(new Object[]{"5", "e2", null, null}));
        hashSet.add(new Tuple(new Object[]{"5", "e3", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g1", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g2", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g3", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g4", null, null}));
        hashSet.add(new Tuple(new Object[]{"7", "g5", null, null}));
        hashSet.add(new Tuple(new Object[]{null, "h1", null, null}));
        handleJoins("cogroupinnerouternull", new LeftJoin(), hashSet, 9, false, new NullNotEquivalentComparator());
        handleJoins("cogroupinnerouternull-resultgroup", new LeftJoin(), hashSet, 9, true, new NullNotEquivalentComparator());
    }

    @Test
    public void testCoGroupOuterInner() throws Exception {
        HashSet hashSet = new HashSet();
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        hashSet.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{null, null, "6", "F1"}));
        hashSet.add(new Tuple(new Object[]{null, null, "6", "F2"}));
        hashSet.add(new Tuple(new Object[]{null, "h1", null, "H1"}));
        handleJoins("cogroupouterinner", new RightJoin(), hashSet, 8, false, null);
        handleJoins("cogroupouterinner-resultgroup", new RightJoin(), hashSet, 8, true, null);
    }

    @Test
    public void testCoGroupOuterInnerNull() throws Exception {
        HashSet hashSet = new HashSet();
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a1", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a2", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A1"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A2"}));
        hashSet.add(new Tuple(new Object[]{"1", "a3", "1", "A3"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B1"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B2"}));
        hashSet.add(new Tuple(new Object[]{"2", "b1", "2", "B3"}));
        hashSet.add(new Tuple(new Object[]{"4", "d1", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d2", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{"4", "d3", "4", "D1"}));
        hashSet.add(new Tuple(new Object[]{null, null, "6", "F1"}));
        hashSet.add(new Tuple(new Object[]{null, null, "6", "F2"}));
        hashSet.add(new Tuple(new Object[]{null, null, null, "H1"}));
        handleJoins("cogroupouterinnernull", new RightJoin(), hashSet, 9, false, new NullNotEquivalentComparator());
        handleJoins("cogroupouterinnernull-resultgroup", new RightJoin(), hashSet, 9, true, new NullNotEquivalentComparator());
    }

    private void handleJoins(String str, Joiner joiner, Set<Tuple> set, int i, boolean z, NullNotEquivalentComparator nullNotEquivalentComparator) throws Exception {
        HashSet hashSet = new HashSet(set);
        getPlatform().copyFromLocal(InputData.inputFileLhsSparse);
        getPlatform().copyFromLocal(InputData.inputFileRhsSparse);
        Fields applyTypes = new Fields(new Comparable[]{"num", "char"}).applyTypes(new Type[]{Integer.class, String.class});
        Tap delimitedFile = getPlatform().getDelimitedFile(applyTypes, " ", InputData.inputFileLhsSparse);
        Tap delimitedFile2 = getPlatform().getDelimitedFile(applyTypes, " ", InputData.inputFileRhsSparse);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", delimitedFile);
        hashMap.put("upper", delimitedFile2);
        Tap delimitedFile3 = getPlatform().getDelimitedFile(Fields.size(4, String.class), "\t", getOutputPath(str), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lower");
        Pipe pipe2 = new Pipe("upper");
        Fields fields = new Fields(new Comparable[]{"num", "char", "num2", "char2"});
        Fields fields2 = new Fields(new Comparable[]{"num"});
        if (nullNotEquivalentComparator != null) {
            fields2.setComparator(0, nullNotEquivalentComparator);
        }
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, delimitedFile3, new Every(z ? new CoGroup(pipe, fields2, pipe2, fields2, fields, new Fields(new Comparable[]{"num", "num2"}), joiner) : new CoGroup(pipe, fields2, pipe2, fields2, fields, joiner), Fields.ALL, new TestIdentityBuffer(new Fields(new Comparable[]{"num", "num2"}), i, true), Fields.RESULTS));
        connect.complete();
        validateLength(connect, hashSet.size());
        hashSet.removeAll(getSinkAsList(connect));
        assertEquals(0, hashSet.size());
    }

    @Test
    public void testCoGroupMixed() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLowerOffset);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("loweroffset", textFile);
        hashMap.put("lower", textFile3);
        hashMap.put("upper", textFile2);
        Tap delimitedFile = getPlatform().getDelimitedFile(Fields.size(6, String.class), "\t", getOutputPath("cogroupmixed"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, delimitedFile, new CoGroup(Pipe.pipes(new Pipe[]{new Each(new Pipe("loweroffset"), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter)}), Fields.fields(new Fields[]{new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num"})}), Fields.size(6), new MixedJoin(new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER})));
        connect.complete();
        validateLength(connect, 6);
        HashSet hashSet = new HashSet();
        hashSet.add(new Tuple(new Object[]{"1", "a", "1", "A", "1", "a"}));
        hashSet.add(new Tuple(new Object[]{null, null, "2", "B", "2", "b"}));
        hashSet.add(new Tuple(new Object[]{null, null, "3", "C", "3", "c"}));
        hashSet.add(new Tuple(new Object[]{null, null, "4", "D", "4", "d"}));
        hashSet.add(new Tuple(new Object[]{"5", "b", "5", "E", "5", "e"}));
        hashSet.add(new Tuple(new Object[]{"5", "e", "5", "E", "5", "e"}));
        hashSet.removeAll(getSinkAsList(connect));
        assertEquals(0, hashSet.size());
    }

    @Test
    public void testCoGroupDiffFields() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("difffields"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"numA", "lower"}), " ");
        RegexSplitter regexSplitter2 = new RegexSplitter(new Fields(new Comparable[]{"numB", "upper"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile3, new CoGroup(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"numA"}), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter2), new Fields(new Comparable[]{"numB"})));
        connect.complete();
        validateLength(connect, 5);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupGroupBy() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("cogroupgroupby"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"numA", "lower"}).applyTypes(new Type[]{String.class, String.class}), " ");
        RegexSplitter regexSplitter2 = new RegexSplitter(new Fields(new Comparable[]{"numB", "upper"}).applyTypes(new Type[]{String.class, String.class}), " ");
        GroupBy groupBy = new GroupBy(new CoGroup(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"numA"}), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter2), new Fields(new Comparable[]{"numB"})), new Fields(new Comparable[]{"numA"}));
        Map<Object, Object> properties = getPlatform().getProperties();
        properties.put("cascading.serialization.types.required", "true");
        Flow connect = getPlatform().getFlowConnector(properties).connect(hashMap, textFile3, groupBy);
        connect.complete();
        validateLength(connect, 5, null);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testCoGroupSamePipe() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("samepipe"), SinkMode.REPLACE), new CoGroup(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ")), new Fields(new Comparable[]{"num"}), 1, new Fields(new Comparable[]{"num1", "char1", "num2", "char2"})));
        connect.complete();
        validateLength(connect, 5, null);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testCoGroupSamePipe2() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("samepipe2"), SinkMode.REPLACE);
        Each each = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " "));
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile2, new CoGroup(each, new Fields(new Comparable[]{"num"}), each, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"})));
        connect.complete();
        validateLength(connect, 5, null);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testCoGroupSamePipe3() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", delimitedFile);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("samepipe3"), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lower");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile, new CoGroup(new Pipe("lhs", pipe), new Fields(new Comparable[]{"num"}), new Pipe("rhs", pipe), new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"})));
        connect.complete();
        validateLength(connect, 5, null);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testCoGroupAroundCoGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper1", textFile2);
        hashMap.put("upper2", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("cogroupacogroup"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each each = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter);
        Each each2 = new Each(new Pipe("upper1"), new Fields(new Comparable[]{"line"}), regexSplitter);
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile3, new CoGroup(new Each(new CoGroup(each, new Fields(new Comparable[]{"num"}), each2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"})), new Identity()), new Fields(new Comparable[]{"num1"}), new Each(new Pipe("upper2"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"})));
        connect.complete();
        validateLength(connect, 5, null);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tA\t1\tA"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tB\t2\tB"})));
    }

    @Test
    public void testCoGroupAroundCoGroupWithout() throws Exception {
        runCoGroupAroundCoGroup(null, "cogroupacogroupopt1");
    }

    @Test
    public void testCoGroupAroundCoGroupWith() throws Exception {
        runCoGroupAroundCoGroup(getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num"}), "\t", InputData.inputFileNums10).getScheme().getClass(), "cogroupacogroupopt2");
    }

    private void runCoGroupAroundCoGroup(Class cls, String str) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileNums20);
        getPlatform().copyFromLocal(InputData.inputFileNums10);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num"}), "\t", InputData.inputFileNums10);
        Tap delimitedFile2 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num"}), "\t", InputData.inputFileNums20);
        HashMap hashMap = new HashMap();
        hashMap.put("source20", delimitedFile2);
        hashMap.put("source101", delimitedFile);
        hashMap.put("source102", delimitedFile);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath(str), SinkMode.REPLACE);
        Pipe pipe = new Pipe("source20");
        Pipe pipe2 = new Pipe("source101");
        Each each = new Each(new CoGroup(new CoGroup(pipe, new Fields(new Comparable[]{"num"}), pipe2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "num2"})), new Fields(new Comparable[]{"num1"}), new Pipe("source102"), new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "num2", "num3"})), new Identity());
        Map<Object, Object> properties = getPlatform().getProperties();
        if (getPlatform().isMapReduce()) {
            FlowConnectorProps.setIntermediateSchemeClass(properties, cls);
        }
        Flow connect = getPlatform().getFlowConnector(properties).connect("cogroupopt", hashMap, textFile, each);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 2, connect.getFlowSteps().size());
        }
        connect.complete();
        validateLength(connect, 10);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\t1\t1"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"10\t10\t10"})));
    }

    @Test
    public void testCoGroupDiffFieldsSameFile() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("offsetLower", textFile);
        hashMap.put("lower", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("samefiledifffields"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"numA", "left"}), " ");
        RegexSplitter regexSplitter2 = new RegexSplitter(new Fields(new Comparable[]{"numB", "right"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile3, new CoGroup(new Each(new Discard(new Pipe("offsetLower"), new Fields(new Comparable[]{"offset"})), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"numA"}), new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter2), new Fields(new Comparable[]{"numB"})));
        connect.complete();
        validateLength(connect, 5);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testJoinNone() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        Tap textFile3 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("joinnone"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector(getProperties()).connect(hashMap, textFile3, new CoGroup(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), Fields.NONE, new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), Fields.NONE, Fields.size(4)));
        connect.complete();
        validateLength(connect, 25);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\tA"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t2\tB"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tB"})));
    }

    @Test
    public void testMultiJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileCrossX2);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileCrossX2);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("inner"), SinkMode.REPLACE);
        Tap textFile3 = getPlatform().getTextFile(getOutputPath("outer"), SinkMode.REPLACE);
        Tap textFile4 = getPlatform().getTextFile(getOutputPath("left"), SinkMode.REPLACE);
        Tap textFile5 = getPlatform().getTextFile(getOutputPath("right"), SinkMode.REPLACE);
        Pipe every = new Every(new GroupBy(new Each(new Pipe("unique"), new Fields(new Comparable[]{"line"}), new RegexSplitGenerator(new Fields(new Comparable[]{"word"}), "\\s")), new Fields(new Comparable[]{"word"})), new Fields(new Comparable[]{"word"}), new First(Fields.ARGS), Fields.REPLACE);
        Pipe each = new Each(new Pipe("fielded"), new Fields(new Comparable[]{"line"}), new RegexSplitter("\\s"));
        Pipe coGroup = new CoGroup("inner", each, new Fields(new Comparable[]{0}), every, new Fields(new Comparable[]{"word"}), new InnerJoin());
        Pipe coGroup2 = new CoGroup("outer", each, new Fields(new Comparable[]{0}), every, new Fields(new Comparable[]{"word"}), new OuterJoin());
        Pipe coGroup3 = new CoGroup("left", each, new Fields(new Comparable[]{0}), every, new Fields(new Comparable[]{"word"}), new LeftJoin());
        Pipe coGroup4 = new CoGroup("right", each, new Fields(new Comparable[]{0}), every, new Fields(new Comparable[]{"word"}), new RightJoin());
        Map tapsMap = Cascades.tapsMap(Pipe.pipes(new Pipe[]{every, each}), Tap.taps(new Tap[]{textFile, textFile}));
        Pipe[] pipes = Pipe.pipes(new Pipe[]{coGroup, coGroup2, coGroup3, coGroup4});
        Flow connect = getPlatform().getFlowConnector().connect("multi-joins", tapsMap, Cascades.tapsMap(pipes, Tap.taps(new Tap[]{textFile2, textFile3, textFile4, textFile5})), pipes);
        connect.complete();
        validateLength(connect.openTapForRead(textFile2), 74);
        validateLength(connect.openTapForRead(textFile3), 84);
        validateLength(connect.openTapForRead(textFile4), 74);
        validateLength(connect.openTapForRead(textFile5), 84);
    }

    @Test
    public void testMultiJoinWithSplits() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileCrossX2);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileCrossX2);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("innerLhs"), SinkMode.REPLACE);
        Tap textFile3 = getPlatform().getTextFile(getOutputPath("uniquesLhs"), SinkMode.REPLACE);
        Tap textFile4 = getPlatform().getTextFile(getOutputPath("innerRhs"), SinkMode.REPLACE);
        Tap textFile5 = getPlatform().getTextFile(getOutputPath("uniquesRhs"), SinkMode.REPLACE);
        Pipe pipe = new Pipe("incoming");
        Each each = new Each(new Each(new Pipe("genLhsLhs", pipe), new Fields(new Comparable[]{"line"}), new RegexSplitGenerator(new Fields(new Comparable[]{"word"}), "\\s")), new Identity());
        Each each2 = new Each(new Pipe("genRhsLhs", pipe), new Fields(new Comparable[]{"line"}), new RegexSplitGenerator(new Fields(new Comparable[]{"word"}), "\\s"));
        Every every = new Every(new GroupBy(new Each(new Pipe("uniquesLhs", each), new Identity()), new Fields(new Comparable[]{"word"})), new Fields(new Comparable[]{"word"}), new First(Fields.ARGS), Fields.REPLACE);
        CoGroup coGroup = new CoGroup("innerLhs", new Rename(new Pipe("lhs", each), new Fields(new Comparable[]{"word"}), new Fields(new Comparable[]{"lhs"})), new Fields(new Comparable[]{0}), new Rename(new Pipe("rhs", each2), new Fields(new Comparable[]{"word"}), new Fields(new Comparable[]{"rhs"})), new Fields(new Comparable[]{0}), new InnerJoin());
        Each each3 = new Each(new Each(new Pipe("genLhsRhs", pipe), new Fields(new Comparable[]{"line"}), new RegexSplitGenerator(new Fields(new Comparable[]{"word"}), "\\s")), new Identity());
        Each each4 = new Each(new Pipe("genRhsRhs", pipe), new Fields(new Comparable[]{"line"}), new RegexSplitGenerator(new Fields(new Comparable[]{"word"}), "\\s"));
        Flow connect = getPlatform().getFlowConnector().connect(((FlowDef) FlowDef.flowDef().setName("multi-joins")).addSource("incoming", textFile).addTailSink(coGroup, textFile2).addTailSink(every, textFile3).addTailSink(new CoGroup("innerRhs", new Rename(new Pipe("lhs", each3), new Fields(new Comparable[]{"word"}), new Fields(new Comparable[]{"lhs"})), new Fields(new Comparable[]{0}), new Rename(new Pipe("rhs", each4), new Fields(new Comparable[]{"word"}), new Fields(new Comparable[]{"rhs"})), new Fields(new Comparable[]{0}), new InnerJoin()), textFile4).addTailSink(new Every(new GroupBy(new Each(new Pipe("uniquesRhs", each3), new Identity()), new Fields(new Comparable[]{"word"})), new Fields(new Comparable[]{"word"}), new First(Fields.ARGS), Fields.REPLACE), textFile5));
        connect.complete();
        validateLength(connect.openTapForRead(textFile2), 3900);
        validateLength(connect.openTapForRead(textFile3), 15);
        validateLength(connect.openTapForRead(textFile4), 3900);
        validateLength(connect.openTapForRead(textFile5), 15);
    }

    @Test
    public void testSameSourceGroupSplitCoGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}).applyTypes(new Type[]{Long.TYPE, String.class}), InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath(), SinkMode.REPLACE);
        Every every = new Every(new GroupBy(new Each(new Pipe("source"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num", "char"}).applyTypes(new Type[]{String.class, String.class}), " ")), new Fields(new Comparable[]{"num"})), new Fields(new Comparable[]{"char"}), new First(new Fields("first", String.class)), Fields.ALL);
        CoGroup coGroup = new CoGroup(new Retain(new Pipe("lhs", every), Fields.ALL), new Fields(new Comparable[]{"num"}), new Retain(new Pipe("rhs", every), Fields.ALL), new Fields(new Comparable[]{"num"}), Fields.size(4));
        Map<Object, Object> properties = getPlatform().getProperties();
        properties.put("cascading.serialization.types.required", "true");
        Flow connect = getPlatform().getFlowConnector(properties).connect(textFile, textFile2, coGroup);
        connect.complete();
        validateLength(connect, 5, null);
        List sinkAsList = getSinkAsList(connect);
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"1\ta\t1\ta"})));
        assertTrue(sinkAsList.contains(new Tuple(new Object[]{"2\tb\t2\tb"})));
    }

    @Test
    public void testGroupSplitToCoGroupsTriangle() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", getPlatform().getDelimitedFile(new Fields(new Comparable[]{"numLower", "charLower"}), " ", InputData.inputFileLower));
        hashMap.put("upper", getPlatform().getDelimitedFile(new Fields(new Comparable[]{"numUpper", "charUpper"}), " ", InputData.inputFileUpper));
        Tap textFile = getPlatform().getTextFile(getOutputPath("sink"), SinkMode.REPLACE);
        Each each = new Each(new Pipe("lower"), new Fields(new Comparable[]{"numLower", "charLower"}), new Identity());
        Every every = new Every(new GroupBy(new Each(new Pipe("upper"), new Fields(new Comparable[]{"numUpper", "charUpper"}), new Identity()), new Fields(new Comparable[]{"numUpper"})), new Count(), new Fields(new Comparable[]{"numUpper", "count"}));
        Each each2 = new Each(every, new Fields(new Comparable[]{"numUpper", "count"}), new Identity());
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile, new Each(new CoGroup(new Each(new CoGroup("cogroup", each, new Fields(new Comparable[]{"numLower"}), each2, new Fields(new Comparable[]{"numUpper"})), new Fields(new Comparable[]{"numLower", "charLower", "numUpper", "count"}), new Identity()), new Fields(new Comparable[]{"numLower"}), new Each(every, new Fields(new Comparable[]{"numUpper", "count"}), new Identity(new Fields(new Comparable[]{"numUpperUpper", "countUpper"}))), new Fields(new Comparable[]{"numUpperUpper"})), Fields.ALL, new Identity()));
        connect.complete();
        validateLength(connect, 5);
    }
}
