package co.cask.cdap.runtime.app;

import co.cask.cdap.api.annotation.Output;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.Tick;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.flow.AbstractFlow;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.FlowletContext;
import co.cask.cdap.api.flow.flowlet.OutputEmitter;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/runtime/app/PendingMetricTestApp.class */
public class PendingMetricTestApp extends AbstractApplication {

    /* loaded from: input_file:co/cask/cdap/runtime/app/PendingMetricTestApp$ForwardOne.class */
    /**
     * Flowlet that forwards every integer it receives as its decimal string form.
     *
     * <p>Each event is held back until a gate file named {@code "one"} exists in the
     * directory given by the {@code "temp"} runtime argument; the wait is bounded to
     * five seconds per event and fails with a {@link RuntimeException} on timeout.
     */
    public static class ForwardOne extends AbstractFlowlet {
        private static final Logger LOG = LoggerFactory.getLogger(ForwardOne.class);

        // Never assigned in this class; presumably injected by the flow runtime.
        private OutputEmitter<String> out;

        // Gate file resolved in initialize(); processing blocks until it exists.
        private File fileToWaitFor;

        /**
         * Initializes the base flowlet and resolves the gate file for this flowlet.
         *
         * @param flowletContext context supplying the runtime arguments
         * @throws Exception if the base class initialization fails
         */
        public void initialize(FlowletContext flowletContext) throws Exception {
            super.initialize(flowletContext);
            File gate = PendingMetricTestApp.getTempFile(flowletContext, "one");
            this.fileToWaitFor = gate;
        }

        /**
         * Waits for the gate file, then emits the integer as a string and logs it.
         *
         * @param i the incoming integer event
         * @throws InterruptedException if interrupted while waiting for the gate file
         */
        @ProcessInput
        void processInt(int i) throws InterruptedException {
            final long timeoutMillis = TimeUnit.SECONDS.toMillis(5L);
            PendingMetricTestApp.waitForFile(this.fileToWaitFor, timeoutMillis);

            final String text = String.valueOf(i);
            this.out.emit(text);
            LOG.info("Forwarded int " + i);
        }
    }

    /* loaded from: input_file:co/cask/cdap/runtime/app/PendingMetricTestApp$ForwardTwo.class */
    /**
     * Flowlet that forwards both integer and string events as strings.
     *
     * <p>Integer events are gated on a file named {@code "two-i"} and string events on a
     * file named {@code "two-s"}, both located in the directory given by the
     * {@code "temp"} runtime argument. Each wait is bounded to five seconds and fails
     * with a {@link RuntimeException} on timeout.
     */
    public static class ForwardTwo extends AbstractFlowlet {
        private static final Logger LOG = LoggerFactory.getLogger(ForwardTwo.class);

        // Never assigned in this class; presumably injected by the flow runtime.
        private OutputEmitter<String> out;

        // Gate files resolved in initialize(), one per input type.
        private File fileToWaitForInt;
        private File fileToWaitForString;

        /**
         * Initializes the base flowlet and resolves the two gate files.
         *
         * @param flowletContext context supplying the runtime arguments
         * @throws Exception if the base class initialization fails
         */
        public void initialize(FlowletContext flowletContext) throws Exception {
            super.initialize(flowletContext);
            File intGate = PendingMetricTestApp.getTempFile(flowletContext, "two-i");
            this.fileToWaitForInt = intGate;
            File stringGate = PendingMetricTestApp.getTempFile(flowletContext, "two-s");
            this.fileToWaitForString = stringGate;
        }

        /**
         * Waits for the integer gate file, then emits the integer as a string and logs it.
         *
         * @param i the incoming integer event
         * @throws InterruptedException if interrupted while waiting for the gate file
         */
        @ProcessInput
        void processInt(int i) throws InterruptedException {
            final long timeoutMillis = TimeUnit.SECONDS.toMillis(5L);
            PendingMetricTestApp.waitForFile(this.fileToWaitForInt, timeoutMillis);

            final String text = String.valueOf(i);
            this.out.emit(text);
            LOG.info("Forwarded int " + i);
        }

        /**
         * Waits for the string gate file, then re-emits the string unchanged and logs it.
         *
         * @param str the incoming string event
         * @throws InterruptedException if interrupted while waiting for the gate file
         */
        @ProcessInput
        void processString(String str) throws InterruptedException {
            final long timeoutMillis = TimeUnit.SECONDS.toMillis(5L);
            PendingMetricTestApp.waitForFile(this.fileToWaitForString, timeoutMillis);

            this.out.emit(str);
            LOG.info("Forwarded string \"" + str + "\"");
        }
    }

    /* loaded from: input_file:co/cask/cdap/runtime/app/PendingMetricTestApp$Sink.class */
    /**
     * Terminal flowlet that consumes string events and only logs them.
     *
     * <p>Each event is held back until a gate file named {@code "three"} exists in the
     * directory given by the {@code "temp"} runtime argument; the wait is bounded to
     * five seconds per event and fails with a {@link RuntimeException} on timeout.
     */
    public static class Sink extends AbstractFlowlet {
        // Logger must be bound to Sink (it was previously bound to ForwardTwo by mistake),
        // otherwise the sink's log lines are attributed to the wrong flowlet class.
        private static final Logger LOG = LoggerFactory.getLogger(Sink.class);

        // Gate file resolved in initialize(); processing blocks until it exists.
        private File fileToWaitFor;

        /**
         * Initializes the base flowlet and resolves the gate file for this flowlet.
         *
         * @param flowletContext context supplying the runtime arguments
         * @throws Exception if the base class initialization fails
         */
        public void initialize(FlowletContext flowletContext) throws Exception {
            super.initialize(flowletContext);
            this.fileToWaitFor = PendingMetricTestApp.getTempFile(flowletContext, "three");
        }

        /**
         * Waits for the gate file, then logs the received string.
         *
         * @param str the incoming string event
         * @throws InterruptedException if interrupted while waiting for the gate file
         */
        @ProcessInput
        void processString(String str) throws InterruptedException {
            PendingMetricTestApp.waitForFile(this.fileToWaitFor, TimeUnit.SECONDS.toMillis(5L));
            LOG.info("Received string \"" + str + "\"");
        }
    }

    /* loaded from: input_file:co/cask/cdap/runtime/app/PendingMetricTestApp$Source.class */
    /**
     * Generator flowlet that emits a fixed number of events exactly once.
     *
     * <p>On the first tick it emits the integers {@code 0..count-1} on the {@code "ints"}
     * output and their string forms on the {@code "strings"} output, where {@code count}
     * comes from the {@code "count"} runtime argument (2 when the argument is absent).
     * Every later tick just sleeps for 50 milliseconds.
     */
    public static class Source extends AbstractFlowlet {
        private static final Logger LOG = LoggerFactory.getLogger(Source.class);

        // Set once the batch of events has been emitted; later ticks become no-ops.
        boolean generated = false;

        @Output("ints")
        private OutputEmitter<Integer> intOut;

        @Output("strings")
        private OutputEmitter<String> stringOut;

        /**
         * Emits the configured number of int/string event pairs on the first invocation
         * and idles on every subsequent one.
         *
         * @throws InterruptedException if interrupted while sleeping on an idle tick
         */
        @Tick(delay = 1, unit = TimeUnit.MILLISECONDS)
        void generateOnce() throws InterruptedException {
            if (!this.generated) {
                String configured = (String) getContext().getRuntimeArguments().get("count");
                int total;
                if (configured == null) {
                    total = 2;
                } else {
                    total = Integer.parseInt(configured);
                }

                int next = 0;
                while (next < total) {
                    this.intOut.emit(Integer.valueOf(next));
                    this.stringOut.emit(String.valueOf(next));
                    next++;
                }

                this.generated = true;
                LOG.info("Emitted " + total + " events.");
                return;
            }

            // Already generated: back off briefly so the 1 ms tick does not spin.
            TimeUnit.MILLISECONDS.sleep(50L);
        }
    }

    /* loaded from: input_file:co/cask/cdap/runtime/app/PendingMetricTestApp$TestPendingFlow.class */
    /**
     * Flow wiring one source into two forwarding flowlets that both feed a single sink:
     * {@code source -> forward-one -> sink} and {@code source -> forward-two -> sink}.
     */
    public static class TestPendingFlow extends AbstractFlow {
        /** Registers the four flowlets and then the four connections between them. */
        protected void configureFlow() {
            final String source = "source";
            final String forwardOne = "forward-one";
            final String forwardTwo = "forward-two";
            final String sink = "sink";

            setName("TestPendingFlow");
            setDescription("A flow to test whether queue pending events metrics are emitted correctly.");

            // Flowlets
            addFlowlet(source, new Source());
            addFlowlet(forwardOne, new ForwardOne());
            addFlowlet(forwardTwo, new ForwardTwo());
            addFlowlet(sink, new Sink());

            // Connections: fan out from the source, fan in to the sink.
            connect(source, forwardOne);
            connect(source, forwardTwo);
            connect(forwardOne, sink);
            connect(forwardTwo, sink);
        }
    }

    /** Configures the application with its single flow, {@link TestPendingFlow}. */
    public void configure() {
        TestPendingFlow pendingFlow = new TestPendingFlow();
        addFlow(pendingFlow);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Blocks until {@code file} exists or the timeout elapses, polling roughly every
     * 50 milliseconds.
     *
     * <p>The file is always checked at least once, so an already-existing file is
     * accepted even when the timeout is zero or negative.
     *
     * @param file the file whose existence is awaited
     * @param j    the maximum time to wait, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     * @throws RuntimeException     if the file still does not exist once the timeout has elapsed
     */
    public static void waitForFile(File file, long j) throws InterruptedException {
        // nanoTime is monotonic, so the deadline is immune to wall-clock adjustments.
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(j);
        final long pollNanos = TimeUnit.MILLISECONDS.toNanos(50L);

        // Check existence first so the file is examined at least once regardless of timeout.
        while (!file.exists()) {
            // Compare via subtraction so the check stays correct if nanoTime wraps.
            final long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new RuntimeException("timeout waiting for file " + file);
            }
            // Never sleep past the deadline.
            TimeUnit.NANOSECONDS.sleep(Math.min(remainingNanos, pollNanos));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves a file named {@code str} inside the directory given by the
     * {@code "temp"} runtime argument.
     *
     * @param flowletContext context whose runtime arguments supply the {@code "temp"} directory
     * @param str            name of the file within that directory
     * @return the (not necessarily existing) file {@code <temp>/<str>}
     * @throws AssertionError if the {@code "temp"} runtime argument is not set
     */
    public static File getTempFile(FlowletContext flowletContext, String str) {
        String tempDir = (String) flowletContext.getRuntimeArguments().get("temp");
        // Fail with a descriptive message so a missing argument is easy to diagnose.
        Assert.assertNotNull("Runtime argument 'temp' must be set to locate file '" + str + "'", tempDir);
        return new File(tempDir, str);
    }
}
