package cascading;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowStep;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.First;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Checkpoint;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Rename;
import cascading.pipe.joiner.InnerJoin;
import cascading.pipe.joiner.Joiner;
import cascading.pipe.joiner.LeftJoin;
import cascading.pipe.joiner.MixedJoin;
import cascading.pipe.joiner.OuterJoin;
import cascading.pipe.joiner.RightJoin;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Hasher;
import cascading.tuple.Tuple;
import data.InputData;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.Test;

/* loaded from: input_file:cascading/JoinFieldedPipesPlatformTest.class */
public class JoinFieldedPipesPlatformTest extends PlatformTestCase {

    /* loaded from: input_file:cascading/JoinFieldedPipesPlatformTest$AllComparator.class */
    /**
     * Comparator and hasher that work on the {@code toString()} form of the supplied values
     * rather than on the values themselves.
     */
    public static class AllComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable {
        /**
         * Orders two values by comparing their string representations.
         *
         * @param lhs left-hand value; must not be null
         * @param rhs right-hand value; must not be null
         * @return the result of {@code String.compareTo} on the two string forms
         */
        @Override // java.util.Comparator
        public int compare(Comparable lhs, Comparable rhs) {
            String left = lhs.toString();
            String right = rhs.toString();
            return left.compareTo(right);
        }

        /**
         * Hashes a value through its string representation.
         *
         * @param value value to hash, may be null
         * @return 0 for null, otherwise the hash code of {@code value.toString()}
         */
        public int hashCode(Comparable value) {
            return value == null ? 0 : value.toString().hashCode();
        }
    }

    /**
     * Creates the test case, forwarding a fixed configuration to the {@code PlatformTestCase} constructor.
     */
    public JoinFieldedPipesPlatformTest() {
        // NOTE(review): arguments are presumably (useCluster, numMapTasks, numGatherPartitions) —
        // confirm against PlatformTestCase, which is not visible here.
        super(true, 4, 1);
    }

    /**
     * Splits the "lhs" and "rhs" text sources on a space and hash-joins them with an
     * {@link InnerJoin} on {@code numLHS}/{@code numRHS}, then validates a length of 37 and
     * checks that the 1/a/1/A and 1/a/1/B lines are present in the sink.
     */
    @Test
    public void testCross() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lhs", getPlatform().getTextFile(InputData.inputFileLhs));
        sources.put("rhs", getPlatform().getTextFile(InputData.inputFileRhs));

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("cross"), SinkMode.REPLACE);

        // Each side gets its own splitter so the declared field names differ per branch.
        Pipe lhsPipe = new Each("lhs", new Fields("line"), new RegexSplitter(new Fields("numLHS", "charLHS"), " "));
        Pipe rhsPipe = new Each("rhs", new Fields("line"), new RegexSplitter(new Fields("numRHS", "charRHS"), " "));
        Pipe cross = new HashJoin(lhsPipe, new Fields("numLHS"), rhsPipe, new Fields("numRHS"), new InnerJoin());

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, cross);
        flow.complete();

        validateLength(flow, 37, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("1\ta\t1\tB")));
    }

    /**
     * Hash-joins the "lower" and "upper" text sources on the {@code num} field obtained by
     * splitting each line on a space, declaring four result fields. Uses a flow connector
     * built from {@code getProperties()}, then validates a length of 5 and two joined lines.
     */
    @Test
    public void testJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);
        sources.put("upper", upperTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("join"), SinkMode.REPLACE);

        // The same splitter instance is applied to both branches.
        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe lowerPipe = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe upperPipe = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe join = new HashJoin(lowerPipe, new Fields("num"), upperPipe, new Fields("num"), Fields.size(4));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Same join as the plain lower/upper case, but both branches pass through a pipe named
     * "same" before being renamed "left"/"right", and the join itself is wrapped in "splice"
     * and "tail" pipes. Validates a length of 5 and two joined lines.
     */
    @Test
    public void testJoinSamePipeName() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);
        sources.put("upper", upperTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("renamedpipes"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe lowerHead = new Pipe("lower");
        Pipe upperHead = new Pipe("upper");

        // Both branches deliberately share the pipe name "same".
        Pipe lowerSame = new Pipe("same", lowerHead);
        Pipe upperSame = new Pipe("same", upperHead);

        Pipe lowerSplit = new Each(lowerSame, new Fields("line"), splitter);
        Pipe upperSplit = new Each(upperSame, new Fields("line"), splitter);

        Pipe left = new Pipe("left", lowerSplit);
        Pipe right = new Pipe("right", upperSplit);
        Pipe join = new HashJoin(left, new Fields("num"), right, new Fields("num"), Fields.size(4));
        Pipe splice = new Pipe("splice", join);
        Pipe tail = new Pipe("tail", splice);

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, tail);
        flow.complete();

        validateLength(flow, 5);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Joins lower/upper where the splitter declares {@code Fields.UNKNOWN}, so the join keys are
     * selected by position ({@code 0}) instead of by name. Validates a length of 5 and two
     * joined lines.
     */
    @Test
    public void testJoinWithUnknowns() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);
        sources.put("upper", upperTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("unknown"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(Fields.UNKNOWN, " ");
        Pipe lowerPipe = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe upperPipe = new Each(new Pipe("upper"), new Fields("line"), splitter);

        // Positional join keys: first field of each branch.
        Pipe join = new HashJoin(lowerPipe, new Fields(0), upperPipe, new Fields(0), Fields.size(4));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Outer-joins the lower branch against an upper branch that is first filtered with
     * {@code RegexFilter("^fobar")} on {@code num} and then grouped by {@code num}. The
     * assertions expect the lower values paired with "null" on the right-hand side.
     */
    @Test
    public void testJoinFilteredBranch() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);
        sources.put("upper", upperTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinfilteredbranch"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe lowerPipe = new Each(new Pipe("lower"), new Fields("line"), splitter);

        Pipe upperSplit = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe upperFiltered = new Each(upperSplit, new Fields("num"), new RegexFilter("^fobar"));
        Pipe upperGrouped = new GroupBy(upperFiltered, new Fields("num"));

        Pipe join = new HashJoin(
                lowerPipe, new Fields("num"),
                upperGrouped, new Fields("num"),
                Fields.size(4), new OuterJoin());

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\tnull\tnull")));
        assertTrue(results.contains(new Tuple("2\tb\tnull\tnull")));
    }

    /**
     * Joins the lhs input file against itself by binding two separately created taps over the
     * same file to the "lhs" and "rhs" pipes. Validates a length of 37 and two self-joined lines.
     */
    @Test
    public void testJoinSelf() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        // Two distinct tap instances that read the same file.
        Tap lhsTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLhs);
        Tap rhsTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLhs);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lhs", lhsTap);
        sources.put("rhs", rhsTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinself"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe lhsPipe = new Each(new Pipe("lhs"), new Fields("line"), splitter);
        Pipe rhsPipe = new Each(new Pipe("rhs"), new Fields("line"), splitter);
        Pipe join = new HashJoin(lhsPipe, new Fields("num"), rhsPipe, new Fields("num"), Fields.size(4));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 37);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\ta")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tb")));
    }

    /**
     * Joins the lhs input against itself using a single delimited tap instance bound to both
     * the "lhs" and "rhs" pipe names. Validates a length of 37 and two self-joined lines.
     */
    @Test
    public void testSameSourceJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLhs);

        Tap source = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLhs);

        // The very same tap object feeds both sides of the join.
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lhs", source);
        sources.put("rhs", source);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath(), SinkMode.REPLACE);

        Pipe lhsPipe = new Pipe("lhs");
        Pipe rhsPipe = new Pipe("rhs");
        Pipe join = new HashJoin(lhsPipe, new Fields("num"), rhsPipe, new Fields("num"), Fields.size(4));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 37);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\ta")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tb")));
    }

    /**
     * Applies {@code GroupBy(num)} followed by {@code Every(char, First, ALL)} to each of the
     * lower and upper branches, then hash-joins the two aggregated branches on {@code num}.
     * Validates a length of 5 and two joined lines.
     */
    @Test
    public void testJoinAfterEvery() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);
        sources.put("upper", upperTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("afterevery"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe lowerSplit = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe lowerGrouped = new GroupBy(lowerSplit, new Fields("num"));
        Pipe lowerFirst = new Every(lowerGrouped, new Fields("char"), new First(), Fields.ALL);

        Pipe upperSplit = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe upperGrouped = new GroupBy(upperSplit, new Fields("num"));
        Pipe upperFirst = new Every(upperGrouped, new Fields("char"), new First(), Fields.ALL);

        Pipe join = new HashJoin(lowerFirst, new Fields("num"), upperFirst, new Fields("num"), Fields.size(4));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Hash-joins the lower-offset and upper inputs after each branch is narrowed to a single
     * field ({@code num1} / {@code num2}) by the splitter's output selector. Validates a length
     * of 3 and checks that the "1 1" and "5 5" lines are among the results.
     */
    @Test
    public void testJoinInnerSingleField() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLowerOffset);
        Tap upperTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);
        sources.put("upper", upperTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joininnersingle"), SinkMode.REPLACE);

        // Each branch keeps only its number field after the split.
        Pipe lowerPipe = new Each(
                new Pipe("lower"), new Fields("line"),
                new RegexSplitter(new Fields("num1", "char"), " "), new Fields("num1"));
        Pipe upperPipe = new Each(
                new Pipe("upper"), new Fields("line"),
                new RegexSplitter(new Fields("num2", "char"), " "), new Fields("num2"));
        Pipe join = new HashJoin(lowerPipe, new Fields("num1"), upperPipe, new Fields("num2"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 3, null);

        // Whatever remains after removing the actual results was expected but not produced.
        HashSet<Tuple> expected = new HashSet<Tuple>();
        expected.add(new Tuple("1\t1"));
        expected.add(new Tuple("5\t5"));

        List<Tuple> actual = getSinkAsList(flow);
        expected.removeAll(actual);

        assertEquals(0, expected.size());
    }

    /**
     * Runs {@code handleJoins} with an {@link InnerJoin} and the set of tuples expected from
     * joining the sparse lhs/rhs inputs.
     */
    @Test
    public void testJoinInner() throws Exception {
        HashSet<Tuple> expected = new HashSet<Tuple>();

        expected.add(new Tuple("1", "a1", "1", "A1"));
        expected.add(new Tuple("1", "a1", "1", "A2"));
        expected.add(new Tuple("1", "a1", "1", "A3"));
        expected.add(new Tuple("1", "a2", "1", "A1"));
        expected.add(new Tuple("1", "a2", "1", "A2"));
        expected.add(new Tuple("1", "a2", "1", "A3"));
        expected.add(new Tuple("1", "a3", "1", "A1"));
        expected.add(new Tuple("1", "a3", "1", "A2"));
        expected.add(new Tuple("1", "a3", "1", "A3"));

        expected.add(new Tuple("2", "b1", "2", "B1"));
        expected.add(new Tuple("2", "b1", "2", "B2"));
        expected.add(new Tuple("2", "b1", "2", "B3"));

        expected.add(new Tuple("4", "d1", "4", "D1"));
        expected.add(new Tuple("4", "d2", "4", "D1"));
        expected.add(new Tuple("4", "d3", "4", "D1"));

        // Rows whose join key is null on both sides.
        expected.add(new Tuple(null, "h1", null, "H1"));

        handleJoins("joininner", new InnerJoin(), expected);
    }

    /**
     * Runs {@code handleJoins} with an {@link OuterJoin} and the expected tuple set, including
     * rows that are null-padded on either side. Returns immediately without testing when the
     * platform reports both MapReduce and cluster mode.
     */
    @Test
    public void testJoinOuter() throws Exception {
        boolean mapReduceOnCluster = getPlatform().isMapReduce() && getPlatform().isUseCluster();
        if (mapReduceOnCluster) {
            return;
        }

        HashSet<Tuple> expected = new HashSet<Tuple>();

        expected.add(new Tuple("1", "a1", "1", "A1"));
        expected.add(new Tuple("1", "a1", "1", "A2"));
        expected.add(new Tuple("1", "a1", "1", "A3"));
        expected.add(new Tuple("1", "a2", "1", "A1"));
        expected.add(new Tuple("1", "a2", "1", "A2"));
        expected.add(new Tuple("1", "a2", "1", "A3"));
        expected.add(new Tuple("1", "a3", "1", "A1"));
        expected.add(new Tuple("1", "a3", "1", "A2"));
        expected.add(new Tuple("1", "a3", "1", "A3"));

        expected.add(new Tuple("2", "b1", "2", "B1"));
        expected.add(new Tuple("2", "b1", "2", "B2"));
        expected.add(new Tuple("2", "b1", "2", "B3"));

        expected.add(new Tuple("3", "c1", null, null));

        expected.add(new Tuple("4", "d1", "4", "D1"));
        expected.add(new Tuple("4", "d2", "4", "D1"));
        expected.add(new Tuple("4", "d3", "4", "D1"));

        expected.add(new Tuple("5", "e1", null, null));
        expected.add(new Tuple("5", "e2", null, null));
        expected.add(new Tuple("5", "e3", null, null));

        expected.add(new Tuple(null, null, "6", "F1"));
        expected.add(new Tuple(null, null, "6", "F2"));

        expected.add(new Tuple("7", "g1", null, null));
        expected.add(new Tuple("7", "g2", null, null));
        expected.add(new Tuple("7", "g3", null, null));
        expected.add(new Tuple("7", "g4", null, null));
        expected.add(new Tuple("7", "g5", null, null));

        expected.add(new Tuple(null, "h1", null, "H1"));

        handleJoins("joinouter", new OuterJoin(), expected);
    }

    /**
     * Runs {@code handleJoins} with a {@link LeftJoin} and the expected tuple set, which keeps
     * left-hand rows that have no right-hand match (null-padded on the right).
     */
    @Test
    public void testJoinInnerOuter() throws Exception {
        HashSet<Tuple> expected = new HashSet<Tuple>();

        expected.add(new Tuple("1", "a1", "1", "A1"));
        expected.add(new Tuple("1", "a1", "1", "A2"));
        expected.add(new Tuple("1", "a1", "1", "A3"));
        expected.add(new Tuple("1", "a2", "1", "A1"));
        expected.add(new Tuple("1", "a2", "1", "A2"));
        expected.add(new Tuple("1", "a2", "1", "A3"));
        expected.add(new Tuple("1", "a3", "1", "A1"));
        expected.add(new Tuple("1", "a3", "1", "A2"));
        expected.add(new Tuple("1", "a3", "1", "A3"));

        expected.add(new Tuple("2", "b1", "2", "B1"));
        expected.add(new Tuple("2", "b1", "2", "B2"));
        expected.add(new Tuple("2", "b1", "2", "B3"));

        expected.add(new Tuple("3", "c1", null, null));

        expected.add(new Tuple("4", "d1", "4", "D1"));
        expected.add(new Tuple("4", "d2", "4", "D1"));
        expected.add(new Tuple("4", "d3", "4", "D1"));

        expected.add(new Tuple("5", "e1", null, null));
        expected.add(new Tuple("5", "e2", null, null));
        expected.add(new Tuple("5", "e3", null, null));

        expected.add(new Tuple("7", "g1", null, null));
        expected.add(new Tuple("7", "g2", null, null));
        expected.add(new Tuple("7", "g3", null, null));
        expected.add(new Tuple("7", "g4", null, null));
        expected.add(new Tuple("7", "g5", null, null));

        expected.add(new Tuple(null, "h1", null, "H1"));

        handleJoins("joininnerouter", new LeftJoin(), expected);
    }

    /**
     * Runs {@code handleJoins} with a {@link RightJoin} and the expected tuple set, which keeps
     * right-hand rows that have no left-hand match (null-padded on the left). Returns
     * immediately without testing when the platform reports both MapReduce and cluster mode.
     */
    @Test
    public void testJoinOuterInner() throws Exception {
        boolean mapReduceOnCluster = getPlatform().isMapReduce() && getPlatform().isUseCluster();
        if (mapReduceOnCluster) {
            return;
        }

        HashSet<Tuple> expected = new HashSet<Tuple>();

        expected.add(new Tuple("1", "a1", "1", "A1"));
        expected.add(new Tuple("1", "a1", "1", "A2"));
        expected.add(new Tuple("1", "a1", "1", "A3"));
        expected.add(new Tuple("1", "a2", "1", "A1"));
        expected.add(new Tuple("1", "a2", "1", "A2"));
        expected.add(new Tuple("1", "a2", "1", "A3"));
        expected.add(new Tuple("1", "a3", "1", "A1"));
        expected.add(new Tuple("1", "a3", "1", "A2"));
        expected.add(new Tuple("1", "a3", "1", "A3"));

        expected.add(new Tuple("2", "b1", "2", "B1"));
        expected.add(new Tuple("2", "b1", "2", "B2"));
        expected.add(new Tuple("2", "b1", "2", "B3"));

        expected.add(new Tuple("4", "d1", "4", "D1"));
        expected.add(new Tuple("4", "d2", "4", "D1"));
        expected.add(new Tuple("4", "d3", "4", "D1"));

        expected.add(new Tuple(null, null, "6", "F1"));
        expected.add(new Tuple(null, null, "6", "F2"));

        expected.add(new Tuple(null, "h1", null, "H1"));

        handleJoins("joinouterinner", new RightJoin(), expected);
    }

    /**
     * Shared driver for the sparse-input join tests: hash-joins the sparse lhs and rhs delimited
     * inputs on {@code num} with the supplied joiner, writes the four-field result to a
     * tab-delimited sink, and checks the sink against the expected tuples.
     *
     * @param str    name handed to {@code getOutputPath} for the sink location
     * @param joiner join strategy passed to the {@link HashJoin}
     * @param set    tuples expected in the sink; its size is the length passed to
     *               {@code validateLength}. The set is not modified by this method.
     * @throws Exception if copying the inputs, running the flow, or reading the sink fails
     */
    private void handleJoins(String str, Joiner joiner, Set<Tuple> set) throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLhsSparse);
        getPlatform().copyFromLocal(InputData.inputFileRhsSparse);

        // Both sources share the same typed field declaration.
        Fields sourceFields = new Fields("num", "char").applyTypes(new Type[]{Integer.class, String.class});
        Tap lhsTap = getPlatform().getDelimitedFile(sourceFields, " ", InputData.inputFileLhsSparse);
        Tap rhsTap = getPlatform().getDelimitedFile(sourceFields, " ", InputData.inputFileRhsSparse);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lhsTap);
        sources.put("upper", rhsTap);

        Tap sink = getPlatform().getDelimitedFile(Fields.size(4, String.class), "\t", getOutputPath(str), SinkMode.REPLACE);

        Pipe lowerPipe = new Pipe("lower");
        Pipe upperPipe = new Pipe("upper");
        Fields declared = new Fields("num", "char", "num2", "char2");
        Fields joinKey = new Fields("num");

        Pipe join = new HashJoin(lowerPipe, joinKey, upperPipe, joinKey, declared, joiner);
        Pipe tail = new Each(join, Fields.ALL, new Identity(), Fields.RESULTS);

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, tail);
        flow.complete();

        validateLength(flow, set.size());

        // Work on a copy so the caller's expected set is left intact, and report which
        // expected tuples never showed up instead of only a bare count mismatch.
        Set<Tuple> missing = new HashSet<Tuple>(set);
        missing.removeAll(getSinkAsList(flow));
        assertEquals("expected tuples missing from sink: " + missing, 0, missing.size());
    }

    /**
     * Three-way hash join of the lower-offset, upper and lower inputs on {@code num} using a
     * {@link MixedJoin} configured as OUTER, INNER, OUTER. Validates a length of 6 and checks
     * the six expected lines. Returns immediately without testing when the platform reports
     * both MapReduce and cluster mode.
     */
    @Test
    public void testJoinMixed() throws Exception {
        boolean mapReduceOnCluster = getPlatform().isMapReduce() && getPlatform().isUseCluster();
        if (mapReduceOnCluster) {
            return;
        }

        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerOffsetTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLowerOffset);
        Tap upperTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);
        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("loweroffset", lowerOffsetTap);
        sources.put("lower", lowerTap);
        sources.put("upper", upperTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinmixed"), SinkMode.REPLACE);
        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe lowerOffsetPipe = new Each(new Pipe("loweroffset"), new Fields("line"), splitter);
        Pipe upperPipe = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe lowerPipe = new Each(new Pipe("lower"), new Fields("line"), splitter);

        // Branch order matters: it lines up with the join-mode flags below.
        Pipe[] branches = Pipe.pipes(new Pipe[]{lowerOffsetPipe, upperPipe, lowerPipe});
        Fields[] joinKeys = Fields.fields(new Fields[]{new Fields("num"), new Fields("num"), new Fields("num")});
        boolean[] joinModes = new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER};

        Pipe join = new HashJoin(branches, joinKeys, Fields.size(6), new MixedJoin(joinModes));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 6);

        HashSet<Tuple> expected = new HashSet<Tuple>();
        expected.add(new Tuple("1\ta\t1\tA\t1\ta"));
        expected.add(new Tuple("null\tnull\t2\tB\t2\tb"));
        expected.add(new Tuple("null\tnull\t3\tC\t3\tc"));
        expected.add(new Tuple("null\tnull\t4\tD\t4\td"));
        expected.add(new Tuple("5\tb\t5\tE\t5\te"));
        expected.add(new Tuple("5\te\t5\tE\t5\te"));

        List<Tuple> actual = getSinkAsList(flow);
        expected.removeAll(actual);

        assertEquals(0, expected.size());
    }

    /**
     * Hash-joins lower and upper where each branch declares differently named fields
     * ({@code numA}/{@code lower} and {@code numB}/{@code upper}), so no declared result
     * fields are passed to the join. Validates a length of 5 and two joined lines.
     */
    @Test
    public void testJoinDiffFields() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);
        sources.put("upper", upperTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("difffields"), SinkMode.REPLACE);

        RegexSplitter lowerSplitter = new RegexSplitter(new Fields("numA", "lower"), " ");
        RegexSplitter upperSplitter = new RegexSplitter(new Fields("numB", "upper"), " ");

        Pipe lowerPipe = new Each(new Pipe("lower"), new Fields("line"), lowerSplitter);
        Pipe upperPipe = new Each(new Pipe("upper"), new Fields("line"), upperSplitter);
        Pipe join = new HashJoin(lowerPipe, new Fields("numA"), upperPipe, new Fields("numB"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Hash-joins lower and upper on {@code numA}/{@code numB} and then groups the joined
     * stream by {@code numA}. Validates a length of 5 and two joined lines.
     */
    @Test
    public void testJoinGroupBy() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);
        sources.put("upper", upperTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joingroupby"), SinkMode.REPLACE);

        RegexSplitter lowerSplitter = new RegexSplitter(new Fields("numA", "lower"), " ");
        RegexSplitter upperSplitter = new RegexSplitter(new Fields("numB", "upper"), " ");

        Pipe lowerPipe = new Each(new Pipe("lower"), new Fields("line"), lowerSplitter);
        Pipe upperPipe = new Each(new Pipe("upper"), new Fields("line"), upperSplitter);
        Pipe join = new HashJoin(lowerPipe, new Fields("numA"), upperPipe, new Fields("numB"));

        // Group the already-joined stream on the left-hand key.
        Pipe grouped = new GroupBy(join, new Fields("numA"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, grouped);
        flow.complete();

        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Self-joins a single split "lower" pipe on {@code num} using the {@link HashJoin}
     * constructor that takes a self-join count of 1, declaring renamed result fields.
     * Validates a length of 5 and two self-joined lines.
     */
    @Test
    public void testJoinSamePipe() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("samepipe"), SinkMode.REPLACE);

        Pipe lowerPipe = new Each(new Pipe("lower"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));

        // Result fields are renamed because both sides carry the same source field names.
        Fields declared = new Fields("num1", "char1", "num2", "char2");
        Pipe join = new HashJoin(lowerPipe, new Fields("num"), 1, declared);

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\ta")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tb")));
    }

    /**
     * Self-joins by passing the very same split "lower" pipe instance as both the left and the
     * right side of the {@link HashJoin}, declaring renamed result fields. Validates a length
     * of 5 and two self-joined lines.
     */
    @Test
    public void testJoinSamePipe2() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap lowerTap = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("samepipe2"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Each lowerPipe = new Each(new Pipe("lower"), new Fields("line"), splitter);

        // One pipe object serves as both join inputs.
        Fields declared = new Fields("num1", "char1", "num2", "char2");
        Pipe join = new HashJoin(lowerPipe, new Fields("num"), lowerPipe, new Fields("num"), declared);

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\ta")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tb")));
    }

    /**
     * Self-joins the delimited "lower" source by branching one head pipe into "lhs" and "rhs"
     * pipes and hash-joining them on {@code num} with renamed result fields. Validates a
     * length of 5 and two self-joined lines.
     */
    @Test
    public void testJoinSamePipe3() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap lowerTap = getPlatform().getDelimitedFile(new Fields("num", "char"), " ", InputData.inputFileLower);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerTap);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("samepipe3"), SinkMode.REPLACE);

        // Two named branches hang off the same head pipe.
        Pipe head = new Pipe("lower");
        Pipe lhs = new Pipe("lhs", head);
        Pipe rhs = new Pipe("rhs", head);

        Fields declared = new Fields("num1", "char1", "num2", "char2");
        Pipe join = new HashJoin(lhs, new Fields("num"), rhs, new Fields("num"), declared);

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\ta")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tb")));
    }

    /**
     * Joins "lower" with "upper1", passes the result through an Identity, then joins that
     * (as the left side, on "num1") with "upper2" on the right. Both "upper1" and "upper2"
     * are bound to the same source tap.
     * On MapReduce platforms the planned flow is asserted to have exactly one step.
     */
    @Test
    public void testJoinAroundJoinRightMost() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinaroundjoinrightmost"), SinkMode.REPLACE);

        // One splitter instance is shared by every branch.
        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper1 = new Each(new Pipe("upper1"), new Fields("line"), splitter);

        Pipe innerJoin = new HashJoin(
                pipeLower, new Fields("num"),
                pipeUpper1, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));
        Pipe innerResult = new Each(innerJoin, new Identity());

        Pipe pipeUpper2 = new Each(new Pipe("upper2"), new Fields("line"), splitter);
        Pipe outerJoin = new HashJoin(
                innerResult, new Fields("num1"),
                pipeUpper2, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, outerJoin);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 1, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB\t2\tB")));
    }

    /**
     * Joins "upper1" with "upper2" (both bound to the same source tap), passes the result
     * through an Identity, then joins that (as the left side, on "num1") with "lower" on the right.
     * On MapReduce platforms the planned flow is asserted to have exactly two steps.
     */
    @Test
    public void testJoinAroundJoinLeftMost() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinaroundjoinleftmost"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");

        // Inner join: the same upper-case source on both sides.
        Pipe pipeUpper1 = new Each(new Pipe("upper1"), new Fields("line"), splitter);
        Pipe pipeUpper2 = new Each(new Pipe("upper2"), new Fields("line"), splitter);
        Pipe innerJoin = new HashJoin(
                pipeUpper1, new Fields("num"),
                pipeUpper2, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));
        Pipe innerResult = new Each(innerJoin, new Identity());

        // Outer join: inner result on the left, lower on the right.
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe outerJoin = new HashJoin(
                innerResult, new Fields("num1"),
                pipeLower, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, outerJoin);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 2, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\tA\t1\tA\t1\ta")));
        assertTrue(results.contains(new Tuple("2\tB\t2\tB\t2\tb")));
    }

    /**
     * Same pipes as the "right most" variant, but with the outer join's sides swapped:
     * "upper2" is the left side and the (lower joined with upper1) result, passed through
     * an Identity, is the right side keyed on "num1".
     * On MapReduce platforms the planned flow is asserted to have exactly two steps.
     */
    @Test
    public void testJoinAroundJoinRightMostSwapped() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinaroundjoinswapped"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper1 = new Each(new Pipe("upper1"), new Fields("line"), splitter);
        Pipe pipeUpper2 = new Each(new Pipe("upper2"), new Fields("line"), splitter);

        Pipe innerJoin = new HashJoin(
                pipeLower, new Fields("num"),
                pipeUpper1, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));
        Pipe innerResult = new Each(innerJoin, new Identity());

        // The nested join result sits on the right-hand side of the outer join.
        Pipe outerJoin = new HashJoin(
                pipeUpper2, new Fields("num"),
                innerResult, new Fields("num1"),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, outerJoin);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 2, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\tA\t1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tB\t2\tb\t2\tB")));
    }

    /**
     * Chains HashJoin, GroupBy and a second HashJoin: "lower" is joined with "upper"
     * (numA = numB), the result is grouped on "numA", and the grouped stream is then joined
     * with the tab-split "joined" source (numA = numC). Neither join declares result fields.
     * On MapReduce platforms the planned flow is asserted to have exactly two steps.
     */
    @Test
    public void testJoinGroupByJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileJoined);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);
        Tap sourceJoined = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileJoined);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("joined", sourceJoined);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joingroupbyjoin"), SinkMode.REPLACE);

        // Each source gets its own splitter so that all field names stay distinct.
        RegexSplitter splitLower = new RegexSplitter(new Fields("numA", "lower"), " ");
        RegexSplitter splitUpper = new RegexSplitter(new Fields("numB", "upper"), " ");
        RegexSplitter splitJoined = new RegexSplitter(new Fields("numC", "lowerC", "upperC"), "\t");

        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitLower);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitUpper);

        Pipe firstJoin = new HashJoin(pipeLower, new Fields("numA"), pipeUpper, new Fields("numB"));
        Pipe grouped = new GroupBy(firstJoin, new Fields("numA"));

        Pipe pipeJoined = new Each(new Pipe("joined"), new Fields("line"), splitJoined);
        Pipe secondJoin = new HashJoin(grouped, new Fields("numA"), pipeJoined, new Fields("numC"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, secondJoin);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 2, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA\t1\ta\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB\t2\tb\tB")));
    }

    /**
     * Joins "lower" (left) against the result of joining "upper1" with "upper2" (right, keyed
     * on "num1" after an Identity). "upper1" and "upper2" are bound to the same source tap.
     * On MapReduce platforms the planned flow is asserted to have exactly three steps.
     */
    @Test
    public void testJoinSameSourceIntoJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinsamesourceintojoin"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);

        // Right-hand side: the upper source joined with itself.
        Pipe pipeUpper1 = new Each(new Pipe("upper1"), new Fields("line"), splitter);
        Pipe pipeUpper2 = new Each(new Pipe("upper2"), new Fields("line"), splitter);
        Pipe upperJoin = new HashJoin(
                pipeUpper1, new Fields("num"),
                pipeUpper2, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));
        Pipe upperResult = new Each(upperJoin, new Identity());

        Pipe outerJoin = new HashJoin(
                pipeLower, new Fields("num"),
                upperResult, new Fields("num1"),
                new Fields("num1", "char1", "num2", "char2", "num3", "char3"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, outerJoin);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 3, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB\t2\tB")));
    }

    /**
     * Hash-joins two head pipes, "upper1" and "upper2", that are both bound to the same
     * source tap, and passes the result through an Identity.
     * On MapReduce platforms the planned flow is asserted to have exactly two steps.
     */
    @Test
    public void testJoinSameSourceIntoJoinSimple() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        // The same tap instance feeds both heads.
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinsamesourceintojoinsimple"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeUpper1 = new Each(new Pipe("upper1"), new Fields("line"), splitter);
        Pipe pipeUpper2 = new Each(new Pipe("upper2"), new Fields("line"), splitter);

        Pipe join = new HashJoin(
                pipeUpper1, new Fields("num"),
                pipeUpper2, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));
        Pipe tail = new Each(join, new Identity());

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, tail);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 2, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\tA\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tB\t2\tB")));
    }

    /**
     * Like the simple same-source join, but each branch ("upper1", "upper2") is first grouped
     * on "num" before the two grouped streams are hash-joined and passed through an Identity.
     * On MapReduce platforms the planned flow is asserted to have exactly three steps.
     */
    @Test
    public void testJoinSameSourceOverGroupByIntoJoinSimple() throws Exception {
        // inputFileLower is copied to the platform even though no tap in this test reads it.
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinsamesourceovergroupbyintojoinsimple"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeUpper1 = new Each(new Pipe("upper1"), new Fields("line"), splitter);
        Pipe pipeUpper2 = new Each(new Pipe("upper2"), new Fields("line"), splitter);

        Pipe groupedUpper1 = new GroupBy(pipeUpper1, new Fields("num"));
        Pipe groupedUpper2 = new GroupBy(pipeUpper2, new Fields("num"));

        Pipe join = new HashJoin(
                groupedUpper1, new Fields("num"),
                groupedUpper2, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));
        Pipe tail = new Each(join, new Identity());

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, tail);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 3, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\tA\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tB\t2\tB")));
    }

    /**
     * Feeds two independent hash joins (lower with upper, and lhs with rhs) into a single
     * GroupBy named "merging" that groups on "num1". Both joins declare the same result fields.
     * On MapReduce platforms the planned flow is asserted to have exactly one step;
     * the sink is expected to hold 42 lines.
     */
    @Test
    public void testJoinsIntoGroupBy() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);
        Tap sourceLhs = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLhs);
        Tap sourceRhs = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileRhs);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinsintogroupby"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");

        // First join: lower with upper.
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe upperLower = new HashJoin(
                pipeLower, new Fields("num"),
                pipeUpper, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));
        upperLower = new Each(upperLower, new Identity());

        // Second join: lhs with rhs.
        Pipe pipeLhs = new Each(new Pipe("lhs"), new Fields("line"), splitter);
        Pipe pipeRhs = new Each(new Pipe("rhs"), new Fields("line"), splitter);
        Pipe lhsRhs = new HashJoin(
                pipeLhs, new Fields("num"),
                pipeRhs, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));
        lhsRhs = new Each(lhsRhs, new Identity());

        Pipe[] joins = Pipe.pipes(upperLower, lhsRhs);
        Pipe merging = new GroupBy("merging", joins, new Fields("num1"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, merging);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 1, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 42, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("5\te\t5\tE")));
    }

    /**
     * Splits one parsed "lower" stream into two branches: "lhs" goes straight through an
     * Identity, while "rhs" goes through Identity, a GroupBy on "num", and another Identity.
     * The two branches are then hash-joined on "num". A single source tap is connected
     * directly rather than through a name-to-tap map.
     */
    @Test
    public void testJoinSamePipeAroundGroupBy() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("samepipearoundgroupby"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe parsed = new Each(new Pipe("lower"), new Fields("line"), splitter);

        // Left branch: no grouping.
        Pipe lhs = new Each(new Pipe("lhs", parsed), new Identity());

        // Right branch: routed through a GroupBy before the join.
        Pipe rhs = new Each(new Pipe("rhs", parsed), new Identity());
        rhs = new GroupBy(rhs, new Fields("num"));
        rhs = new Each(rhs, new Identity());

        Pipe join = new HashJoin(
                lhs, new Fields("num"),
                rhs, new Fields("num"),
                new Fields("num1", "char1", "num2", "char2"));

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, join);
        flow.complete();
        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\ta")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tb")));
    }

    /**
     * Builds a nested hash join (lhs joined against the lower/upper join result) and uses it
     * as the left side of a CoGroup named "cogrouping", keyed on "numLhs"; the plain "rhs"
     * stream is the right side, keyed on "num".
     * On MapReduce platforms the planned flow is asserted to have exactly two steps;
     * the sink is expected to hold 37 lines.
     */
    @Test
    public void testJoinsIntoCoGroupLhs() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);
        Tap sourceLhs = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLhs);
        Tap sourceRhs = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileRhs);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinsintocogrouplhs"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe pipeLhs = new Each(new Pipe("lhs"), new Fields("line"), splitter);

        // Innermost join: lower with upper.
        Pipe upperLower = new HashJoin(
                pipeLower, new Fields("num"),
                pipeUpper, new Fields("num"),
                new Fields("numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower"));
        upperLower = new Each(upperLower, new Identity());

        // Middle join: lhs on the left, the lower/upper result on the right.
        Pipe lhsUpperLower = new HashJoin(
                pipeLhs, new Fields("num"),
                upperLower, new Fields("numUpperLower"),
                new Fields("numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower"));
        lhsUpperLower = new Each(lhsUpperLower, new Identity());

        Pipe pipeRhs = new Each(new Pipe("rhs"), new Fields("line"), splitter);
        Pipe cogrouping = new CoGroup("cogrouping", lhsUpperLower, new Fields("numLhs"), pipeRhs, new Fields("num"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, cogrouping);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 2, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 37, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\ta\t1\tA\t1\tA")));
        assertTrue(results.contains(new Tuple("5\ta\t5\te\t5\tE\t5\tA")));
    }

    /**
     * Variant of the CoGroup-lhs test with the middle join's sides swapped: the lower/upper
     * join result is the left side and "lhs" is the right side. The outcome is still the left
     * side of a CoGroup named "cogrouping" keyed on "numLhs", with "rhs" on the right.
     * On MapReduce platforms the planned flow is asserted to have exactly one step;
     * the sink is expected to hold 37 lines.
     */
    @Test
    public void testJoinsIntoCoGroupLhsSwappedJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);
        Tap sourceLhs = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLhs);
        Tap sourceRhs = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileRhs);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinsintocogrouplhsswappedjoin"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe pipeLhs = new Each(new Pipe("lhs"), new Fields("line"), splitter);

        // Innermost join: lower with upper.
        Pipe upperLower = new HashJoin(
                pipeLower, new Fields("num"),
                pipeUpper, new Fields("num"),
                new Fields("numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower"));
        upperLower = new Each(upperLower, new Identity());

        // Middle join: the lower/upper result on the left, lhs on the right.
        Pipe upperLowerLhs = new HashJoin(
                upperLower, new Fields("numUpperLower"),
                pipeLhs, new Fields("num"),
                new Fields("numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower", "numLhs", "charLhs"));
        upperLowerLhs = new Each(upperLowerLhs, new Identity());

        Pipe pipeRhs = new Each(new Pipe("rhs"), new Fields("line"), splitter);
        Pipe cogrouping = new CoGroup("cogrouping", upperLowerLhs, new Fields("numLhs"), pipeRhs, new Fields("num"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, cogrouping);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 1, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 37, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA\t1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("5\te\t5\tE\t5\te\t5\tE")));
    }

    /**
     * Mirror of the CoGroup-lhs test: the plain "rhs" stream is the left side of the CoGroup
     * named "cogrouping" (keyed on "num"), and the nested hash join (lhs joined against the
     * lower/upper result) is the right side, keyed on "numLhs".
     * On MapReduce platforms the planned flow is asserted to have exactly two steps;
     * the sink is expected to hold 37 lines.
     */
    @Test
    public void testJoinsIntoCoGroupRhs() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);
        Tap sourceLhs = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLhs);
        Tap sourceRhs = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileRhs);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinsintocogrouprhs"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe pipeLhs = new Each(new Pipe("lhs"), new Fields("line"), splitter);
        Pipe pipeRhs = new Each(new Pipe("rhs"), new Fields("line"), splitter);

        // Innermost join: lower with upper.
        Pipe upperLower = new HashJoin(
                pipeLower, new Fields("num"),
                pipeUpper, new Fields("num"),
                new Fields("numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower"));
        upperLower = new Each(upperLower, new Identity());

        // Middle join: lhs on the left, the lower/upper result on the right.
        Pipe lhsUpperLower = new HashJoin(
                pipeLhs, new Fields("num"),
                upperLower, new Fields("numUpperLower"),
                new Fields("numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower"));
        lhsUpperLower = new Each(lhsUpperLower, new Identity());

        // The nested join feeds the right-hand side of the CoGroup.
        Pipe cogrouping = new CoGroup("cogrouping", pipeRhs, new Fields("num"), lhsUpperLower, new Fields("numLhs"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, cogrouping);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 2, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 37, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\tA\t1\ta\t1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("5\tE\t5\te\t5\te\t5\tE")));
    }

    /**
     * CoGroups two independent hash joins under the name "cogrouping": lower joined with upper
     * (keyed on "numUpperLower1") on the left, and lhs joined with rhs (keyed on "numLhsRhs1")
     * on the right. Each join result passes through an Identity first.
     * On MapReduce platforms the planned flow is asserted to have exactly one step;
     * the sink is expected to hold 37 lines.
     */
    @Test
    public void testJoinsIntoCoGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);
        Tap sourceLhs = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLhs);
        Tap sourceRhs = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileRhs);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        sources.put("lhs", sourceLhs);
        sources.put("rhs", sourceRhs);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinsintocogroup"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe pipeLhs = new Each(new Pipe("lhs"), new Fields("line"), splitter);
        Pipe pipeRhs = new Each(new Pipe("rhs"), new Fields("line"), splitter);

        // Left CoGroup input: lower joined with upper.
        Pipe upperLower = new HashJoin(
                pipeLower, new Fields("num"),
                pipeUpper, new Fields("num"),
                new Fields("numUpperLower1", "charUpperLower1", "numUpperLower2", "charUpperLower2"));
        upperLower = new Each(upperLower, new Identity());

        // Right CoGroup input: lhs joined with rhs.
        Pipe lhsRhs = new HashJoin(
                pipeLhs, new Fields("num"),
                pipeRhs, new Fields("num"),
                new Fields("numLhsRhs1", "charLhsRhs1", "numLhsRhs2", "charLhsRhs2"));
        lhsRhs = new Each(lhsRhs, new Identity());

        Pipe cogrouping = new CoGroup("cogrouping", upperLower, new Fields("numUpperLower1"), lhsRhs, new Fields("numLhsRhs1"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sink, cogrouping);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 1, flow.getFlowSteps().size());
        }

        flow.complete();
        validateLength(flow, 37, null);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA\t1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("5\te\t5\tE\t5\te\t5\tE")));
    }

    /**
     * Hash-joins "lower" with "upper" where the left join field carries an AllComparator
     * (which compares values by their toString() form). The left "num" value is first replaced
     * by the result of the expression "Integer.parseInt( num )"; the right side is left as split.
     * Expects five lines in the sink.
     */
    @Test
    public void testJoinWithHasher() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinhasher"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");

        // Left side: split the line, then replace "num" with the evaluated expression.
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        ExpressionFunction parseNum = new ExpressionFunction(Fields.ARGS, "Integer.parseInt( num )", String.class);
        pipeLower = new Each(pipeLower, new Fields("num"), parseNum, Fields.REPLACE);

        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);

        // Only the left-hand join fields get the custom comparator.
        Fields lhsJoinFields = new Fields("num");
        lhsJoinFields.setComparator("num", new AllComparator());

        Pipe join = new HashJoin(pipeLower, lhsJoinFields, pipeUpper, new Fields("num"), Fields.size(4));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, join);
        flow.complete();
        validateLength(flow, 5);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Hash-joins "lower" with "upper" using Fields.NONE as the join fields on both sides.
     * Expects 25 lines in the sink, including pairings of differing keys such as
     * "1 a" with "2 B".
     */
    @Test
    public void testJoinNone() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("joinnone"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);

        // No join key on either side.
        Pipe join = new HashJoin(pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size(4));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, join);
        flow.complete();
        validateLength(flow, 25);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("1\ta\t2\tB")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Groups the "lower" stream, passes it through a {@code TestIdentityBuffer}, then splits the
     * result into two branches that are each hash-joined against a different right-hand stream
     * ("upper" into the "lhs" sink, "joined" into the "rhs" sink). Both sinks must hold 5 lines.
     */
    @Test
    public void testGroupBySplitJoins() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileJoined);

        Tap lowerSource = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperSource = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);
        Tap joinedSource = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileJoined);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerSource);
        sources.put("upper", upperSource);
        sources.put("joined", joinedSource);

        Tap lhsSink = getPlatform().getTextFile(new Fields("line"), getOutputPath("lhs"), SinkMode.REPLACE);
        Tap rhsSink = getPlatform().getTextFile(new Fields("line"), getOutputPath("rhs"), SinkMode.REPLACE);

        HashMap<String, Tap> sinks = new HashMap<String, Tap>();
        sinks.put("lhs", lhsSink);
        sinks.put("rhs", rhsSink);

        RegexSplitter lowerSplitter = new RegexSplitter(new Fields("numA", "lower"), " ");
        RegexSplitter upperSplitter = new RegexSplitter(new Fields("numB", "upper"), " ");
        RegexSplitter joinedSplitter = new RegexSplitter(new Fields("numC", "lowerC", "upperC"), "\t");

        Pipe lowerPipe = new Each(new Pipe("lower"), new Fields("line"), lowerSplitter);
        Pipe upperPipe = new Each(new Pipe("upper"), new Fields("line"), upperSplitter);
        Pipe joinedPipe = new Each(new Pipe("joined"), new Fields("line"), joinedSplitter);

        // Common upstream of both joins: group on numA, then run the identity buffer.
        Pipe grouped = new GroupBy(lowerPipe, new Fields("numA"));
        Pipe buffered = new Every(grouped, Fields.ALL, new TestIdentityBuffer(new Fields("numA"), 5, false), Fields.RESULTS);

        Pipe lhsJoin = new HashJoin("lhs", new Each(buffered, new Identity()), new Fields("numA"), upperPipe, new Fields("numB"));
        Pipe rhsJoin = new HashJoin("rhs", new Each(buffered, new Identity()), new Fields("numA"), joinedPipe, new Fields("numC"));

        Flow flow = getPlatform().getFlowConnector().connect(sources, sinks, lhsJoin, rhsJoin);

        if (getPlatform().isMapReduce()) {
            assertEquals("wrong number of steps", 3, flow.getFlowSteps().size());
        }

        flow.complete();

        validateLength(flow.openSink("lhs"), 5, null);
        validateLength(flow.openSink("rhs"), 5, null);

        List<Tuple> lhsResults = asList(flow, lhsSink);
        assertTrue(lhsResults.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(lhsResults.contains(new Tuple("2\tb\t2\tB")));

        List<Tuple> rhsResults = asList(flow, rhsSink);
        assertTrue(rhsResults.contains(new Tuple("1\ta\t1\ta\tA")));
        assertTrue(rhsResults.contains(new Tuple("2\tb\t2\tb\tB")));
    }

    /**
     * Hash-joins "lhs" (field {@code id}) with "rhs" (field {@code id2}), keeps only {@code id2}
     * from the join, merges that stream with the raw "rhs" stream, and counts occurrences per
     * {@code id2}. The sink must contain 20 rows: ids 1-10 with count 2 and ids 11-20 with count 1.
     */
    @Test
    public void testJoinMergeGroupBy() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileNums10);
        getPlatform().copyFromLocal(InputData.inputFileNums20);

        Tap lhsSource = getPlatform().getTextFile(new Fields("id"), InputData.inputFileNums10);
        Tap rhsSource = getPlatform().getTextFile(new Fields("id2"), InputData.inputFileNums20);

        Pipe lhs = new Pipe("lhs");
        Pipe rhs = new Pipe("rhs");

        // Declared as Pipe (not HashJoin) so the instanceof check below is a legal comparison;
        // HashJoin and CoGroup are sibling classes, so `HashJoin instanceof CoGroup` does not compile.
        Pipe join = new HashJoin(lhs, new Fields("id"), rhs, new Fields("id2"));

        Pipe projected = new Each(join, new Fields("id2"), new Identity(), Fields.RESULTS);
        Pipe merged = new Merge(new Pipe[]{projected, rhs});
        Pipe grouped = new GroupBy(merged, new Fields("id2"));
        Pipe counted = new Every(grouped, new Count(new Fields("count")));

        String outputName = "testJoinMergeGroupBy/" + (join instanceof CoGroup ? "cogroup" : "hashjoin");
        Tap sink = getPlatform().getDelimitedFile(Fields.ALL, true, "\t", null, getOutputPath(outputName), SinkMode.REPLACE);

        FlowDef flowDef = ((FlowDef) FlowDef.flowDef().setName("join-merge"))
            .addSource(rhs, rhsSource)
            .addSource(lhs, lhsSource)
            .addTailSink(counted, sink);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();

        validateLength(flow, 20);

        List<Tuple> actual = getSinkAsList(flow);

        // Expected rows: ids 1..10 are counted twice, ids 11..20 once
        // (presumably because only the first ten ids survive the join and get merged back in).
        List<Tuple> expected = new ArrayList<Tuple>();
        for (int id = 1; id <= 20; id++) {
            expected.add(new Tuple(String.valueOf(id), id <= 10 ? "2" : "1"));
        }

        // Both lists are sorted, so neither sink order nor generation order matters.
        Collections.sort(actual);
        Collections.sort(expected);
        assertEquals(expected, actual);
    }

    /**
     * Inner hash-joins "lhs" and "rhs" on their number columns and then splits the join output
     * into two tails ("lhsSink" and "rhsSink"). Both sinks are checked for the same sample rows.
     */
    @Test
    public void testJoinSplit() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        FlowDef flowDef = FlowDef.flowDef()
            .addSource("lhs", getPlatform().getTextFile(InputData.inputFileLhs))
            .addSource("rhs", getPlatform().getTextFile(InputData.inputFileRhs))
            .addSink("lhsSink", getPlatform().getTextFile(new Fields("line"), getOutputPath("lhs"), SinkMode.REPLACE))
            .addSink("rhsSink", getPlatform().getTextFile(new Fields("line"), getOutputPath("rhs"), SinkMode.REPLACE));

        Pipe lhsSplit = new Each("lhs", new Fields("line"), new RegexSplitter(new Fields("numLHS", "charLHS"), " "));
        Pipe rhsSplit = new Each("rhs", new Fields("line"), new RegexSplitter(new Fields("numRHS", "charRHS"), " "));
        Pipe join = new HashJoin(lhsSplit, new Fields("numLHS"), rhsSplit, new Fields("numRHS"), new InnerJoin());

        // Two tails hang off the single join.
        Pipe lhsTail = new Each(new Pipe("lhsSink", join), new Identity());
        Pipe rhsTail = new Each(new Pipe("rhsSink", join), new Identity());
        flowDef.addTail(lhsTail).addTail(rhsTail);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();

        validateLength(flow, 37, null);

        List<Tuple> lhsResults = asList(flow, (Tap) flowDef.getSinks().get("lhsSink"));
        assertTrue(lhsResults.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(lhsResults.contains(new Tuple("1\ta\t1\tB")));

        List<Tuple> rhsResults = asList(flow, (Tap) flowDef.getSinks().get("rhsSink"));
        assertTrue(rhsResults.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(rhsResults.contains(new Tuple("1\ta\t1\tB")));
    }

    /**
     * Binds both "lhs" and "rhs" to the same input file, inner hash-joins them, and splits the
     * result: one tail is written directly to "lhsSink" (37 rows expected) and the other is
     * hash-joined again with "joinSecond" before being written to "rhsSink" (109 rows expected).
     */
    @Test
    public void testSameSourceJoinSplitIntoJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        // "lhs" and "rhs" deliberately read the same file.
        FlowDef flowDef = FlowDef.flowDef()
            .addSource("lhs", getPlatform().getTextFile(InputData.inputFileLhs))
            .addSource("rhs", getPlatform().getTextFile(InputData.inputFileLhs))
            .addSource("joinSecond", getPlatform().getTextFile(InputData.inputFileRhs))
            .addSink("lhsSink", getPlatform().getTextFile(new Fields("line"), getOutputPath("lhs"), SinkMode.REPLACE))
            .addSink("rhsSink", getPlatform().getTextFile(new Fields("line"), getOutputPath("rhs"), SinkMode.REPLACE));

        Pipe lhsSplit = new Each("lhs", new Fields("line"), new RegexSplitter(new Fields("numLHS", "charLHS"), " "));
        Pipe rhsSplit = new Each("rhs", new Fields("line"), new RegexSplitter(new Fields("numRHS", "charRHS"), " "));
        Pipe firstJoin = new HashJoin(lhsSplit, new Fields("numLHS"), rhsSplit, new Fields("numRHS"), new InnerJoin());

        Pipe lhsTail = new Each(new Pipe("lhsSink", firstJoin), new Identity());

        Pipe secondSplit = new Each("joinSecond", new Fields("line"), new RegexSplitter(new Fields("numRHSSecond", "charRHSSecond"), " "));
        Pipe secondJoin = new HashJoin(firstJoin, new Fields("numLHS"), secondSplit, new Fields("numRHSSecond"));
        Pipe rhsTail = new Each(new Pipe("rhsSink", secondJoin), new Identity());

        flowDef.addTail(lhsTail).addTail(rhsTail);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();

        List<Tuple> lhsResults = asList(flow, (Tap) flowDef.getSinks().get("lhsSink"));
        assertEquals(37, lhsResults.size());
        assertTrue(lhsResults.contains(new Tuple("1\ta\t1\ta")));
        assertTrue(lhsResults.contains(new Tuple("1\ta\t1\tb")));

        List<Tuple> rhsResults = asList(flow, (Tap) flowDef.getSinks().get("rhsSink"));
        assertEquals(109, rhsResults.size());
        assertTrue(rhsResults.contains(new Tuple("1\ta\t1\ta\t1\tA")));
        assertTrue(rhsResults.contains(new Tuple("1\ta\t1\tb\t1\tB")));
    }

    /**
     * Inner hash-joins "lhs" with a checkpointed "rhs", then splits the join output: one branch
     * is grouped on {@code numLHS} and written to "lhsSink" (37 rows expected), the other is
     * co-grouped with "joinSecond" and written to "rhsSink" (109 rows expected). On DAG platforms
     * the first step's node graph must contain exactly one element graph for the hash join.
     */
    @Test
    public void testJoinSplitBeforeJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLhs);
        getPlatform().copyFromLocal(InputData.inputFileRhs);

        FlowDef flowDef = FlowDef.flowDef()
            .addSource("lhs", getPlatform().getTextFile(InputData.inputFileLhs))
            .addSource("rhs", getPlatform().getTextFile(InputData.inputFileRhs))
            .addSource("joinSecond", getPlatform().getTextFile(InputData.inputFileRhs))
            .addSink("lhsSink", getPlatform().getTextFile(new Fields("line"), getOutputPath("lhs"), SinkMode.REPLACE))
            .addSink("rhsSink", getPlatform().getTextFile(new Fields("line"), getOutputPath("rhs"), SinkMode.REPLACE));

        Pipe lhsSplit = new Each("lhs", new Fields("line"), new RegexSplitter(new Fields("numLHS", "charLHS"), " "));
        Pipe rhsSplit = new Each("rhs", new Fields("line"), new RegexSplitter(new Fields("numRHS", "charRHS"), " "));
        Pipe rhsCheckpoint = new Checkpoint(rhsSplit);

        HashJoin join = new HashJoin(lhsSplit, new Fields("numLHS"), rhsCheckpoint, new Fields("numRHS"), new InnerJoin());
        Pipe afterJoin = new Each(join, new Identity());

        // First branch: identity, then group on numLHS.
        Pipe lhsBranch = new Each(new Pipe("lhsSink", afterJoin), new Identity());
        Pipe lhsTail = new GroupBy(lhsBranch, new Fields("numLHS"));

        // Second branch: identity, then co-group with the "joinSecond" stream.
        Pipe splitBranch = new Each(new Pipe("lhsSplit", afterJoin), new Identity());
        Pipe secondSplit = new Each("joinSecond", new Fields("line"), new RegexSplitter(new Fields("numRHSSecond", "charRHSSecond"), " "));
        Pipe coGroup = new CoGroup(splitBranch, new Fields("numLHS"), secondSplit, new Fields("numRHSSecond"));
        Pipe rhsTail = new Each(new Pipe("rhsSink", coGroup), new Identity());

        flowDef.addTail(lhsTail).addTail(rhsTail);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);

        if (getPlatform().isDAG()) {
            FlowStep firstStep = (FlowStep) flow.getFlowSteps().get(0);
            assertEquals(1, firstStep.getFlowNodeGraph().getElementGraphs(join).size());
        }

        flow.complete();

        List<Tuple> lhsResults = asList(flow, (Tap) flowDef.getSinks().get("lhsSink"));
        assertEquals(37, lhsResults.size());
        assertTrue(lhsResults.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(lhsResults.contains(new Tuple("1\ta\t1\tB")));

        List<Tuple> rhsResults = asList(flow, (Tap) flowDef.getSinks().get("rhsSink"));
        assertEquals(109, rhsResults.size());
        assertTrue(rhsResults.contains(new Tuple("1\ta\t1\tA\t1\tA")));
        assertTrue(rhsResults.contains(new Tuple("1\ta\t1\tB\t1\tB")));
    }

    /**
     * Groups the "first" stream and takes the first {@code char} per {@code num}, then forks it:
     * the fork passes through two further GroupBy/First stages before being hash-joined back to
     * the original grouped stream on {@code num}. Expects 5 joined lines.
     */
    @Test
    public void testGroupBySplitGroupByJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("sink"), SinkMode.REPLACE);

        Pipe split = new Each(new Pipe("first"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Pipe firstGroup = new GroupBy(split, new Fields("num"));
        Pipe first = new Every(firstGroup, new Fields("char"), new First(new Fields("firstFirst")), Fields.ALL);

        // Fork of the first aggregate, re-grouped and re-aggregated twice.
        Pipe secondBranch = new Each(new Pipe("second", first), new Identity());
        Pipe secondGroup = new GroupBy(secondBranch, new Fields("num"));
        Pipe second = new Every(secondGroup, new Fields("firstFirst"), new First(new Fields("secondFirst")), Fields.ALL);
        Pipe thirdGroup = new GroupBy(second, new Fields("num"));
        Pipe third = new Every(thirdGroup, new Fields("secondFirst"), new First(new Fields("thirdFirst")), Fields.ALL);

        Pipe join = new HashJoin(first, new Fields("num"), third, new Fields("num"), Fields.size(4));

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, join);
        flow.complete();

        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        String[] expectedLines = {"1\ta\t1\ta", "2\tb\t2\tb", "3\tc\t3\tc", "4\td\t4\td", "5\te\t5\te"};
        for (String expectedLine : expectedLines) {
            assertTrue(results.contains(new Tuple(expectedLine)));
        }
    }

    /**
     * Builds a grouped/aggregated "first" stream and a forked, re-grouped "second" stream, then
     * uses the second stream as the right-hand side of two chained hash joins. Expects 5 lines of
     * six columns each.
     */
    @Test
    public void testGroupBySplitSplitGroupByJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("sink"), SinkMode.REPLACE);

        Pipe split = new Each(new Pipe("first"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Pipe firstGroup = new GroupBy(split, new Fields("num"));
        Pipe first = new Every(firstGroup, new Fields("char"), new First(new Fields("firstFirst")), Fields.ALL);

        Pipe secondBranch = new Each(new Pipe("second", first), new Identity());
        Pipe secondGroup = new GroupBy(secondBranch, new Fields("num"));
        Pipe second = new Every(secondGroup, new Fields("firstFirst"), new First(new Fields("secondFirst")), Fields.ALL);

        // The "second" stream feeds both joins; the outer join keys on position 0 of the inner join.
        Pipe innerJoin = new HashJoin(first, new Fields("num"), second, new Fields("num"), Fields.size(4));
        Pipe outerJoin = new HashJoin(innerJoin, new Fields(0), second, new Fields("num"), Fields.size(6));

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, outerJoin);
        flow.complete();

        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        String[] expectedLines = {
            "1\ta\t1\ta\t1\ta",
            "2\tb\t2\tb\t2\tb",
            "3\tc\t3\tc\t3\tc",
            "4\td\t4\td\t4\td",
            "5\te\t5\te\t5\te"
        };
        for (String expectedLine : expectedLines) {
            assertTrue(results.contains(new Tuple(expectedLine)));
        }
    }

    /**
     * Forks a single "init" source into two grouped/aggregated streams ("first" and "second"),
     * sends "first" to a secondary sink ("sink2"), and chains two hash joins over "second",
     * "first" and a re-grouped fork of "second". Expects 5 lines of six columns in the main sink.
     */
    @Test
    public void testGroupBySplitAroundSplitGroupByJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("sink"), SinkMode.REPLACE);
        Tap secondarySink = getPlatform().getTextFile(new Fields("line"), getOutputPath("sink2"), SinkMode.REPLACE);

        // One splitter instance is shared by both forks of "init".
        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe init = new Pipe("init");

        Pipe firstSplit = new Each(new Pipe("first", init), new Fields("line"), splitter);
        Pipe firstGroup = new GroupBy(firstSplit, new Fields("num"));
        Pipe first = new Every(firstGroup, new Fields("char"), new First(new Fields("firstFirst")), Fields.ALL);

        Pipe secondaryTail = new Pipe("sink2", first);

        Pipe secondSplit = new Each(new Pipe("second", init), new Fields("line"), splitter);
        Pipe secondGroup = new GroupBy(secondSplit, new Fields("num"));
        Pipe second = new Every(secondGroup, new Fields("char"), new First(new Fields("secondFirst")), Fields.ALL);

        Pipe innerJoin = new HashJoin(second, new Fields("num"), first, new Fields("num"), Fields.size(4));

        Pipe thirdBranch = new Each(new Pipe("third", second), new Identity());
        Pipe thirdGroup = new GroupBy(thirdBranch, new Fields("num"));
        Pipe third = new Every(thirdGroup, new Fields("secondFirst"), new First(new Fields("thirdFirst")), Fields.ALL);

        Pipe outerJoin = new HashJoin(innerJoin, new Fields(0), third, new Fields("num"), Fields.size(6));

        FlowDef flowDef = ((FlowDef) FlowDef.flowDef().setName(outerJoin.getName()))
            .addSource("init", source)
            .addTailSink(outerJoin, sink)
            .addTailSink(secondaryTail, secondarySink);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();

        validateLength(flow, 5, null);

        List<Tuple> results = getSinkAsList(flow);
        String[] expectedLines = {
            "1\ta\t1\ta\t1\ta",
            "2\tb\t2\tb\t2\tb",
            "3\tc\t3\tc\t3\tc",
            "4\td\t4\td\t4\td",
            "5\te\t5\te\t5\te"
        };
        for (String expectedLine : expectedLines) {
            assertTrue(results.contains(new Tuple(expectedLine)));
        }
    }

    /**
     * Forks the split "lower" stream, upper-cases the {@code text} column on the fork, and
     * hash-joins the fork back to the original on {@code num}. Expects 5 lines pairing each
     * lower-case row with its upper-cased copy.
     */
    @Test
    public void testForkThenJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", source);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("join"), SinkMode.REPLACE);

        Pipe lower = new Each(new Pipe("lower"), new Fields("line"), new RegexSplitter(new Fields("num", "text"), " "));

        // Fork: replace "text" in place with its upper-cased value.
        ExpressionFunction toUpper = new ExpressionFunction(Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class);
        Pipe upper = new Each(new Pipe("upper", lower), new Fields("text"), toUpper, Fields.REPLACE);

        Pipe join = new HashJoin(lower, new Fields("num"), upper, new Fields("num"), Fields.size(4));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Forks the lower and upper sources, case-swaps the {@code text} column on each fork,
     * co-groups the two forks on {@code num}, and finally hash-joins the original lower stream
     * against the co-group output on {@code numM1}. Expects 5 lines of six columns.
     */
    @Test
    public void testForkCoGroupThenHashJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        // The upper file is read as "sourceUpper" below, so it must be staged as well;
        // previously only the lower file was copied.
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerSource = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperSource = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("sourceLower", lowerSource);
        sources.put("sourceUpper", upperSource);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("join"), SinkMode.REPLACE);

        // One splitter instance is shared by both source branches.
        RegexSplitter splitter = new RegexSplitter(new Fields("num", "text"), " ");
        Pipe lower = new Each(new Pipe("sourceLower"), new Fields("line"), splitter);
        Pipe upper = new Each(new Pipe("sourceUpper"), new Fields("line"), splitter);

        // Forks with the text column case-swapped in place.
        Pipe leftUpper = new Each(new Pipe("leftUpper", lower), new Fields("text"),
            new ExpressionFunction(Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class), Fields.REPLACE);
        Pipe rightLower = new Each(new Pipe("rightLower", upper), new Fields("text"),
            new ExpressionFunction(Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class), Fields.REPLACE);

        Pipe leftGrouped = new GroupBy(leftUpper, new Fields("num"));
        Pipe rightGrouped = new GroupBy(rightLower, new Fields("num"));
        Pipe middle = new CoGroup("middleCoGroup", leftGrouped, new Fields("num"), rightGrouped, new Fields("num"),
            new Fields("numM1", "charM1", "numM2", "charM2"));

        Pipe join = new HashJoin(lower, new Fields("num"), middle, new Fields("numM1"));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA\t1\ta")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB\t2\tb")));
    }

    /**
     * Extends the fork/co-group/hash-join topology: the "middleCoGroup" output is hash-joined
     * once against the lower stream and once against the upper stream, both join outputs are
     * renamed to disjoint field names, and the two are co-grouped again. Expects 5 lines of
     * twelve columns.
     */
    @Test
    public void testForkCoGroupThenHashJoinCoGroupAgain() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerSource = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperSource = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("sourceLower", lowerSource);
        sources.put("sourceUpper", upperSource);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("join"), SinkMode.REPLACE);

        // One splitter instance is shared by both source branches.
        RegexSplitter splitter = new RegexSplitter(new Fields("num", "text"), " ");
        Pipe lower = new Each(new Pipe("sourceLower"), new Fields("line"), splitter);
        Pipe upper = new Each(new Pipe("sourceUpper"), new Fields("line"), splitter);

        Pipe leftUpper = new Each(new Pipe("leftUpper", lower), new Fields("text"),
            new ExpressionFunction(Fields.ARGS, "text.toUpperCase(java.util.Locale.ROOT)", String.class), Fields.REPLACE);
        Pipe rightLower = new Each(new Pipe("rightLower", upper), new Fields("text"),
            new ExpressionFunction(Fields.ARGS, "text.toLowerCase(java.util.Locale.ROOT)", String.class), Fields.REPLACE);

        Pipe leftGrouped = new GroupBy(leftUpper, new Fields("num"));
        Pipe rightGrouped = new GroupBy(rightLower, new Fields("num"));
        Pipe middle = new CoGroup("middleCoGroup", leftGrouped, new Fields("num"), rightGrouped, new Fields("num"),
            new Fields("numM1", "charM1", "numM2", "charM2"));

        // The co-group output is the right-hand side of both hash joins.
        Pipe leftJoin = new HashJoin(lower, new Fields("num"), middle, new Fields("numM1"));
        Pipe rightJoin = new HashJoin(upper, new Fields("num"), middle, new Fields("numM2"));

        // Rename each side so the final co-group sees no duplicate field names.
        Pipe leftRenamed = new Rename(leftJoin,
            new Fields("num", "text", "numM1", "charM1", "numM2", "charM2"),
            new Fields("numL1", "charL1", "numM1L", "charM1L", "numM2L", "charM2L"));
        Pipe rightRenamed = new Rename(rightJoin,
            new Fields("num", "text", "numM1", "charM1", "numM2", "charM2"),
            new Fields("numR1", "charR1", "numM1R", "charM1R", "numM2R", "charM2R"));

        Pipe leftFinalGroup = new GroupBy(leftRenamed, new Fields("numM1L"));
        Pipe rightFinalGroup = new GroupBy(rightRenamed, new Fields("numM2R"));
        Pipe tail = new CoGroup("cogrouping", leftFinalGroup, new Fields("numM1L"), rightFinalGroup, new Fields("numM2R"));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, tail);
        flow.complete();

        validateLength(flow, 5);

        List<Tuple> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA\t1\ta\t1\tA\t1\tA\t1\ta")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB\t2\tb\t2\tB\t2\tB\t2\tb")));
    }
}
