package cascading.cascade;

import cascading.CascadingException;
import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowListener;
import cascading.flow.LockingFlowListener;
import cascading.flow.process.ProcessFlow;
import cascading.operation.Identity;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.text.FieldJoiner;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.stats.FlowStats;
import cascading.stats.process.ProcessStepStats;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import data.InputData;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import riffle.process.DependencyIncoming;
import riffle.process.DependencyOutgoing;
import riffle.process.Process;
import riffle.process.ProcessChildren;
import riffle.process.ProcessComplete;
import riffle.process.ProcessConfiguration;
import riffle.process.ProcessCounters;
import riffle.process.ProcessStart;
import riffle.process.ProcessStop;
import riffle.process.scheduler.ProcessChain;

/* loaded from: input_file:cascading/cascade/RiffleCascadePlatformTest.class */
public class RiffleCascadePlatformTest extends PlatformTestCase {

    /**
     * Riffle process that reports no counters of its own and instead exposes a single
     * {@link CounterRiffle} child built from the same source, sink and counter map.
     */
    @Process
    public class ChildCounterRiffle {
        Tap sink;
        Tap source;
        Map<String, Map<String, Long>> counter;

        /**
         * @param tap  tap reported as the incoming dependency
         * @param tap2 tap reported as the outgoing dependency
         * @param map  counter map handed on to the child process
         */
        ChildCounterRiffle(Tap tap, Tap tap2, Map<String, Map<String, Long>> map) {
            this.counter = map;
            this.sink = tap2;
            this.source = tap;
        }

        /** Start callback; no-op. */
        @ProcessStart
        public void start() {
        }

        /** Stop callback; no-op. */
        @ProcessStop
        public void stop() {
        }

        /**
         * @return always {@code null}; the counter map is only exposed through the child
         */
        @ProcessCounters
        public Map<String, Map<String, Long>> getCounters() {
            return null;
        }

        /**
         * @return a one-element list holding a new {@link CounterRiffle} that shares this
         *         process's source, sink and counter map
         */
        @ProcessChildren
        public List<Object> getChildren() {
            Object child = new CounterRiffle(this.source, this.sink, this.counter);
            return Arrays.asList(child);
        }

        /** Complete callback; no-op. */
        @ProcessComplete
        public void complete() {
        }

        /** @return an unmodifiable collection containing only the sink tap */
        @DependencyOutgoing
        public Collection getOutgoing() {
            List<Tap> outgoing = Arrays.asList(this.sink);
            return Collections.unmodifiableCollection(outgoing);
        }

        /** @return an unmodifiable collection containing only the source tap */
        @DependencyIncoming
        public Collection getIncoming() {
            List<Tap> incoming = Arrays.asList(this.source);
            return Collections.unmodifiableCollection(incoming);
        }

        /** @return a new, empty {@link Properties} instance on every call */
        @ProcessConfiguration
        public Object getConfiguration() {
            Properties configuration = new Properties();
            return configuration;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Process
    /* loaded from: input_file:cascading/cascade/RiffleCascadePlatformTest$CounterRiffle.class */
    public class CounterRiffle {
        Tap sink;
        Tap source;
        Map<String, Map<String, Long>> counter;

        CounterRiffle(Tap tap, Tap tap2, Map<String, Map<String, Long>> map) {
            this.source = tap;
            this.sink = tap2;
            this.counter = map;
        }

        @ProcessStart
        public void start() {
        }

        @ProcessStop
        public void stop() {
        }

        @ProcessCounters
        public Map<String, Map<String, Long>> getCounters() {
            return this.counter;
        }

        @ProcessComplete
        public void complete() {
        }

        @DependencyOutgoing
        public Collection getOutgoing() {
            return Collections.unmodifiableCollection(Arrays.asList(this.sink));
        }

        @DependencyIncoming
        public Collection getIncoming() {
            return Collections.unmodifiableCollection(Arrays.asList(this.source));
        }

        @ProcessConfiguration
        public Object getConfiguration() {
            return new Properties();
        }
    }

    /**
     * Riffle process that throws a {@link CascadingException} from exactly one of its
     * lifecycle callbacks, chosen by the {@link Failing} value supplied at construction.
     */
    @Process
    public static class FailingRiffle {
        /** Names the lifecycle callback in which the process throws. */
        public enum Failing {
            START,
            STOP,
            COMPLETE
        }

        Tap sink;
        Tap source;
        Failing failing;

        /**
         * @param tap     tap reported as the incoming dependency
         * @param tap2    tap reported as the outgoing dependency
         * @param failing callback that should throw
         */
        FailingRiffle(Tap tap, Tap tap2, Failing failing) {
            this.failing = failing;
            this.sink = tap2;
            this.source = tap;
        }

        /** Throws when constructed with {@link Failing#START}; otherwise does nothing. */
        @ProcessStart
        public void start() {
            crashWhen(Failing.START);
        }

        /** Throws when constructed with {@link Failing#STOP}; otherwise does nothing. */
        @ProcessStop
        public void stop() {
            crashWhen(Failing.STOP);
        }

        /** Throws when constructed with {@link Failing#COMPLETE}; otherwise does nothing. */
        @ProcessComplete
        public void complete() {
            crashWhen(Failing.COMPLETE);
        }

        /**
         * Throws a {@link CascadingException} with message "testing" if {@code phase} is the
         * configured failure point.
         */
        private void crashWhen(Failing phase) {
            if (phase != this.failing) {
                return;
            }
            throw new CascadingException("testing");
        }

        /** @return a new, empty {@link Properties} instance on every call */
        @ProcessConfiguration
        public Object getConfiguration() {
            Properties configuration = new Properties();
            return configuration;
        }

        /** @return an unmodifiable collection containing only the sink tap */
        @DependencyOutgoing
        public Collection getOutgoing() {
            List<Tap> outgoing = Arrays.asList(this.sink);
            return Collections.unmodifiableCollection(outgoing);
        }

        /** @return an unmodifiable collection containing only the source tap */
        @DependencyIncoming
        public Collection getIncoming() {
            List<Tap> incoming = Arrays.asList(this.source);
            return Collections.unmodifiableCollection(incoming);
        }
    }

    /* loaded from: input_file:cascading/cascade/RiffleCascadePlatformTest$ThrowableListener.class */
    class ThrowableListener implements FlowListener {
        public Throwable throwable;

        ThrowableListener() {
        }

        public void onStarting(Flow flow) {
        }

        public void onStopping(Flow flow) {
        }

        public void onCompleted(Flow flow) {
        }

        public boolean onThrowable(Flow flow, Throwable th) {
            this.throwable = th;
            return true;
        }

        public Throwable getThrowable() {
            return this.throwable;
        }
    }

    /**
     * Creates the test case, passing {@code false} to the {@link PlatformTestCase} constructor.
     * NOTE(review): the meaning of the flag is defined by PlatformTestCase, which is not
     * visible here; confirm before changing it.
     */
    public RiffleCascadePlatformTest() {
        super(false);
    }

    /**
     * Builds the first flow: reads the ips input as a text file, applies {@link Identity}
     * to the "line" field renaming it to "ip", and writes a tab-delimited file.
     *
     * @param str output sub-directory; the sink is written to {@code str + "/first"}
     * @return the connected flow
     */
    private Flow firstFlow(String str) {
        Tap source = getPlatform().getTextFile(InputData.inputFileIps);
        Tap sink = getPlatform().getTabDelimitedFile(new Fields("ip"), getOutputPath(str + "/first"), SinkMode.REPLACE);
        Pipe pipe = new Each(new Pipe("first"), new Fields("line"), new Identity(new Fields("ip")), new Fields("ip"));

        return getPlatform().getFlowConnector().connect(source, sink, pipe);
    }

    /**
     * Builds the second flow: splits each incoming tuple on "." with a
     * {@link RegexSplitter} into the fields "first", "second", "third" and "fourth".
     *
     * @param tap source tap to read from
     * @param str output sub-directory; the sink is written to {@code str + "/second"}
     * @return the connected flow
     */
    private Flow secondFlow(Tap tap, String str) {
        Tap sink = getPlatform().getTabDelimitedFile(
            new Fields("first", "second", "third", "fourth"),
            getOutputPath(str + "/second"),
            SinkMode.REPLACE);
        Pipe pipe = new Each(
            new Pipe("second"),
            new RegexSplitter(new Fields("first", "second", "third", "fourth"), "\\."));

        return getPlatform().getFlowConnector().connect(tap, sink, pipe);
    }

    /**
     * Builds the third flow: applies a {@link FieldJoiner} with delimiter "-" that
     * declares a single "mangled" field.
     *
     * @param tap source tap to read from
     * @param str output sub-directory; the sink is written to {@code str + "/third"}
     * @return the connected flow
     */
    private Flow thirdFlow(Tap tap, String str) {
        Tap sink = getPlatform().getTabDelimitedFile(new Fields("mangled"), getOutputPath(str + "/third"), SinkMode.REPLACE);
        Pipe pipe = new Each(new Pipe("third"), new FieldJoiner(new Fields("mangled"), "-"));

        return getPlatform().getFlowConnector().connect(tap, sink, pipe);
    }

    /**
     * Builds the fourth flow: passes tuples through {@link Identity} into a text file.
     *
     * @param tap source tap to read from
     * @param str output sub-directory; the sink is written to {@code str + "/fourth"}
     * @return the connected flow
     */
    private Flow fourthFlow(Tap tap, String str) {
        Tap sink = getPlatform().getTextFile(getOutputPath(str + "/fourth"), SinkMode.REPLACE);
        Pipe pipe = new Each(new Pipe("fourth"), new Identity());

        return getPlatform().getFlowConnector().connect(tap, sink, pipe);
    }

    /**
     * Chains four flows sink-to-source, hands them to a {@link ProcessChain} in shuffled
     * order (fourth, second, first, third), runs the chain, and passes the fourth flow
     * with an expected length of 20 to {@code validateLength}.
     */
    @Test
    public void testSimpleRiffle() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        final String path = "perpetual";
        Flow first = firstFlow(path);
        Flow second = secondFlow(first.getSink(), path);
        Flow third = thirdFlow(second.getSink(), path);
        Flow fourth = fourthFlow(third.getSink(), path);

        // Deliberately out of dependency order; the chain is constructed with 'true'.
        Object[] processes = {fourth, second, first, third};
        ProcessChain chain = new ProcessChain(true, processes);
        chain.start();
        chain.complete();

        validateLength(fourth, 20);
    }

    /**
     * Wraps four chained flows in {@link ProcessFlow}s, connects them into a cascade in
     * shuffled order, runs it, and checks that the {@link LockingFlowListener} attached
     * to the second process flow releases both its {@code started} and {@code completed}
     * permits within two seconds each.
     */
    @Test
    public void testSimpleRiffleCascade() throws IOException, InterruptedException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        final String path = "perpetualcascade";
        Flow first = firstFlow(path);
        Flow second = secondFlow(first.getSink(), path);
        Flow third = thirdFlow(second.getSink(), path);
        Flow fourth = fourthFlow(third.getSink(), path);

        Flow firstProcess = new ProcessFlow("first", first);
        Flow secondProcess = new ProcessFlow("second", second);
        Flow thirdProcess = new ProcessFlow("third", third);
        Flow fourthProcess = new ProcessFlow("fourth", fourth);

        LockingFlowListener listener = new LockingFlowListener();
        secondProcess.addListener(listener);

        // Deliberately out of dependency order.
        Flow[] flows = {fourthProcess, secondProcess, firstProcess, thirdProcess};
        Cascade cascade = new CascadeConnector(getProperties()).connect(flows);
        cascade.start();
        cascade.complete();

        boolean started = listener.started.tryAcquire(2L, TimeUnit.SECONDS);
        assertTrue("did not start", started);
        boolean completed = listener.completed.tryAcquire(2L, TimeUnit.SECONDS);
        assertTrue("did not complete", completed);

        validateLength(fourth, 20);
    }

    /**
     * Starts a process flow whose riffle throws in its start callback and expects a
     * {@link CascadingException} to reach the caller after the registered
     * {@link ThrowableListener} has recorded a throwable.
     */
    @Test
    public void testProcessFlowFlowListenerExceptionHandlingInStart() throws IOException, InterruptedException {
        ThrowableListener listener = new ThrowableListener();

        getPlatform().copyFromLocal(InputData.inputFileIps);

        Flow process = flowWithException("startException", FailingRiffle.Failing.START);
        process.addListener(listener);

        try {
            process.start();
            fail("there should have been an exception");
        } catch (CascadingException expected) {
            Throwable recorded = listener.getThrowable();
            assertNotNull(recorded);
        }
    }

    /**
     * Completes a process flow whose riffle throws in its complete callback and expects
     * a {@link CascadingException} to reach the caller after the registered
     * {@link ThrowableListener} has recorded a throwable.
     */
    @Test
    public void testProcessFlowFlowListenerExceptionHandlingInComplete() throws IOException, InterruptedException {
        ThrowableListener listener = new ThrowableListener();

        getPlatform().copyFromLocal(InputData.inputFileIps);

        Flow process = flowWithException("completeException", FailingRiffle.Failing.COMPLETE);
        process.addListener(listener);

        try {
            process.complete();
            fail("there should have been an exception");
        } catch (CascadingException expected) {
            Throwable recorded = listener.getThrowable();
            assertNotNull(recorded);
        }
    }

    /**
     * Runs a process flow backed by a {@link CounterRiffle} and checks that the single
     * child of its {@link FlowStats} exposes the counter group "outer-key", the counter
     * "inner-key" and the value 42 supplied to the riffle.
     */
    @Test
    public void testProcessFlowWithCounters() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        // Parameterized maps instead of raw HashMap, so the call below is type-checked.
        Map<String, Map<String, Long>> counters = new HashMap<String, Map<String, Long>>();
        Map<String, Long> innerCounters = new HashMap<String, Long>();
        innerCounters.put("inner-key", 42L);
        counters.put("outer-key", innerCounters);

        Flow flow = flowWithCounters("counter", counters);
        flow.complete();

        FlowStats flowStats = flow.getFlowStats();
        assertNotNull(flowStats);

        // Copied into a list only to allow indexed access to the single child.
        List<Object> children = new ArrayList<Object>(flowStats.getChildren());
        assertEquals(1, children.size());

        ProcessStepStats stepStats = (ProcessStepStats) children.get(0);
        assertEquals(counters.keySet(), stepStats.getCounterGroups());
        assertEquals(innerCounters.keySet(), stepStats.getCountersFor("outer-key"));
        assertEquals(42L, stepStats.getCounterValue("outer-key", "inner-key"));
    }

    /**
     * Runs a process flow backed by a {@link ChildCounterRiffle}, whose own counters are
     * {@code null}, and checks that the single child of its {@link FlowStats} still
     * exposes the counter group "outer-key", the counter "inner-key" and the value 42.
     */
    @Test
    public void testProcessFlowWithChildCounters() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileIps);

        // Parameterized maps instead of raw HashMap, so the call below is type-checked.
        Map<String, Map<String, Long>> counters = new HashMap<String, Map<String, Long>>();
        Map<String, Long> innerCounters = new HashMap<String, Long>();
        innerCounters.put("inner-key", 42L);
        counters.put("outer-key", innerCounters);

        Flow flow = flowWithChildren("children", counters);
        flow.complete();

        FlowStats flowStats = flow.getFlowStats();
        assertNotNull(flowStats);

        // Copied into a list only to allow indexed access to the single child.
        List<Object> children = new ArrayList<Object>(flowStats.getChildren());
        assertEquals(1, children.size());

        ProcessStepStats stepStats = (ProcessStepStats) children.get(0);
        assertEquals(counters.keySet(), stepStats.getCounterGroups());
        assertEquals(innerCounters.keySet(), stepStats.getCountersFor("outer-key"));
        assertEquals(42L, stepStats.getCounterValue("outer-key", "inner-key"));
    }

    /**
     * Starts and then stops a process flow whose riffle throws in its stop callback, and
     * expects a {@link CascadingException} from {@code stop()} after the registered
     * {@link ThrowableListener} has recorded a throwable.
     */
    @Test
    public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException {
        ThrowableListener listener = new ThrowableListener();

        getPlatform().copyFromLocal(InputData.inputFileIps);

        Flow process = flowWithException("stopException", FailingRiffle.Failing.STOP);
        process.addListener(listener);

        // start() is outside the try block: only stop() is expected to throw.
        process.start();

        try {
            process.stop();
            fail("there should have been an exception");
        } catch (CascadingException expected) {
            Throwable recorded = listener.getThrowable();
            assertNotNull(recorded);
        }
    }

    /**
     * Wraps a {@link FailingRiffle} in a {@link ProcessFlow} named "flow".
     *
     * @param str     output sub-directory; the sink is written to {@code str + "/first"}
     * @param failing lifecycle callback in which the riffle should throw
     * @return the process flow
     */
    private Flow flowWithException(String str, FailingRiffle.Failing failing) {
        Tap source = getPlatform().getTextFile(InputData.inputFileIps);
        Tap sink = getPlatform().getTabDelimitedFile(new Fields("ip"), getOutputPath(str + "/first"), SinkMode.REPLACE);
        FailingRiffle riffle = new FailingRiffle(source, sink, failing);

        return new ProcessFlow("flow", riffle);
    }

    /**
     * Wraps a {@link CounterRiffle} in a {@link ProcessFlow} named "counter-flow".
     *
     * @param str output sub-directory; the sink is written to {@code str + "/first"}
     * @param map counter map the riffle should report
     * @return the process flow
     */
    private Flow flowWithCounters(String str, Map<String, Map<String, Long>> map) {
        Tap source = getPlatform().getTextFile(InputData.inputFileIps);
        Tap sink = getPlatform().getTabDelimitedFile(new Fields("ip"), getOutputPath(str + "/first"), SinkMode.REPLACE);
        CounterRiffle riffle = new CounterRiffle(source, sink, map);

        return new ProcessFlow("counter-flow", riffle);
    }

    /**
     * Wraps a {@link ChildCounterRiffle} in a {@link ProcessFlow} named "counter-flow".
     *
     * @param str output sub-directory; the sink is written to {@code str + "/first"}
     * @param map counter map handed to the riffle for its child to report
     * @return the process flow
     */
    private Flow flowWithChildren(String str, Map<String, Map<String, Long>> map) {
        Tap source = getPlatform().getTextFile(InputData.inputFileIps);
        Tap sink = getPlatform().getTabDelimitedFile(new Fields("ip"), getOutputPath(str + "/first"), SinkMode.REPLACE);
        ChildCounterRiffle riffle = new ChildCounterRiffle(source, sink, map);

        return new ProcessFlow("counter-flow", riffle);
    }
}
