package cascading;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.operation.Identity;
import cascading.operation.aggregator.First;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import data.InputData;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import org.junit.Test;

/* loaded from: input_file:cascading/MergePipesPlatformTest.class */
public class MergePipesPlatformTest extends PlatformTestCase {
    public MergePipesPlatformTest() {
        super(true);
    }

    @Test
    public void testSimpleMerge() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("simplemerge"), SinkMode.REPLACE), new Merge("merge", new Pipe[]{new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter)}));
        connect.complete();
        validateLength(connect, 10);
        List sinkAsList = getSinkAsList(connect);
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"1\ta"})));
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"1\tA"})));
    }

    @Test
    public void testMergeGroupBy() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap delimitedFile2 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", delimitedFile);
        hashMap.put("upper", delimitedFile2);
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath(), SinkMode.REPLACE), new GroupBy(new Merge("merge", new Pipe[]{new Pipe("lower"), new Pipe("upper")}), Fields.ALL));
        connect.complete();
        validateLength(connect, 10);
        List sinkAsList = getSinkAsList(connect);
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"1\ta"})));
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"1\tA"})));
    }

    @Test
    public void testSimpleMergeThree() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap textFile3 = getPlatform().getTextFile(InputData.inputFileLowerOffset);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        hashMap.put("offset", textFile3);
        Tap textFile4 = getPlatform().getTextFile(getOutputPath("simplemergethree"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile4, new Each(new Merge("merge", new Pipe[]{new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("offset"), new Fields(new Comparable[]{"line"}), regexSplitter)}), new Fields(new Comparable[]{"num", "char"}), new Identity()));
        connect.complete();
        validateLength(connect, 14);
    }

    @Test
    public void testSimpleMergeThreeChain() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap textFile3 = getPlatform().getTextFile(InputData.inputFileLowerOffset);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        hashMap.put("offset", textFile3);
        Tap textFile4 = getPlatform().getTextFile(getOutputPath("simplemergethreechain"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile4, new Each(new Merge(new Pipe[]{new Merge("merge", new Pipe[]{new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter)}), new Each(new Pipe("offset"), new Fields(new Comparable[]{"line"}), regexSplitter)}), new Fields(new Comparable[]{"num", "char"}), new Identity()));
        connect.complete();
        validateLength(connect, 14);
    }

    @Test
    public void testSimpleMergeThreeChainGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap textFile3 = getPlatform().getTextFile(InputData.inputFileLowerOffset);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        hashMap.put("offset", textFile3);
        Tap textFile4 = getPlatform().getTextFile(getOutputPath("simplemergethreechaingroup"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile4, new GroupBy(new Merge(new Pipe[]{new Merge("merge", new Pipe[]{new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter)}), new Each(new Pipe("offset"), new Fields(new Comparable[]{"line"}), regexSplitter)}), new Fields(new Comparable[]{"num"})));
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 1, connect.getFlowSteps().size());
        }
        connect.complete();
        validateLength(connect, 14);
    }

    @Test
    public void testSimpleMergeThreeChainCoGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap textFile3 = getPlatform().getTextFile(InputData.inputFileLowerOffset);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        hashMap.put("offset", textFile3);
        Tap textFile4 = getPlatform().getTextFile(getOutputPath("simplemergethreechaincogroup"), SinkMode.REPLACE);
        Pipe each = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "char1"}), " "));
        Pipe each2 = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "char1"}), " "));
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile4, new CoGroup(new Merge("merge", new Pipe[]{each, each2}), new Fields(new Comparable[]{"num1"}), new Each(new Pipe("offset"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num2", "char2"}), " ")), new Fields(new Comparable[]{"num2"})));
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 1, connect.getFlowSteps().size());
        }
        connect.complete();
        validateLength(connect, 6);
    }

    @Test
    public void testSameSourceMergeThreeChainGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("split", textFile);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("samemergethreechaingroup"), SinkMode.REPLACE);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Pipe pipe = new Pipe("split");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile2, new GroupBy(new Merge(new Pipe[]{new GroupBy(new Merge("merge", new Pipe[]{new Each(new Pipe("lower", pipe), new Fields(new Comparable[]{"line"}), regexSplitter), new Each(new Pipe("upper", pipe), new Fields(new Comparable[]{"line"}), regexSplitter)}), new Fields(new Comparable[]{"num"})), new Each(new Pipe("offset", pipe), new Fields(new Comparable[]{"line"}), regexSplitter)}), new Fields(new Comparable[]{"num"})));
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 2, connect.getFlowSteps().size());
        }
        connect.complete();
        validateLength(connect, 15);
    }

    @Test
    public void testSplitSameSourceMerged() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("splitsourcemerged"), SinkMode.REPLACE);
        Each each = new Each(new Pipe("split"), new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*"));
        Flow connect = getPlatform().getFlowConnector().connect(textFile, textFile2, new Each(new Merge("merged", new Pipe[]{new Each(new Pipe("left", each), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*")), new Each(new Pipe("right", each), new Fields(new Comparable[]{"line"}), new RegexFilter(".*102.*"))}), new Fields(new Comparable[]{"line"}), new Identity()));
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 1, connect.getFlowSteps().size());
        }
        connect.complete();
        validateLength(connect, 3);
    }

    @Test
    public void testSplitSameSourceMergedComplex() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache);
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("splitsourcemergedcomplex"), SinkMode.REPLACE);
        Each each = new Each(new Pipe("split"), new Fields(new Comparable[]{"line"}), new RegexFilter("^68.*"));
        Each each2 = new Each(new Merge("merged-first", new Pipe[]{new Each(new Pipe("left", each), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*")), new Each(new Pipe("right", each), new Fields(new Comparable[]{"line"}), new RegexFilter(".*102.*"))}), new Fields(new Comparable[]{"line"}), new Identity());
        Flow connect = getPlatform().getFlowConnector().connect(textFile, textFile2, new Each(new Merge("merged-second", new Pipe[]{new Each(new Pipe("left", each2), new Fields(new Comparable[]{"line"}), new RegexFilter(".*46.*")), new Each(new Pipe("right", each2), new Fields(new Comparable[]{"line"}), new RegexFilter(".*102.*"))}), new Fields(new Comparable[]{"line"}), new Identity()));
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 1, connect.getFlowSteps().size());
        }
        connect.complete();
        validateLength(connect, 3);
    }

    @Test
    public void testSimpleMergeFail() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        try {
            getPlatform().getFlowConnector().connect(hashMap, getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath("simplemergefail"), SinkMode.REPLACE), new Merge("merge", new Pipe[]{new Rename(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num2"})), new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter)}));
            fail();
        } catch (Exception e) {
        }
    }

    @Test
    public void testMergeIntoHashJoinStreamed() throws Exception {
        runMergeIntoHashJoin(true);
    }

    @Test
    public void testMergeIntoHashJoinAccumulated() throws Exception {
        runMergeIntoHashJoin(false);
    }

    private void runMergeIntoHashJoin(boolean z) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap textFile3 = getPlatform().getTextFile(InputData.inputFileLowerOffset);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        hashMap.put("offset", textFile3);
        Tap textFile4 = getPlatform().getTextFile(getOutputPath("mergeintohashjoin" + (z ? "streamed" : "accumulated")), SinkMode.REPLACE);
        Pipe each = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "char1"}), " "));
        Pipe each2 = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "char1"}), " "));
        Each each3 = new Each(new Pipe("offset"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num2", "char2"}), " "));
        Merge merge = new Merge("merge", new Pipe[]{each, each2});
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile4, z ? new HashJoin(merge, new Fields(new Comparable[]{"num1"}), each3, new Fields(new Comparable[]{"num2"})) : new HashJoin(each3, new Fields(new Comparable[]{"num2"}), merge, new Fields(new Comparable[]{"num1"})));
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", 1, connect.getFlowSteps().size());
        }
        connect.complete();
        validateLength(connect, 6);
    }

    @Test
    public void testHashJoinMergeIntoHashJoinStreamed() throws Exception {
        runHashJoinIntoMergeIntoHashJoin(true);
    }

    @Test
    public void testHashJoinMergeIntoHashJoinAccumulated() throws Exception {
        runHashJoinIntoMergeIntoHashJoin(false);
    }

    private void runHashJoinIntoMergeIntoHashJoin(boolean z) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap textFile3 = getPlatform().getTextFile(InputData.inputFileLowerOffset);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        hashMap.put("offset", textFile3);
        Tap textFile4 = getPlatform().getTextFile(getOutputPath("hashjoinintomergeintohashjoin" + (z ? "streamed" : "accumulated")), SinkMode.REPLACE);
        Each each = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "char1"}), " "));
        Pipe each2 = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "char1"}), " "));
        Each each3 = new Each(new Pipe("offset"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num2", "char2"}), " "));
        Merge merge = new Merge("merge", new Pipe[]{new Retain(new HashJoin(each, new Fields(new Comparable[]{"num1"}), each3, new Fields(new Comparable[]{"num2"})), new Fields(new Comparable[]{"num1", "char1"})), each2});
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile4, z ? new HashJoin(merge, new Fields(new Comparable[]{"num1"}), each3, new Fields(new Comparable[]{"num2"})) : new HashJoin(each3, new Fields(new Comparable[]{"num2"}), merge, new Fields(new Comparable[]{"num1"})));
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", z ? 1 : 2, connect.getFlowSteps().size());
        }
        connect.complete();
        validateLength(connect, 8);
    }

    @Test
    public void testHashJoinMergeIntoHashJoinStreamedStreamedMerge() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(true, true, true, 1);
    }

    @Test
    public void testHashJoinMergeIntoHashJoinAccumulatedAccumulatedMerge() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(false, false, true, 3);
    }

    @Test
    public void testHashJoinMergeIntoHashJoinStreamedAccumulatedMerge() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(true, false, true, 2);
    }

    @Test
    public void testHashJoinMergeIntoHashJoinAccumulatedStreamedMerge() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(false, true, true, 3);
    }

    @Test
    public void testHashJoinMergeIntoHashJoinStreamedStreamed() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(true, true, false, 1);
    }

    @Test
    public void testHashJoinMergeIntoHashJoinAccumulatedAccumulated() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(false, false, false, 3);
    }

    @Test
    public void testHashJoinMergeIntoHashJoinStreamedAccumulated() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(true, false, false, 2);
    }

    @Test
    public void testHashJoinMergeIntoHashJoinAccumulatedStreamed() throws Exception {
        runMultiHashJoinIntoMergeIntoHashJoin(false, true, false, 3);
    }

    private void runMultiHashJoinIntoMergeIntoHashJoin(boolean z, boolean z2, boolean z3, int i) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        Tap textFile3 = getPlatform().getTextFile(InputData.inputFileLowerOffset);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        hashMap.put("offset", textFile3);
        Tap textFile4 = getPlatform().getTextFile(getOutputPath("multihashjoinintomergeintohashjoin" + (((z ? "firstStreamed" : "firstAccumulated") + (z2 ? "secondStreamed" : "secondAccumulated")) + (z3 ? "interMerge" : "noInterMerge"))), SinkMode.REPLACE);
        Each each = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "char1"}), " "));
        Pipe each2 = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num1", "char1"}), " "));
        Each each3 = new Each(new Pipe("offset"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num2", "char2"}), " "));
        Merge merge = new Merge("merge1", new Pipe[]{new Retain(new HashJoin(each, new Fields(new Comparable[]{"num1"}), each3, new Fields(new Comparable[]{"num2"})), new Fields(new Comparable[]{"num1", "char1"})), each2});
        Pipe retain = new Retain(z ? new HashJoin(merge, new Fields(new Comparable[]{"num1"}), each3, new Fields(new Comparable[]{"num2"})) : new HashJoin(each3, new Fields(new Comparable[]{"num2"}), merge, new Fields(new Comparable[]{"num1"})), new Fields(new Comparable[]{"num1", "char1"}));
        if (z3) {
            retain = new Merge("merge2", new Pipe[]{retain, each2});
        }
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, textFile4, z2 ? new HashJoin(retain, new Fields(new Comparable[]{"num1"}), each3, new Fields(new Comparable[]{"num2"})) : new HashJoin(each3, new Fields(new Comparable[]{"num2"}), retain, new Fields(new Comparable[]{"num1"})));
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong num jobs", i, connect.getFlowSteps().size());
        }
        connect.complete();
        validateLength(connect, z3 ? 17 : 14);
    }

    @Test
    public void testGroupByAggregationMerge() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(InputData.inputFileLower);
        Tap textFile2 = getPlatform().getTextFile(InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", textFile);
        hashMap.put("upper", textFile2);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector().connect(hashMap, getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), getOutputPath(), SinkMode.REPLACE), new Merge("merge", new Pipe[]{new Every(new GroupBy(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"})), new Fields(new Comparable[]{"char"}), new First(Fields.ARGS)), new Every(new GroupBy(new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"})), new Fields(new Comparable[]{"char"}), new First(Fields.ARGS))}));
        connect.complete();
        validateLength(connect, 10);
        List sinkAsList = getSinkAsList(connect);
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"1\ta"})));
        assertTrue("missing value", sinkAsList.contains(new Tuple(new Object[]{"1\tA"})));
    }

    @Test
    public void testSameSourceMerge() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap textFile = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lhs", new Pipe("lhs"));
        Pipe pipe2 = new Pipe("rhs");
        Flow connect = getPlatform().getFlowConnector().connect(FlowDef.flowDef().addSource(pipe, delimitedFile).addSource(pipe2, delimitedFile).addTailSink(new Merge("merge", new Pipe[]{pipe, pipe2}), textFile));
        connect.complete();
        validateLength(connect, 10);
    }

    @Test
    public void testSameSourceMergeHashJoin() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap textFile = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lhs");
        Pipe pipe2 = new Pipe("rhs");
        Rename rename = new Rename(new Merge("merge", new Pipe[]{pipe, pipe2}), new Fields(new Comparable[]{"num", "char"}), new Fields(new Comparable[]{"merged.num", "merged.char"}));
        Rename rename2 = new Rename(new Pipe("join"), new Fields(new Comparable[]{"num", "char"}), new Fields(new Comparable[]{"rhs.num", "rhs.char"}));
        Flow connect = getPlatform().getFlowConnector().connect(FlowDef.flowDef().addSource(pipe, delimitedFile).addSource(pipe2, delimitedFile).addSource(rename2, delimitedFile).addTailSink(new Rename(new Retain(new HashJoin(rename, new Fields(new Comparable[]{"merged.num"}), rename2, new Fields(new Comparable[]{"rhs.num"})), new Fields(new Comparable[]{"merged.num", "merged.char", "rhs.char"})), new Fields(new Comparable[]{"merged.num", "merged.char", "rhs.char"}), new Fields(new Comparable[]{"num", "merged", "char"})), textFile));
        connect.complete();
        validateLength(connect, 10);
    }

    @Test
    public void testHashJoinMerge() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap delimitedFile2 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileUpper);
        Tap delimitedFile3 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLowerOffset);
        Tap textFile = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lhs");
        Pipe pipe2 = new Pipe("mid");
        Pipe pipe3 = new Pipe("rhs");
        Rename rename = new Rename(pipe2, Fields.ALL, new Fields(new Comparable[]{"num2", "char2"}));
        Flow connect = getPlatform().getFlowConnector().connect(FlowDef.flowDef().addSource(pipe, delimitedFile).addSource(rename, delimitedFile2).addSource(pipe3, delimitedFile3).addTailSink(new Merge("merge", new Pipe[]{new Retain(new HashJoin(pipe, new Fields(new Comparable[]{"num"}), rename, new Fields(new Comparable[]{"num2"})), new Fields(new Comparable[]{"num", "char"})), pipe3}), textFile));
        connect.complete();
        validateLength(connect, 9);
    }

    @Test
    public void testSameSourceHashJoinMergeOnStreamed() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap delimitedFile2 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lhs");
        Pipe pipe2 = new Pipe("accumulated");
        Pipe pipe3 = new Pipe("rhs");
        Flow connect = getPlatform().getFlowConnector().connect(((FlowDef) FlowDef.flowDef().setName("streamed")).addSource(pipe, delimitedFile).addSource(pipe2, delimitedFile2).addSource(pipe3, delimitedFile).addTailSink(new Merge("merge", new Pipe[]{new Retain(new HashJoin(pipe, new Fields(new Comparable[]{"num"}), pipe2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num", "char", "num2", "char2"})), new Fields(new Comparable[]{"num", "char"})), pipe3}), textFile));
        connect.complete();
        validateLength(connect, 10);
    }

    @Test
    public void testSameSourceHashJoinMergeOnAccumulated() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap delimitedFile2 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lhs");
        Pipe pipe2 = new Pipe("accumulated");
        Pipe pipe3 = new Pipe("rhs");
        Flow connect = getPlatform().getFlowConnector().connect(((FlowDef) FlowDef.flowDef().setName("accumulated")).addSource(pipe, delimitedFile2).addSource(pipe2, delimitedFile).addSource(pipe3, delimitedFile).addTailSink(new Merge("merge", new Pipe[]{new Retain(new HashJoin(pipe, new Fields(new Comparable[]{"num"}), pipe2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num", "char", "num2", "char2"})), new Fields(new Comparable[]{"num", "char"})), pipe3}), textFile));
        connect.complete();
        validateLength(connect, 10);
    }

    @Test
    public void testHashJoinHashJoinMerge() throws Exception {
        if (getPlatform().isDAG()) {
            return;
        }
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap delimitedFile2 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileUpper);
        Tap delimitedFile3 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap delimitedFile4 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileUpper);
        Tap textFile = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lhsLower");
        Pipe pipe2 = new Pipe("lhsUpper");
        Pipe pipe3 = new Pipe("rhsLower");
        Pipe pipe4 = new Pipe("rhsUpper");
        Rename rename = new Rename(pipe2, Fields.ALL, new Fields(new Comparable[]{"num2", "char2"}));
        Pipe retain = new Retain(new HashJoin(pipe, new Fields(new Comparable[]{"num"}), rename, new Fields(new Comparable[]{"num2"})), new Fields(new Comparable[]{"num", "char"}));
        Rename rename2 = new Rename(pipe4, Fields.ALL, new Fields(new Comparable[]{"num2", "char2"}));
        Flow connect = getPlatform().getFlowConnector().connect(FlowDef.flowDef().addSource(pipe, delimitedFile).addSource(rename, delimitedFile2).addSource(pipe3, delimitedFile3).addSource(rename2, delimitedFile4).addTailSink(new Merge("merge", new Pipe[]{retain, new Retain(new HashJoin(pipe3, new Fields(new Comparable[]{"num"}), rename2, new Fields(new Comparable[]{"num2"})), new Fields(new Comparable[]{"num", "char"}))}), textFile));
        connect.complete();
        validateLength(connect, 10);
    }

    @Test
    public void testHashJoinHashJoinHashJoinMergeMerge() throws Exception {
        if (getPlatform().isDAG()) {
            return;
        }
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);
        Tap delimitedFile = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap delimitedFile2 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileUpper);
        Tap delimitedFile3 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap delimitedFile4 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileUpper);
        Tap delimitedFile5 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLower);
        Tap delimitedFile6 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileUpper);
        Tap delimitedFile7 = getPlatform().getDelimitedFile(new Fields(new Comparable[]{"num", "char"}), " ", InputData.inputFileLowerOffset);
        Tap textFile = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);
        Pipe pipe = new Pipe("lhsLower");
        Pipe pipe2 = new Pipe("lhsUpper");
        Pipe pipe3 = new Pipe("midLower");
        Pipe pipe4 = new Pipe("midUpper");
        Pipe pipe5 = new Pipe("rhsLower");
        Pipe pipe6 = new Pipe("rhsUpper");
        Pipe pipe7 = new Pipe("far");
        Rename rename = new Rename(pipe2, Fields.ALL, new Fields(new Comparable[]{"num2", "char2"}));
        Pipe retain = new Retain(new HashJoin(pipe, new Fields(new Comparable[]{"num"}), rename, new Fields(new Comparable[]{"num2"})), new Fields(new Comparable[]{"num", "char"}));
        Rename rename2 = new Rename(pipe4, Fields.ALL, new Fields(new Comparable[]{"num2", "char2"}));
        Pipe retain2 = new Retain(new HashJoin(pipe3, new Fields(new Comparable[]{"num"}), rename2, new Fields(new Comparable[]{"num2"})), new Fields(new Comparable[]{"num", "char"}));
        Rename rename3 = new Rename(pipe6, Fields.ALL, new Fields(new Comparable[]{"num2", "char2"}));
        Flow connect = getPlatform().getFlowConnector().connect(FlowDef.flowDef().addSource(pipe, delimitedFile).addSource(rename, delimitedFile2).addSource(pipe3, delimitedFile3).addSource(rename2, delimitedFile4).addSource(pipe5, delimitedFile5).addSource(rename3, delimitedFile6).addSource(pipe7, delimitedFile7).addTailSink(new Merge("next merge", new Pipe[]{new Merge("merge", new Pipe[]{retain, retain2, new Retain(new HashJoin(pipe5, new Fields(new Comparable[]{"num"}), rename3, new Fields(new Comparable[]{"num2"})), new Fields(new Comparable[]{"num", "char"}))}), pipe7}), textFile));
        connect.complete();
        validateLength(connect, 19);
    }
}
