package cascading.cascade;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.operation.Identity;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.text.FieldJoiner;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import data.InputData;
import java.io.IOException;
import org.junit.Test;

/* loaded from: input_file:cascading/cascade/ParallelCascadePlatformTest.class */
public class ParallelCascadePlatformTest extends PlatformTestCase {
    public ParallelCascadePlatformTest() {
        super(true);
    }

    private Flow firstFlow(String str) {
        return getPlatform().getFlowConnector().connect(getPlatform().getTextFile(InputData.inputFileIps), getPlatform().getTabDelimitedFile(new Fields(new Comparable[]{"ip"}), getOutputPath(str), SinkMode.REPLACE), new Each(new Pipe(str), new Fields(new Comparable[]{"line"}), new Identity(new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"})));
    }

    private Flow secondFlow(String str, Tap tap) {
        return getPlatform().getFlowConnector().connect(tap, getPlatform().getTabDelimitedFile(new Fields(new Comparable[]{"mangled"}), getOutputPath(str), SinkMode.REPLACE), new Each(new Each(new Pipe(str), new RegexSplitter(new Fields(new Comparable[]{"first", "second", "third", "fourth"}), "\\.")), new FieldJoiner(new Fields(new Comparable[]{"mangled"}), "-")));
    }

    private Flow thirdFlow(Tap tap, Tap tap2) {
        Pipe pipe = new Pipe("lhs");
        Pipe pipe2 = new Pipe("rhs");
        CoGroup coGroup = new CoGroup(pipe, new Fields(new Comparable[]{0}), pipe2, new Fields(new Comparable[]{0}), Fields.size(2));
        return getPlatform().getFlowConnector().connect(Cascades.tapsMap(Pipe.pipes(new Pipe[]{pipe, pipe2}), Tap.taps(new Tap[]{tap, tap2})), getPlatform().getTextFile(getOutputPath("third"), SinkMode.REPLACE), coGroup);
    }

    @Test
    public void testCascade() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);
        Flow firstFlow = firstFlow("first1");
        Flow secondFlow = secondFlow("second1", firstFlow.getSink());
        Flow firstFlow2 = firstFlow("first2");
        Flow secondFlow2 = secondFlow("second2", firstFlow2.getSink());
        Flow thirdFlow = thirdFlow(secondFlow.getSink(), secondFlow2.getSink());
        Cascade connect = new CascadeConnector(getProperties()).connect(new Flow[]{firstFlow, secondFlow, firstFlow2, secondFlow2, thirdFlow});
        connect.start();
        connect.complete();
        validateLength(thirdFlow, 28);
    }

    @Test
    public void testCascadeRaceCondition() throws Throwable {
        getPlatform().copyFromLocal(InputData.inputFileIps);
        final Throwable[] thArr = new Throwable[1];
        CascadeListener cascadeListener = new CascadeListener() { // from class: cascading.cascade.ParallelCascadePlatformTest.1
            public void onStarting(Cascade cascade) {
            }

            public void onStopping(Cascade cascade) {
            }

            public void onCompleted(Cascade cascade) {
            }

            public boolean onThrowable(Cascade cascade, Throwable th) {
                thArr[0] = th;
                return false;
            }
        };
        for (int i = 0; i <= 500; i += 50) {
            Cascade connect = new CascadeConnector(getProperties()).connect(new Flow[]{firstFlow(String.format("race-%d/first-nondeterministic", Integer.valueOf(i)))});
            connect.addListener(cascadeListener);
            connect.start();
            Thread.sleep(i);
            connect.stop();
            connect.complete();
            if (thArr[0] != null) {
                throw thArr[0];
            }
        }
    }
}
