package co.cask.cdap.internal.app.runtime.flow;

import co.cask.cdap.api.annotation.Batch;
import co.cask.cdap.api.annotation.DisableTransaction;
import co.cask.cdap.api.annotation.HashPartition;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.RoundRobin;
import co.cask.cdap.api.annotation.Tick;
import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.api.flow.FlowletConnection;
import co.cask.cdap.api.flow.FlowletDefinition;
import co.cask.cdap.api.flow.flowlet.Callback;
import co.cask.cdap.api.flow.flowlet.FailurePolicy;
import co.cask.cdap.api.flow.flowlet.FailureReason;
import co.cask.cdap.api.flow.flowlet.Flowlet;
import co.cask.cdap.api.flow.flowlet.FlowletSpecification;
import co.cask.cdap.api.flow.flowlet.InputContext;
import co.cask.cdap.api.flow.flowlet.OutputEmitter;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.stream.StreamEventData;
import co.cask.cdap.app.ApplicationSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.queue.QueueSpecification;
import co.cask.cdap.app.queue.QueueSpecificationGenerator;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramRunner;
import co.cask.cdap.common.async.ExecutorUtils;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.BinaryDecoder;
import co.cask.cdap.common.lang.InstantiatorFactory;
import co.cask.cdap.common.lang.PropertyFieldSetter;
import co.cask.cdap.common.logging.common.LogWriter;
import co.cask.cdap.common.logging.logback.CAppender;
import co.cask.cdap.common.metrics.MetricsCollectionService;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data.stream.StreamCoordinator;
import co.cask.cdap.data.stream.StreamPropertyListener;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.DequeueStrategy;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.transaction.queue.QueueMetrics;
import co.cask.cdap.internal.app.queue.QueueReaderFactory;
import co.cask.cdap.internal.app.queue.RoundRobinQueueReader;
import co.cask.cdap.internal.app.queue.SimpleQueueSpecificationGenerator;
import co.cask.cdap.internal.app.runtime.DataFabricFacade;
import co.cask.cdap.internal.app.runtime.DataFabricFacadeFactory;
import co.cask.cdap.internal.app.runtime.DataSetFieldSetter;
import co.cask.cdap.internal.app.runtime.MetricsFieldSetter;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.io.DatumWriterFactory;
import co.cask.cdap.internal.io.ReflectionDatumReader;
import co.cask.cdap.internal.io.Schema;
import co.cask.cdap.internal.io.SchemaGenerator;
import co.cask.cdap.internal.io.UnsupportedTypeException;
import co.cask.cdap.internal.lang.Reflections;
import co.cask.cdap.internal.lang.Visitor;
import co.cask.cdap.internal.specification.FlowletMethod;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.common.io.ByteBufferInputStream;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.RunIds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/FlowletProgramRunner.class */
public final class FlowletProgramRunner implements ProgramRunner {
    private static final Logger LOG = LoggerFactory.getLogger(FlowletProgramRunner.class);
    private final SchemaGenerator schemaGenerator;
    private final DatumWriterFactory datumWriterFactory;
    private final DataFabricFacadeFactory dataFabricFacadeFactory;
    private final StreamCoordinator streamCoordinator;
    private final QueueReaderFactory queueReaderFactory;
    private final MetricsCollectionService metricsCollectionService;
    private final DiscoveryServiceClient discoveryServiceClient;
    private final DatasetFramework dsFramework;
    private final CConfiguration configuration;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/FlowletProgramRunner$FlowletServiceHook.class */
    public static final class FlowletServiceHook extends AbstractService {
        private final StreamCoordinator streamCoordinator;
        private final List<String> streams;
        private final AtomicReference<FlowletProgramController> controller;
        private final Executor executor;
        private final Lock suspendLock;
        private final StreamPropertyListener propertyListener;
        private Cancellable cancellable;

        private FlowletServiceHook(final String str, StreamCoordinator streamCoordinator, List<String> list, AtomicReference<FlowletProgramController> atomicReference) {
            this.suspendLock = new ReentrantLock();
            this.streamCoordinator = streamCoordinator;
            this.streams = list;
            this.controller = atomicReference;
            this.executor = ExecutorUtils.newThreadExecutor(Threads.createDaemonThreadFactory("flowlet-stream-update-%d"));
            this.propertyListener = new StreamPropertyListener() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.FlowletServiceHook.1
                public void ttlChanged(String str2, long j) {
                    FlowletProgramRunner.LOG.debug("TTL for stream '{}' changed to {} for flowlet '{}'", new Object[]{str2, Long.valueOf(j), str});
                    FlowletServiceHook.this.suspendAndResume();
                }

                public void ttlDeleted(String str2) {
                    FlowletProgramRunner.LOG.debug("TTL for stream '{}' deleted for flowlet '{}'", str2, str);
                    FlowletServiceHook.this.suspendAndResume();
                }

                public void generationChanged(String str2, int i) {
                    FlowletProgramRunner.LOG.debug("Generation for stream '{}' changed to {} for flowlet '{}'", new Object[]{str2, Integer.valueOf(i), str});
                    FlowletServiceHook.this.suspendAndResume();
                }

                public void generationDeleted(String str2) {
                    FlowletProgramRunner.LOG.debug("Generation for stream '{}' deleted for flowlet '{}'", str2, str);
                    FlowletServiceHook.this.suspendAndResume();
                }
            };
        }

        protected void doStart() {
            final ArrayList newArrayList = Lists.newArrayList();
            this.cancellable = new Cancellable() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.FlowletServiceHook.2
                public void cancel() {
                    Iterator it = newArrayList.iterator();
                    while (it.hasNext()) {
                        ((Cancellable) it.next()).cancel();
                    }
                }
            };
            Iterator<String> it = this.streams.iterator();
            while (it.hasNext()) {
                newArrayList.add(this.streamCoordinator.addListener(it.next(), this.propertyListener));
            }
            notifyStarted();
        }

        protected void doStop() {
            if (this.cancellable != null) {
                this.cancellable.cancel();
            }
            notifyStopped();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void suspendAndResume() {
            this.executor.execute(new Runnable() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.FlowletServiceHook.3
                @Override // java.lang.Runnable
                public void run() {
                    FlowletServiceHook.this.suspendLock.lock();
                    try {
                        try {
                            ((FlowletProgramController) FlowletServiceHook.this.controller.get()).suspend().get();
                            ((FlowletProgramController) FlowletServiceHook.this.controller.get()).resume().get();
                            FlowletServiceHook.this.suspendLock.unlock();
                        } catch (Exception e) {
                            FlowletProgramRunner.LOG.error("Failed to suspend and resume flowlet.", e);
                            FlowletServiceHook.this.suspendLock.unlock();
                        }
                    } catch (Throwable th) {
                        FlowletServiceHook.this.suspendLock.unlock();
                        throw th;
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/FlowletProgramRunner$ProcessMethodFactory.class */
    public interface ProcessMethodFactory {
        <T> ProcessMethod<T> create(Method method, int i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/FlowletProgramRunner$ProcessSpecificationFactory.class */
    public interface ProcessSpecificationFactory {
        <T> ProcessSpecification create(Set<String> set, Schema schema, TypeToken<T> typeToken, ProcessMethod<T> processMethod, ConsumerConfig consumerConfig, int i, Tick tick);
    }

    @Inject
    public FlowletProgramRunner(SchemaGenerator schemaGenerator, DatumWriterFactory datumWriterFactory, DataFabricFacadeFactory dataFabricFacadeFactory, StreamCoordinator streamCoordinator, QueueReaderFactory queueReaderFactory, MetricsCollectionService metricsCollectionService, DiscoveryServiceClient discoveryServiceClient, DatasetFramework datasetFramework, CConfiguration cConfiguration) {
        this.schemaGenerator = schemaGenerator;
        this.datumWriterFactory = datumWriterFactory;
        this.dataFabricFacadeFactory = dataFabricFacadeFactory;
        this.streamCoordinator = streamCoordinator;
        this.queueReaderFactory = queueReaderFactory;
        this.metricsCollectionService = metricsCollectionService;
        this.discoveryServiceClient = discoveryServiceClient;
        this.configuration = cConfiguration;
        this.dsFramework = datasetFramework;
    }

    @Inject(optional = true)
    void setLogWriter(LogWriter logWriter) {
        CAppender.logWriter = logWriter;
    }

    @Override // co.cask.cdap.app.runtime.ProgramRunner
    public ProgramController run(Program program, ProgramOptions programOptions) {
        BasicFlowletContext basicFlowletContext = null;
        try {
            String name = programOptions.getName();
            int parseInt = Integer.parseInt(programOptions.getArguments().getOption(ProgramOptionConstants.INSTANCE_ID, "-1"));
            Preconditions.checkArgument(parseInt >= 0, "Missing instance Id");
            int parseInt2 = Integer.parseInt(programOptions.getArguments().getOption(ProgramOptionConstants.INSTANCES, "0"));
            Preconditions.checkArgument(parseInt2 > 0, "Invalid or missing instance count");
            String option = programOptions.getArguments().getOption(ProgramOptionConstants.RUN_ID);
            Preconditions.checkNotNull(option, "Missing runId");
            RunId fromString = RunIds.fromString(option);
            ApplicationSpecification specification = program.getSpecification();
            Preconditions.checkNotNull(specification, "Missing application specification.");
            ProgramType type = program.getType();
            Preconditions.checkNotNull(type, "Missing processor type.");
            Preconditions.checkArgument(type == ProgramType.FLOW, "Only FLOW process type is supported.");
            String name2 = program.getName();
            Preconditions.checkNotNull(name2, "Missing processor name.");
            FlowSpecification flowSpecification = specification.getFlows().get(name2);
            FlowletDefinition flowletDefinition = (FlowletDefinition) flowSpecification.getFlowlets().get(name);
            Preconditions.checkNotNull(flowletDefinition, "Definition missing for flowlet \"%s\"", new Object[]{name});
            boolean isAnnotationPresent = program.getMainClass().isAnnotationPresent(DisableTransaction.class);
            if (isAnnotationPresent) {
                LOG.info("Transaction is disable for flowlet {}.{}.{}", new Object[]{program.getApplicationId(), program.getId().getId(), name});
            }
            Class<?> cls = Class.forName(flowletDefinition.getFlowletSpec().getClassName(), true, program.getClassLoader());
            Preconditions.checkArgument(Flowlet.class.isAssignableFrom(cls), "%s is not a Flowlet.", new Object[]{cls});
            basicFlowletContext = new BasicFlowletContext(program, name, parseInt, fromString, parseInt2, flowletDefinition.getDatasets(), programOptions.getUserArguments(), flowletDefinition.getFlowletSpec(), this.metricsCollectionService, this.discoveryServiceClient, this.dsFramework, this.configuration);
            DataFabricFacade createNoTransaction = isAnnotationPresent ? this.dataFabricFacadeFactory.createNoTransaction(program, basicFlowletContext.getDatasetInstantiator()) : this.dataFabricFacadeFactory.create(program, basicFlowletContext.getDatasetInstantiator());
            Table<QueueSpecificationGenerator.Node, String, Set<QueueSpecification>> create = new SimpleQueueSpecificationGenerator(Id.Application.from(program.getAccountId(), program.getApplicationId())).create(flowSpecification);
            Flowlet flowlet = (Flowlet) new InstantiatorFactory(false).get(TypeToken.of(cls)).create();
            TypeToken<? extends Flowlet> of = TypeToken.of(cls);
            Thread.currentThread().setContextClassLoader(FlowletProgramRunner.class.getClassLoader());
            Reflections.visit(flowlet, TypeToken.of(flowlet.getClass()), new PropertyFieldSetter(flowletDefinition.getFlowletSpec().getProperties()), new Visitor[]{new DataSetFieldSetter(basicFlowletContext), new MetricsFieldSetter(basicFlowletContext.getMetrics()), new OutputEmitterFieldSetter(outputEmitterFactory(basicFlowletContext, name, createNoTransaction, create))});
            ImmutableList.Builder<ConsumerSupplier<?>> builder = ImmutableList.builder();
            Collection createProcessSpecification = createProcessSpecification(basicFlowletContext, of, processMethodFactory(flowlet), processSpecificationFactory(basicFlowletContext, createNoTransaction, this.queueReaderFactory, name, create, builder, createSchemaCache(program)), Lists.newLinkedList());
            ImmutableList build = builder.build();
            AtomicReference<FlowletProgramController> atomicReference = new AtomicReference<>();
            FlowletRuntimeService flowletRuntimeService = new FlowletRuntimeService(flowlet, basicFlowletContext, createProcessSpecification, createCallback(flowlet, flowletDefinition.getFlowletSpec()), createNoTransaction, createServiceHook(name, build, atomicReference));
            if (isAnnotationPresent) {
                LOG.info("Transaction disabled for flowlet {}", basicFlowletContext);
            }
            FlowletProgramController flowletProgramController = new FlowletProgramController(program.getName(), name, basicFlowletContext, flowletRuntimeService, build);
            atomicReference.set(flowletProgramController);
            LOG.info("Starting flowlet: {}", basicFlowletContext);
            flowletRuntimeService.start();
            LOG.info("Flowlet started: {}", basicFlowletContext);
            return flowletProgramController;
        } catch (Exception e) {
            if (basicFlowletContext != null) {
                basicFlowletContext.close();
            }
            throw Throwables.propagate(e);
        }
    }

    private <T extends Collection<ProcessSpecification<?>>> T createProcessSpecification(BasicFlowletContext basicFlowletContext, TypeToken<? extends Flowlet> typeToken, ProcessMethodFactory processMethodFactory, ProcessSpecificationFactory processSpecificationFactory, T t) throws NoSuchMethodException {
        Set newHashSet;
        TypeToken resolveType;
        ConsumerConfig consumerConfig;
        Schema generate;
        HashSet newHashSet2 = Sets.newHashSet();
        Iterator it = typeToken.getTypes().classes().iterator();
        while (it.hasNext()) {
            TypeToken typeToken2 = (TypeToken) it.next();
            if (typeToken2.getRawType().equals(Object.class)) {
                break;
            }
            for (Method method : typeToken2.getRawType().getDeclaredMethods()) {
                if (newHashSet2.add(new FlowletMethod(method, typeToken))) {
                    ProcessInput annotation = method.getAnnotation(ProcessInput.class);
                    Tick tick = (Tick) method.getAnnotation(Tick.class);
                    if (annotation != null || tick != null) {
                        ProcessMethod create = processMethodFactory.create(method, tick == null ? annotation.maxRetries() : tick.maxRetries());
                        int i = 1;
                        if (tick != null) {
                            newHashSet = ImmutableSet.of();
                            consumerConfig = new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null);
                            generate = Schema.of(Schema.Type.NULL);
                            resolveType = TypeToken.of(Void.TYPE);
                        } else {
                            newHashSet = Sets.newHashSet(annotation.value());
                            if (newHashSet.isEmpty()) {
                                newHashSet.add("");
                            }
                            resolveType = typeToken.resolveType(method.getGenericParameterTypes()[0]);
                            consumerConfig = getConsumerConfig(basicFlowletContext, method);
                            Integer batchSize = getBatchSize(method);
                            if (batchSize != null) {
                                if (resolveType.getRawType().equals(Iterator.class)) {
                                    Preconditions.checkArgument(resolveType.getType() instanceof ParameterizedType, "Only ParameterizedType is supported for batch Iterator.");
                                    resolveType = typeToken.resolveType(((ParameterizedType) resolveType.getType()).getActualTypeArguments()[0]);
                                }
                                i = batchSize.intValue();
                            }
                            try {
                                generate = this.schemaGenerator.generate(resolveType.getType());
                            } catch (UnsupportedTypeException e) {
                                throw Throwables.propagate(e);
                            }
                        }
                        ProcessSpecification create2 = processSpecificationFactory.create(newHashSet, generate, resolveType, create, consumerConfig, i, tick);
                        if (create2 != null) {
                            t.add(create2);
                        }
                    }
                }
            }
        }
        Preconditions.checkArgument(!t.isEmpty(), "No inputs found for flowlet '%s' of flow '%s' of application '%s' (%s)", new Object[]{basicFlowletContext.getFlowletId(), basicFlowletContext.getFlowId(), basicFlowletContext.getApplicationId(), typeToken});
        return t;
    }

    private ConsumerConfig getConsumerConfig(BasicFlowletContext basicFlowletContext, Method method) {
        HashPartition annotation = method.getAnnotation(HashPartition.class);
        RoundRobin annotation2 = method.getAnnotation(RoundRobin.class);
        DequeueStrategy dequeueStrategy = DequeueStrategy.FIFO;
        String str = null;
        Preconditions.checkArgument(annotation == null || annotation2 == null, "Only one strategy allowed for process() method: %s", new Object[]{method.getName()});
        if (annotation != null) {
            dequeueStrategy = DequeueStrategy.HASH;
            str = annotation.value();
            Preconditions.checkArgument(!str.isEmpty(), "Partition key cannot be empty: %s", new Object[]{method.getName()});
        } else if (annotation2 != null) {
            dequeueStrategy = DequeueStrategy.ROUND_ROBIN;
        }
        return new ConsumerConfig(basicFlowletContext.getGroupId(), basicFlowletContext.getInstanceId(), basicFlowletContext.getInstanceCount(), dequeueStrategy, str);
    }

    private Integer getBatchSize(Method method) {
        Batch annotation = method.getAnnotation(Batch.class);
        if (annotation == null) {
            return null;
        }
        int value = annotation.value();
        Preconditions.checkArgument(value > 0, "Batch size should be > 0: %s", new Object[]{method.getName()});
        return Integer.valueOf(value);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int getNumGroups(Iterable<QueueSpecification> iterable, QueueName queueName) {
        int i = 0;
        Iterator<QueueSpecification> it = iterable.iterator();
        while (it.hasNext()) {
            if (queueName.equals(it.next().getQueueName())) {
                i++;
            }
        }
        return i;
    }

    private Callback createCallback(Flowlet flowlet, FlowletSpecification flowletSpecification) {
        if (flowlet instanceof Callback) {
            return (Callback) flowlet;
        }
        final FailurePolicy failurePolicy = flowletSpecification.getFailurePolicy();
        return new Callback() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.1
            public void onSuccess(Object obj, InputContext inputContext) {
            }

            public FailurePolicy onFailure(Object obj, InputContext inputContext, FailureReason failureReason) {
                return failurePolicy;
            }
        };
    }

    private OutputEmitterFactory outputEmitterFactory(final BasicFlowletContext basicFlowletContext, final String str, final QueueClientFactory queueClientFactory, final Table<QueueSpecificationGenerator.Node, String, Set<QueueSpecification>> table) {
        return new OutputEmitterFactory() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.2
            @Override // co.cask.cdap.internal.app.runtime.flow.OutputEmitterFactory
            public <T> OutputEmitter<T> create(String str2, TypeToken<T> typeToken) {
                try {
                    Schema generate = FlowletProgramRunner.this.schemaGenerator.generate(typeToken.getType());
                    for (QueueSpecification queueSpecification : Iterables.concat(table.row(QueueSpecificationGenerator.Node.flowlet(str)).values())) {
                        if (queueSpecification.getQueueName().getSimpleName().equals(str2) && queueSpecification.getOutputSchema().equals(generate)) {
                            final String simpleName = queueSpecification.getQueueName().getSimpleName();
                            return new DatumOutputEmitter(queueClientFactory.createProducer(queueSpecification.getQueueName(), new QueueMetrics() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.2.1
                                public void emitEnqueue(int i) {
                                    basicFlowletContext.getProgramMetrics().increment("process.events.out", i, new String[]{simpleName});
                                }

                                public void emitEnqueueBytes(int i) {
                                }
                            }), generate, FlowletProgramRunner.this.datumWriterFactory.create(typeToken, generate));
                        }
                    }
                    throw new IllegalArgumentException(String.format("No queue specification found for %s, %s", str, typeToken));
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        };
    }

    private ProcessMethodFactory processMethodFactory(final Flowlet flowlet) {
        return new ProcessMethodFactory() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.3
            @Override // co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.ProcessMethodFactory
            public <T> ProcessMethod<T> create(Method method, int i) {
                return ReflectionProcessMethod.create(flowlet, method, i);
            }
        };
    }

    private ProcessSpecificationFactory processSpecificationFactory(final BasicFlowletContext basicFlowletContext, final DataFabricFacade dataFabricFacade, final QueueReaderFactory queueReaderFactory, final String str, final Table<QueueSpecificationGenerator.Node, String, Set<QueueSpecification>> table, final ImmutableList.Builder<ConsumerSupplier<?>> builder, final SchemaCache schemaCache) {
        return new ProcessSpecificationFactory() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.4
            @Override // co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.ProcessSpecificationFactory
            public <T> ProcessSpecification create(Set<String> set, Schema schema, TypeToken<T> typeToken, ProcessMethod<T> processMethod, ConsumerConfig consumerConfig, int i, Tick tick) {
                LinkedList newLinkedList = Lists.newLinkedList();
                for (Map.Entry entry : table.column(str).entrySet()) {
                    for (QueueSpecification queueSpecification : (Set) entry.getValue()) {
                        QueueName queueName = queueSpecification.getQueueName();
                        if (queueSpecification.getInputSchema().equals(schema) && (set.contains(queueName.getSimpleName()) || set.contains(""))) {
                            if (((QueueSpecificationGenerator.Node) entry.getKey()).getType() == FlowletConnection.Type.STREAM) {
                                ConsumerSupplier create = ConsumerSupplier.create(dataFabricFacade, queueName, consumerConfig);
                                builder.add(create);
                                newLinkedList.add(queueReaderFactory.createStreamReader(create, i, FlowletProgramRunner.this.wrapInputDecoder(basicFlowletContext, queueName, new Function<StreamEvent, T>() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.4.1
                                    /* JADX WARN: Multi-variable type inference failed */
                                    public T apply(StreamEvent streamEvent) {
                                        return streamEvent;
                                    }
                                })));
                            } else {
                                int numGroups = FlowletProgramRunner.this.getNumGroups(Iterables.concat(table.row(entry.getKey()).values()), queueName);
                                Function<ByteBuffer, T> wrapInputDecoder = FlowletProgramRunner.this.wrapInputDecoder(basicFlowletContext, queueName, FlowletProgramRunner.this.createInputDatumDecoder(typeToken, schema, schemaCache));
                                ConsumerSupplier create2 = ConsumerSupplier.create(dataFabricFacade, queueName, consumerConfig, numGroups);
                                builder.add(create2);
                                newLinkedList.add(queueReaderFactory.createQueueReader(create2, i, wrapInputDecoder));
                            }
                        }
                    }
                }
                if (set.isEmpty() || !newLinkedList.isEmpty()) {
                    return new ProcessSpecification(new RoundRobinQueueReader(newLinkedList), processMethod, tick);
                }
                return null;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> Function<ByteBuffer, T> createInputDatumDecoder(final TypeToken<T> typeToken, final Schema schema, final SchemaCache schemaCache) {
        final ReflectionDatumReader reflectionDatumReader = new ReflectionDatumReader(schema, typeToken);
        final ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream((ByteBuffer) null);
        final BinaryDecoder binaryDecoder = new BinaryDecoder(byteBufferInputStream);
        return new Function<ByteBuffer, T>() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.5
            @Nullable
            public T apply(ByteBuffer byteBuffer) {
                byteBufferInputStream.reset(byteBuffer);
                try {
                    Schema schema2 = schemaCache.get(byteBuffer);
                    Preconditions.checkNotNull(schema2, "Fail to find source schema.");
                    return (T) reflectionDatumReader.read(binaryDecoder, schema2);
                } catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            }

            public String toString() {
                return Objects.toStringHelper(this).add("dataType", typeToken).add("schema", schema).toString();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <S, T> Function<S, T> wrapInputDecoder(final BasicFlowletContext basicFlowletContext, QueueName queueName, final Function<S, T> function) {
        final String simpleName = queueName.getSimpleName();
        return new Function<S, T>() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.6
            public T apply(S s) {
                basicFlowletContext.getProgramMetrics().increment("process.events.in", 1, new String[]{simpleName});
                basicFlowletContext.getProgramMetrics().increment("process.tuples.read", 1, new String[]{simpleName});
                return (T) function.apply(s);
            }
        };
    }

    private SchemaCache createSchemaCache(Program program) throws Exception {
        ImmutableSet.Builder builder = ImmutableSet.builder();
        Iterator<FlowSpecification> it = program.getSpecification().getFlows().values().iterator();
        while (it.hasNext()) {
            for (FlowletDefinition flowletDefinition : it.next().getFlowlets().values()) {
                builder.addAll(Iterables.concat(flowletDefinition.getInputs().values()));
                builder.addAll(Iterables.concat(flowletDefinition.getOutputs().values()));
            }
        }
        builder.add(this.schemaGenerator.generate(StreamEventData.class));
        return new SchemaCache(builder.build(), program.getClassLoader());
    }

    private Service createServiceHook(String str, Iterable<ConsumerSupplier<?>> iterable, AtomicReference<FlowletProgramController> atomicReference) {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<ConsumerSupplier<?>> it = iterable.iterator();
        while (it.hasNext()) {
            QueueName queueName = it.next().getQueueName();
            if (queueName.isStream()) {
                newArrayList.add(queueName.getSimpleName());
            }
        }
        return newArrayList.isEmpty() ? new AbstractService() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramRunner.7
            protected void doStart() {
                notifyStarted();
            }

            protected void doStop() {
                notifyStopped();
            }
        } : new FlowletServiceHook(str, this.streamCoordinator, newArrayList, atomicReference);
    }
}
