package cascading.cascade;

import cascading.CascadingException;
import cascading.ComparePlatformsTest;
import cascading.PlatformTestCase;
import cascading.flow.CountingFlowListener;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSkipStrategy;
import cascading.flow.Flows;
import cascading.flow.LockingFlowListener;
import cascading.flow.planner.FlowStepJob;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.Identity;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.text.FieldJoiner;
import cascading.pipe.Checkpoint;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.MultiSourceTap;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import data.InputData;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

/* loaded from: input_file:cascading/cascade/CascadePlatformTest.class */
public class CascadePlatformTest extends PlatformTestCase {

    /* loaded from: input_file:cascading/cascade/CascadePlatformTest$FailFunction.class */
    public class FailFunction extends BaseOperation implements Function {
        public FailFunction(Fields fields) {
            super(1, fields);
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            throw new CascadingException("testing");
        }
    }

    /**
     * Creates the test case, passing {@code true} to the {@link PlatformTestCase} constructor.
     * NOTE(review): the flag presumably requests cluster-backed execution — confirm against
     * PlatformTestCase.
     */
    public CascadePlatformTest() {
        super(true);
    }

    /**
     * Builds the flow named "first": reads the ips input file as text, maps the "line" field to
     * "ip" through {@link Identity}, and writes "ip" to a tab delimited sink.
     *
     * @param str output path, resolved through {@code getOutputPath}
     * @param z   when {@code true}, a {@link FailFunction} is appended to the assembly
     * @return the connected flow
     */
    private Flow firstFlow(String str, boolean z) {
        Tap source = getPlatform().getTextFile(InputData.inputFileIps);

        Pipe assembly = new Pipe("first");
        assembly = new Each(assembly, new Fields("line"), new Identity(new Fields("ip")), new Fields("ip"));

        if (z) {
            // Replace the "ip" value with the result of a function that always throws.
            assembly = new Each(assembly, new Fields("ip"), new FailFunction(Fields.ARGS), Fields.REPLACE);
        }

        Tap sink = getPlatform().getTabDelimitedFile(new Fields("ip"), getOutputPath(str), SinkMode.REPLACE);

        return getPlatform().getFlowConnector().connect("first", source, sink, assembly);
    }

    /**
     * Builds the flow named "second": applies a {@link RegexSplitter} using the pattern
     * {@code \.} that declares the fields "first" through "fourth", and writes those fields to a
     * tab delimited sink.
     *
     * @param tap source tap of the flow
     * @param str output path, resolved through {@code getOutputPath}
     * @return the connected flow
     */
    private Flow secondFlow(Tap tap, String str) {
        Tap sink = getPlatform().getTabDelimitedFile(
                new Fields("first", "second", "third", "fourth"), getOutputPath(str), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("first", "second", "third", "fourth"), "\\.");
        Pipe assembly = new Each(new Pipe("second"), splitter);

        return getPlatform().getFlowConnector().connect("second", tap, sink, assembly);
    }

    /**
     * Builds the flow named "third": applies a {@link FieldJoiner} with the delimiter "-" that
     * declares the field "mangled", and writes that field to a tab delimited sink.
     *
     * @param tap source tap of the flow
     * @param str output path, resolved through {@code getOutputPath}
     * @return the connected flow
     */
    private Flow thirdFlow(Tap tap, String str) {
        Tap sink = getPlatform().getTabDelimitedFile(new Fields("mangled"), getOutputPath(str), SinkMode.REPLACE);

        FieldJoiner joiner = new FieldJoiner(new Fields("mangled"), "-");
        Pipe assembly = new Each(new Pipe("third"), joiner);

        return getPlatform().getFlowConnector().connect("third", tap, sink, assembly);
    }

    /**
     * Variant of the third flow with a {@link Checkpoint} named "checkpoint" placed after the
     * {@link FieldJoiner}, followed by an {@link Identity} step.
     *
     * <p>The checkpoint tap is written to {@code str}; the tail sink goes to the fixed path
     * "unusedpath".
     *
     * @param tap source tap of the flow
     * @param str output path of the checkpoint tap, resolved through {@code getOutputPath}
     * @return the connected flow
     */
    private Flow thirdCheckpointFlow(Tap tap, String str) {
        Each joined = new Each(new Pipe("third"), new FieldJoiner(new Fields("mangled"), "-"));
        Checkpoint checkpoint = new Checkpoint("checkpoint", joined);
        Each tail = new Each(checkpoint, new Identity());

        Tap tailSink = getPlatform().getTabDelimitedFile(
                new Fields("mangled"), getOutputPath("unusedpath"), SinkMode.REPLACE);
        Tap checkpointTap = getPlatform().getTabDelimitedFile(Fields.ALL, getOutputPath(str), SinkMode.REPLACE);

        FlowDef definition = FlowDef.flowDef()
                .addSource(tail, tap)
                .addTailSink(tail, tailSink)
                .addCheckpoint("checkpoint", checkpointTap);

        return getPlatform().getFlowConnector().connect(definition);
    }

    /**
     * Builds the flow named "fourth": passes tuples through {@link Identity} and writes them to
     * a text file sink.
     *
     * @param tap source tap of the flow
     * @param str output path, resolved through {@code getOutputPath}
     * @return the connected flow
     */
    private Flow fourthFlow(Tap tap, String str) {
        Pipe assembly = new Each(new Pipe("fourth"), new Identity());
        Tap sink = getPlatform().getTextFile(getOutputPath(str), SinkMode.REPLACE);

        return getPlatform().getFlowConnector().connect("fourth", tap, sink, assembly);
    }

    /**
     * Builds one of the upstream flows for the multi-tap test: reads the ips input file, maps
     * "line" to "ip" through {@link Identity}, and writes "ip" to a tab delimited sink.
     *
     * @param str  parent output path; the sink is written to {@code str + "/" + str2}
     * @param str2 pipe name, also appended to the flow name "previous-multi-tap-"
     * @return the connected flow
     */
    private Flow previousMultiTapFlow(String str, String str2) {
        String flowName = "previous-multi-tap-" + str2;

        Tap source = getPlatform().getTextFile(InputData.inputFileIps);
        Tap sink = getPlatform().getTabDelimitedFile(
                new Fields("ip"), getOutputPath(str + "/" + str2), SinkMode.REPLACE);

        Pipe assembly = new Each(new Pipe(str2), new Fields("line"), new Identity(new Fields("ip")), new Fields("ip"));

        return getPlatform().getFlowConnector().connect(flowName, source, sink, assembly);
    }

    /**
     * Builds the flow named "multi-tap": reads all given taps through a single
     * {@link MultiSourceTap}, passes tuples through {@link Identity}, and writes them to a text
     * file sink.
     *
     * @param tapArr taps wrapped by the {@link MultiSourceTap} source
     * @param str    parent output path; the sink is written to {@code str + "/multitap"}
     * @return the connected flow
     */
    private Flow multiTapFlow(Tap[] tapArr, String str) {
        Tap source = new MultiSourceTap(tapArr);
        Tap sink = getPlatform().getTextFile(getOutputPath(str + "/multitap"), SinkMode.REPLACE);
        Pipe assembly = new Each(new Pipe("multitap"), new Identity());

        return getPlatform().getFlowConnector().connect("multi-tap", source, sink, assembly);
    }

    /**
     * Chains four flows through their sinks, runs them as one cascade, and checks the output
     * length of the last flow as well as the head/tail flows and source/sink taps reported by
     * the cascade.
     */
    @Test
    public void testSimpleCascade() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        Flow first = firstFlow("simple/first", false);
        Flow second = secondFlow(first.getSink(), "simple/second");
        Flow third = thirdFlow(second.getSink(), "simple/third");
        Flow fourth = fourthFlow(third.getSink(), "simple/fourth");

        // The flows are handed to the connector in an order that differs from their dependency order.
        Flow[] flows = {fourth, second, third, first};
        Cascade cascade = new CascadeConnector(getProperties()).connect(flows);

        cascade.start();
        cascade.complete();

        validateLength(fourth, 20);

        assertTrue(cascade.getHeadFlows().contains(first));
        assertTrue(cascade.getSourceTaps().containsAll(first.getSourcesCollection()));
        assertTrue(cascade.getTailFlows().contains(fourth));
        assertTrue(cascade.getSinkTaps().containsAll(fourth.getSinksCollection()));
    }

    /**
     * Runs a cascade whose first flow contains a {@link FailFunction} and verifies, through one
     * {@link LockingFlowListener} per flow, that only the first flow reports a throwable, that
     * the downstream flows never report a start, and that each downstream flow reports a stop.
     */
    @Test
    public void testSimpleCascadeFail() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        Flow first = firstFlow("simple/first", true);
        Flow second = secondFlow(first.getSink(), "simple/second");
        Flow third = thirdFlow(second.getSink(), "simple/third");
        Flow fourth = fourthFlow(third.getSink(), "simple/fourth");

        LockingFlowListener firstListener = new LockingFlowListener();
        LockingFlowListener secondListener = new LockingFlowListener();
        LockingFlowListener thirdListener = new LockingFlowListener();
        LockingFlowListener fourthListener = new LockingFlowListener();

        first.addListener(firstListener);
        second.addListener(secondListener);
        third.addListener(thirdListener);
        fourth.addListener(fourthListener);

        Cascade cascade = new CascadeConnector(getPlatform().getProperties())
                .connect(new Flow[]{fourth, second, third, first});

        cascade.start();

        try {
            cascade.complete();
            fail("did not fail");
        } catch (Exception ignored) {
            // Expected: completing the cascade must raise because the first flow throws.
        }

        assertEquals("first did not fail", 1, firstListener.thrown.availablePermits());
        assertEquals("second failed", 0, secondListener.thrown.availablePermits());
        assertEquals("third failed", 0, thirdListener.thrown.availablePermits());
        assertEquals("fourth failed", 0, fourthListener.thrown.availablePermits());

        assertEquals("second started", 0, secondListener.started.availablePermits());
        assertEquals("third started", 0, thirdListener.started.availablePermits());
        assertEquals("fourth started", 0, fourthListener.started.availablePermits());

        // Each message names the flow actually being checked so a failure points at the right one.
        assertEquals("second did not stop", 1, secondListener.stopped.availablePermits());
        assertEquals("third did not stop", 1, thirdListener.stopped.availablePermits());
        assertEquals("fourth did not stop", 1, fourthListener.stopped.availablePermits());
    }

    /**
     * Runs two upstream flows whose sinks are combined into one multi-source flow, and checks
     * the output length of that combined flow.
     */
    @Test
    public void testMultiTapCascade() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        Flow upstreamFirst = previousMultiTapFlow("multitap", "first");
        Flow upstreamSecond = previousMultiTapFlow("multitap", "second");

        Tap[] upstreamSinks = Tap.taps(new Tap[]{upstreamFirst.getSink(), upstreamSecond.getSink()});
        Flow combined = multiTapFlow(upstreamSinks, "multitap");

        Flow[] flows = {combined, upstreamFirst, upstreamSecond};
        Cascade cascade = new CascadeConnector(getProperties()).connect(flows);

        cascade.start();
        cascade.complete();

        validateLength(combined, 40);
    }

    /**
     * Runs a cascade with a {@link FlowSkipStrategy} that reports every flow as skippable, then
     * checks that the listener on the second flow counted one skip and that the sink of the
     * fourth flow does not exist.
     */
    @Test
    public void testSkippedCascade() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        Flow first = firstFlow("skipped/first", false);
        Flow second = secondFlow(first.getSink(), "skipped/second");
        Flow third = thirdFlow(second.getSink(), "skipped/third");
        Flow fourth = fourthFlow(third.getSink(), "skipped/fourth");

        CountingFlowListener skipCounter = new CountingFlowListener();
        second.addListener(skipCounter);

        Flow[] flows = {first, second, third, fourth};
        Cascade cascade = new CascadeConnector(getProperties()).connect(flows);

        // Strategy that answers "skip" for every flow it is asked about.
        FlowSkipStrategy skipEverything = new FlowSkipStrategy() {
            public boolean skipFlow(Flow flow) throws IOException {
                return true;
            }
        };
        cascade.setFlowSkipStrategy(skipEverything);

        cascade.start();
        cascade.complete();

        assertEquals(1, skipCounter.skipped);
        assertFalse("file exists", fourth.getSink().resourceExists(fourth.getConfig()));
    }

    /**
     * Starts a cascade, waits until the first job of the first flow reports that it has started,
     * stops the cascade, and verifies that both the flow listener and the cascade listener see
     * the stop and completion events within 60 seconds each.
     *
     * <p>NOTE(review): the polling loop below has no upper bound; if the job never reports as
     * started the test will not terminate on its own.
     */
    @Test
    public void testSimpleCascadeStop() throws IOException, InterruptedException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        Flow first = firstFlow("stopped/first" + ComparePlatformsTest.NONDETERMINISTIC, false);
        Flow second = secondFlow(first.getSink(), "stopped/second" + ComparePlatformsTest.NONDETERMINISTIC);
        Flow third = thirdFlow(second.getSink(), "stopped/third" + ComparePlatformsTest.NONDETERMINISTIC);
        Flow fourth = fourthFlow(third.getSink(), "stopped/fourth" + ComparePlatformsTest.NONDETERMINISTIC);

        LockingCascadeListener cascadeListener = new LockingCascadeListener();
        LockingFlowListener flowListener = new LockingFlowListener();
        first.addListener(flowListener);

        Flow[] flows = {first, second, third, fourth};
        Cascade cascade = new CascadeConnector(getProperties()).connect(flows);
        cascade.addListener(cascadeListener);

        System.out.println("calling start");
        cascade.start();

        assertTrue("did not start", flowListener.started.tryAcquire(60L, TimeUnit.SECONDS));
        assertTrue("cascade did not start", cascadeListener.started.tryAcquire(60L, TimeUnit.SECONDS));

        // Poll until the first job of the first flow reports that it has started.
        boolean jobStarted = false;
        while (!jobStarted) {
            System.out.println("testing if running");

            // Only MapReduce and DAG platforms pause between polls.
            if (getPlatform().isMapReduce() || getPlatform().isDAG()) {
                Thread.sleep(1000L);
            }

            Map jobs = Flows.getJobsMap(first);
            if (jobs == null || jobs.values().isEmpty()) {
                continue;
            }

            FlowStepJob firstJob = (FlowStepJob) jobs.values().iterator().next();
            jobStarted = firstJob.isStarted();
        }

        System.out.println("calling stop");
        cascade.stop();

        assertTrue("did not stop", flowListener.stopped.tryAcquire(60L, TimeUnit.SECONDS));
        assertTrue("did not complete", flowListener.completed.tryAcquire(60L, TimeUnit.SECONDS));
        assertTrue("cascade did not stop", cascadeListener.stopped.tryAcquire(60L, TimeUnit.SECONDS));
        assertTrue("cascade did not complete", cascadeListener.completed.tryAcquire(60L, TimeUnit.SECONDS));
    }

    /**
     * Connects four flows into a cascade (without running it) and checks that the cascade id is
     * non-null and equals the "cascading.cascade.id" property of every participating flow.
     */
    @Test
    public void testCascadeID() throws IOException {
        Flow first = firstFlow("idtest/first", false);
        Flow second = secondFlow(first.getSink(), "idtest/second");
        Flow third = thirdFlow(second.getSink(), "idtest/third");
        Flow fourth = fourthFlow(third.getSink(), "idtest/fourth");

        Flow[] flows = {first, second, third, fourth};
        String id = new CascadeConnector(getProperties()).connect(flows).getID();

        assertNotNull("id is null", id);

        // Checked in the order first, second, third, fourth.
        for (Flow flow : flows) {
            assertEquals(flow.getProperty("cascading.cascade.id"), id);
        }
    }

    /**
     * Runs a cascade in which the fourth flow reads from the checkpoint tap of the third flow
     * rather than from its sink, then checks the output length and the head/tail flows and
     * source/intermediate/checkpoint/sink taps reported by the cascade.
     *
     * <p>The body only executes when the platform reports {@code isMapReduce()}; on any other
     * platform the test returns immediately.
     */
    @Test
    public void testCheckpointTapCascade() throws IOException {
        if (!getPlatform().isMapReduce()) {
            return;
        }

        getPlatform().copyFromLocal(InputData.inputFileIps);

        Flow first = firstFlow("checkpoint/first", false);
        Flow second = secondFlow(first.getSink(), "checkpoint/second");
        Flow third = thirdCheckpointFlow(second.getSink(), "checkpoint/third");

        // The fourth flow consumes the first checkpoint tap exposed by the third flow.
        Tap checkpointTap = (Tap) third.getCheckpoints().values().iterator().next();
        Flow fourth = fourthFlow(checkpointTap, "checkpoint/fourth");

        Flow[] flows = {fourth, second, third, first};
        Cascade cascade = new CascadeConnector(getProperties()).connect(flows);

        cascade.start();
        cascade.complete();

        validateLength(fourth, 20);

        assertTrue(cascade.getHeadFlows().contains(first));
        assertTrue(cascade.getSourceTaps().containsAll(first.getSourcesCollection()));
        assertTrue(cascade.getIntermediateTaps().containsAll(third.getCheckpointsCollection()));
        assertTrue(cascade.getCheckpointsTaps().containsAll(third.getCheckpointsCollection()));
        assertTrue(cascade.getTailFlows().contains(fourth));
        assertTrue(cascade.getSinkTaps().containsAll(fourth.getSinksCollection()));
    }

    /**
     * Builds two flows that both sink to the same "output" text file tap and hands them to a
     * {@link CascadeConnector}; the test passes only when a {@link CascadeException} is thrown.
     */
    @Test(expected = CascadeException.class)
    public void testPlannerFailureDuplicateSinks() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        Tap ipsSource = getPlatform().getTextFile(InputData.inputFileIps);
        Tap sharedSink = getPlatform().getTextFile("output");
        Pipe copy = new Pipe("copy");

        Flow firstCopy = getPlatform().getFlowConnector()
                .connect(FlowDef.flowDef().addSource(copy, ipsSource).addTailSink(copy, sharedSink));

        getPlatform().copyFromLocal(InputData.inputFileApache);
        Tap apacheSource = getPlatform().getTextFile(InputData.inputFileApache);

        CascadeConnector cascadeConnector = new CascadeConnector(getProperties());

        // The second flow sinks to the shared "output" tap as well as to its own "output2" tap.
        Flow secondCopy = getPlatform().getFlowConnector()
                .connect(FlowDef.flowDef()
                        .addSource(copy, apacheSource)
                        .addSink(copy, sharedSink)
                        .addTailSink(new Pipe("copy2", copy), getPlatform().getTextFile("output2")));

        cascadeConnector.connect(new Flow[]{firstCopy, secondCopy});
    }
}
