package cascading.pipe.checkpoint;

import cascading.PlatformTestCase;
import cascading.TestFunction;
import cascading.flow.Flow;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowDef;
import cascading.flow.FlowStep;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Checkpoint;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tap.DecoratorTap;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import data.InputData;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.junit.Test;

/* loaded from: input_file:cascading/pipe/checkpoint/CheckpointPlatformTest.class */
/**
 * Platform tests that build flows containing {@link Checkpoint} pipes and verify
 * the resulting output, the number of flow steps on MapReduce platforms, and the
 * cases in which connecting a flow is expected to throw.
 */
public class CheckpointPlatformTest extends PlatformTestCase {
    /** Creates the test case, passing {@code true} to the {@link PlatformTestCase} constructor. */
    public CheckpointPlatformTest() {
        super(true);
    }

    /**
     * Connects and completes a flow with one unnamed checkpoint between the parsing
     * Each and the GroupBy, then validates the flow at length 8 and, on MapReduce,
     * expects two flow steps.
     */
    @Test
    public void testSimpleCheckpoint() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = newApacheSourceTap();
        Tap sink = getPlatform().getTextFile(getOutputPath("simplecheckpoint"), SinkMode.REPLACE);
        Pipe tail = newIpCountPipe(new Checkpoint(newIpParsePipe()));

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, tail);
        flow.complete();

        validateLength(flow, 8, null);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong size", 2, flow.getFlowSteps().size());
        }
    }

    /**
     * Connects and completes a flow with three unnamed checkpoints, after passing the
     * second checkpoint's ConfigDef to a {@link FlowConnectorProps} configured with
     * {@link DecoratorTap} as the checkpoint tap decorator class. On MapReduce, expects
     * three flow steps of which exactly one has a {@link DecoratorTap} sink.
     */
    @Test
    public void testManyCheckpoints() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = newApacheSourceTap();
        Checkpoint decorated = new Checkpoint(newIpCountPipe(new Checkpoint(newIpParsePipe())));
        new FlowConnectorProps()
            .setCheckpointTapDecoratorClassName(DecoratorTap.class)
            .setProperties(decorated.getConfigDef());

        Tap sink = getPlatform().getTextFile(getOutputPath("manycheckpoint"), SinkMode.REPLACE);
        Pipe tail = new Checkpoint(new Each(decorated, new Identity()));

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, tail);
        flow.complete();

        validateLength(flow, 8, null);
        if (getPlatform().isMapReduce()) {
            List steps = flow.getFlowSteps();
            assertEquals("wrong size", 3, steps.size());

            int decoratedSinks = 0;
            for (Object step : steps) {
                if (((FlowStep) step).getSink() instanceof DecoratorTap) {
                    decoratedSinks++;
                }
            }
            assertEquals(1, decoratedSinks);
        }
    }

    /**
     * Connects a flow whose checkpoint named "checkpoint" is bound, through the
     * {@link FlowDef}, to a tab-delimited file tap. Validates the flow at length 8 and,
     * on MapReduce, expects two flow steps and validates the checkpoint tap at length 10.
     */
    @Test
    public void testSimpleCheckpointTextIntermediate() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = newApacheSourceTap();
        Pipe tail = newIpCountPipe(new Checkpoint("checkpoint", newIpParsePipe()));
        Tap sink = getPlatform().getTextFile(getOutputPath("checkpoint/sink"), SinkMode.REPLACE);
        Tap checkpointTap = getPlatform().getDelimitedFile(Fields.ALL, true, "\t", "\"", getOutputPath("checkpoint/tap"), SinkMode.REPLACE);

        FlowDef flowDef = FlowDef.flowDef()
            .addSource(tail, source)
            .addTailSink(tail, sink)
            .addCheckpoint("checkpoint", checkpointTap);

        Flow flow = getPlatform().getFlowConnector().connect(flowDef);
        flow.complete();

        validateLength(flow, 8);
        if (getPlatform().isMapReduce()) {
            assertEquals("wrong size", 2, flow.getFlowSteps().size());
            validateLength(flow.openTapForRead(checkpointTap), 10);
        }
    }

    /**
     * Registers a checkpoint tap under the name "checkpointXXXXX" while the pipe
     * assembly only contains a checkpoint named "checkpoint"; building or connecting
     * the flow definition is expected to throw.
     */
    @Test
    public void testFailCheckpoint() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = newApacheSourceTap();
        Pipe tail = newIpCountPipe(new Checkpoint("checkpoint", newIpParsePipe()));

        try {
            Tap sink = getPlatform().getTextFile(getOutputPath("failcheckpoint/sink"), SinkMode.REPLACE);
            Tap checkpointTap = getPlatform().getDelimitedFile(Fields.ALL, true, "\t", "\"", getOutputPath("failcheckpoint/tap"), SinkMode.REPLACE);
            FlowDef flowDef = FlowDef.flowDef()
                .addSource(tail, source)
                .addTailSink(tail, sink)
                .addCheckpoint("checkpointXXXXX", checkpointTap);

            getPlatform().getFlowConnector().connect(flowDef);
            fail();
        } catch (Exception expected) {
            // an exception is the outcome this test is looking for
        }
    }

    /**
     * Places the named checkpoint between the GroupBy and the Every; building or
     * connecting the flow definition is expected to throw.
     */
    @Test
    public void testFailCheckpointBeforeEvery() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = newApacheSourceTap();
        Pipe grouped = new GroupBy(newIpParsePipe(), new Fields("ip"));
        Pipe tail = new Every(new Checkpoint("checkpoint", grouped), new Count(), new Fields("ip", "count"));

        try {
            Tap sink = getPlatform().getTextFile(getOutputPath("failcheckpointevery/sink"), SinkMode.REPLACE);
            Tap checkpointTap = getPlatform().getDelimitedFile(Fields.ALL, true, "\t", "\"", getOutputPath("failcheckpointevery/tap"), SinkMode.REPLACE);
            FlowDef flowDef = FlowDef.flowDef()
                .addSource(tail, source)
                .addTailSink(tail, sink)
                .addCheckpoint("checkpoint", checkpointTap);

            getPlatform().getFlowConnector().connect(flowDef);
            fail();
        } catch (Exception expected) {
            // an exception is the outcome this test is looking for
        }
    }

    /**
     * Binds the named checkpoint to a plain text file tap (created without explicit
     * fields) instead of a delimited tap; building or connecting the flow definition
     * is expected to throw.
     */
    @Test
    public void testFailCheckpointDeclaredFields() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = newApacheSourceTap();
        Pipe tail = newIpCountPipe(new Checkpoint("checkpoint", newIpParsePipe()));

        try {
            Tap sink = getPlatform().getTextFile(getOutputPath("failcheckpointdeclared/sink"), SinkMode.REPLACE);
            Tap checkpointTap = getPlatform().getTextFile(getOutputPath("failcheckpointdeclared/tap"), SinkMode.REPLACE);
            FlowDef flowDef = FlowDef.flowDef()
                .addSource(tail, source)
                .addTailSink(tail, sink)
                .addCheckpoint("checkpoint", checkpointTap);

            getPlatform().getFlowConnector().connect(flowDef);
            fail();
        } catch (Exception expected) {
            // an exception is the outcome this test is looking for
        }
    }

    /**
     * Uses the name "checkpoint" for two checkpoints in the same assembly; building or
     * connecting the flow definition is expected to throw.
     */
    @Test
    public void testDuplicateCheckpoint() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        try {
            Tap source = newApacheSourceTap();
            Pipe counted = newIpCountPipe(new Checkpoint("checkpoint", newIpParsePipe()));
            Pipe tail = new Checkpoint("checkpoint", counted);
            Tap sink = getPlatform().getTextFile(getOutputPath("duplicatecheckpoint"), SinkMode.REPLACE);

            FlowDef flowDef = ((FlowDef) FlowDef.flowDef().setName("restartable"))
                .addSource("test", source)
                .addTailSink(tail, sink)
                .setRunID("restartable");

            getPlatform().getFlowConnector().connect(flowDef);
            fail("should throw element graph exception");
        } catch (Exception expected) {
            // an exception is the outcome this test is looking for
        }
    }

    /**
     * Runs only on MapReduce platforms. Completes a flow built with
     * {@code createRestartableFlow(path, true)}, which is expected to throw, and then
     * expects exactly one step sink other than the flow's own sink to exist. A second
     * flow built with {@code false} for the same path must complete, validate at
     * length 8, and have two flow steps.
     */
    @Test
    public void testRestartCheckpoint() throws Exception {
        if (!getPlatform().isMapReduce()) {
            return;
        }

        getPlatform().copyFromLocal(InputData.inputFileApache);
        String path = getOutputPath("restartcheckpoint");

        Flow failingFlow = createRestartableFlow(path, true);
        try {
            failingFlow.complete();
            fail("flow should fail");
        } catch (Exception expected) {
            // an exception is the outcome this test is looking for
        }

        int existingIntermediates = 0;
        for (Object step : failingFlow.getFlowSteps()) {
            Tap stepSink = ((FlowStep) step).getSink();
            if (failingFlow.getSink() == stepSink) {
                continue;
            }
            if (stepSink.resourceExists(failingFlow.getConfig())) {
                existingIntermediates++;
            }
        }
        assertEquals("wrong number of intermediate resources exist", 1, existingIntermediates);

        Flow restartedFlow = createRestartableFlow(path, false);
        restartedFlow.complete();

        validateLength(restartedFlow, 8, null);
        assertEquals("wrong size", 2, restartedFlow.getFlowSteps().size());
    }

    /**
     * Connects a flow named "restartable" with run id "restartable": the ip-count
     * assembly with one unnamed checkpoint, followed by an Each applying a
     * {@link TestFunction}.
     *
     * @param sinkPath path of the text file sink
     * @param failing  when {@code true} the TestFunction receives 2 as its third
     *                 argument, otherwise -1; the caller expects the flow built with
     *                 {@code true} to fail on completion
     * @return the connected, not yet completed flow
     */
    private Flow createRestartableFlow(String sinkPath, boolean failing) {
        Tap source = newApacheSourceTap();
        Pipe counted = newIpCountPipe(new Checkpoint(newIpParsePipe()));

        int functionArgument;
        if (failing) {
            functionArgument = 2;
        } else {
            functionArgument = -1;
        }
        Pipe tail = new Each(counted, new TestFunction(new Fields("insert"), new Tuple("value"), functionArgument));
        Tap sink = getPlatform().getTextFile(sinkPath, SinkMode.REPLACE);

        FlowDef flowDef = ((FlowDef) FlowDef.flowDef().setName("restartable"))
            .addSource("test", source)
            .addTailSink(tail, sink)
            .setRunID("restartable");

        return getPlatform().getFlowConnector().connect(flowDef);
    }

    /**
     * Groups the "upper" and "lower" inputs together on "line" and writes the result
     * through two tails, "sink1" and "sink2", the latter containing a checkpoint.
     * Expects both sinks to hold 10 entries and to be equal to each other.
     */
    @Test
    public void testMergeAndWriteToTwoSinksWithCheckpoint() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerSource = getPlatform().getTextFile(InputData.inputFileLower);
        Tap upperSource = getPlatform().getTextFile(InputData.inputFileUpper);
        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerSource);
        sources.put("upper", upperSource);

        Tap firstSink = getPlatform().getTextFile(getOutputPath("sink1"), SinkMode.REPLACE);
        Tap secondSink = getPlatform().getTextFile(getOutputPath("sink2"), SinkMode.REPLACE);
        HashMap<String, Tap> sinks = new HashMap<String, Tap>();
        sinks.put("sink1", firstSink);
        sinks.put("sink2", secondSink);

        Pipe upper = newLineIdentityPipe(new Pipe("upper"));
        Pipe lower = newLineIdentityPipe(new Pipe("lower"));
        GroupBy merged = new GroupBy(new Pipe[]{upper, lower}, new Fields("line"));

        Pipe firstTail = newLineIdentityPipe(new Pipe("sink1", merged));
        Pipe checkpointed = new Checkpoint(newLineIdentityPipe(new Pipe("sink2", merged)));
        Pipe secondTail = newLineIdentityPipe(checkpointed);

        Flow flow = getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{firstTail, secondTail});
        flow.complete();

        List firstResults = asList(flow, firstSink);
        List secondResults = asList(flow, secondSink);
        assertEquals(10, firstResults.size());
        assertEquals(10, secondResults.size());
        assertEquals(firstResults, secondResults);
    }

    /** Returns a new text file tap over the Apache input file, declaring fields "offset" and "line". */
    private Tap newApacheSourceTap() {
        return getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache);
    }

    /**
     * Returns a new Each on a pipe named "test" that applies a {@link RegexParser}
     * with pattern {@code ^[^ ]*} to "line", declaring and selecting the field "ip".
     */
    private Pipe newIpParsePipe() {
        RegexParser parser = new RegexParser(new Fields("ip"), "^[^ ]*");
        return new Each(new Pipe("test"), new Fields("line"), parser, new Fields("ip"));
    }

    /**
     * Returns a new Every over a GroupBy of {@code previous} on "ip", applying a
     * {@link Count} aggregator and selecting the fields "ip" and "count".
     */
    private Pipe newIpCountPipe(Pipe previous) {
        Pipe grouped = new GroupBy(previous, new Fields("ip"));
        return new Every(grouped, new Count(), new Fields("ip", "count"));
    }

    /** Returns a new Each over {@code previous} applying an {@link Identity} declared with "line" to the "line" field. */
    private Pipe newLineIdentityPipe(Pipe previous) {
        return new Each(previous, new Fields("line"), new Identity(new Fields("line")));
    }
}
