package cz.seznam.euphoria.beam;

import avro.shaded.com.google.common.collect.Iterables;
import cz.seznam.euphoria.beam.io.BeamWriteSink;
import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider;
import cz.seznam.euphoria.core.client.accumulators.VoidAccumulatorProvider;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.executor.graph.Node;
import cz.seznam.euphoria.core.util.Settings;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.values.PCollection;

/* loaded from: input_file:cz/seznam/euphoria/beam/BeamFlow.class */
public class BeamFlow extends Flow {
    private final transient Map<PCollection<?>, Dataset<?>> wrapped;
    private Duration allowedLateness;
    private AccumulatorProvider.Factory accumulatorFactory;
    private final transient BeamExecutorContext context;
    private final transient Pipeline pipeline;

    public static BeamFlow create(Pipeline pipeline) {
        return new BeamFlow(null, pipeline);
    }

    public static BeamFlow create(String str, Pipeline pipeline) {
        return new BeamFlow(str, pipeline);
    }

    private BeamFlow(String str, Pipeline pipeline) {
        super(str, new Settings());
        this.wrapped = new HashMap();
        this.allowedLateness = Duration.ZERO;
        this.accumulatorFactory = VoidAccumulatorProvider.getFactory();
        this.pipeline = pipeline;
        this.context = new BeamExecutorContext(DAG.empty(), this.accumulatorFactory, pipeline, getSettings(), org.joda.time.Duration.millis(this.allowedLateness.toMillis()));
    }

    @Override // cz.seznam.euphoria.core.client.flow.Flow
    public <T> Dataset<T> createInput(DataSource<T> dataSource) {
        Dataset<T> createInput = super.createInput(dataSource);
        this.context.setPCollection(createInput, InputTranslator.doTranslate(dataSource, this.context));
        return createInput;
    }

    public BeamFlow setAccumulatorProvider(AccumulatorProvider.Factory factory) {
        this.accumulatorFactory = factory;
        return this;
    }

    public Pipeline asPipeline(PipelineOptions pipelineOptions) {
        return FlowTranslator.toPipeline(this, this.accumulatorFactory, pipelineOptions, getSettings(), org.joda.time.Duration.millis(this.allowedLateness.toMillis()));
    }

    public <T> Dataset<T> wrapped(PCollection<T> pCollection) {
        return (Dataset) this.wrapped.compute(pCollection, (pCollection2, dataset) -> {
            return dataset == null ? newDataset(pCollection) : dataset;
        });
    }

    public <T> PCollection<T> unwrapped(Dataset<T> dataset) {
        Operator<?, T> producer = dataset.getProducer();
        if (producer != null) {
            dataset = producer.output();
        }
        Dataset<T> dataset2 = dataset;
        return this.context.getPCollection(dataset2).orElseThrow(() -> {
            return new IllegalArgumentException("Dataset " + dataset2 + " was not created by this flow!");
        });
    }

    private <T> Dataset<T> newDataset(PCollection<T> pCollection) {
        WrappedPCollectionOperator wrappedPCollectionOperator = new WrappedPCollectionOperator(this, pCollection);
        add(wrappedPCollectionOperator);
        return wrappedPCollectionOperator.output();
    }

    @Override // cz.seznam.euphoria.core.client.flow.Flow
    public <IN, OUT, T extends Operator<IN, OUT>> T add(T t) {
        DAG of;
        T t2 = (T) super.add(t);
        List list = (List) t.listInputs().stream().map(dataset -> {
            return new WrappedPCollectionOperator(this, unwrapped(dataset), dataset);
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            of = DAG.of(t);
        } else {
            of = DAG.of(list);
            of.add((DAG) t, (List<DAG>) list);
        }
        DAG<Operator<?, ?>> unfold = FlowTranslator.unfold(of);
        this.context.setTranslationDAG(unfold);
        FlowTranslator.updateContextBy(unfold, this.context);
        Dataset<OUT> output = t.output();
        Dataset<OUT> output2 = ((Operator) ((Node) Iterables.getOnlyElement(unfold.getLeafs())).get()).output();
        if (output != output2) {
            this.context.setPCollection(output, unwrapped(output2));
        }
        return t2;
    }

    @Override // cz.seznam.euphoria.core.client.flow.Flow
    public <T> void onPersisted(Dataset<T> dataset) {
        if (this.pipeline != null) {
            this.context.getPCollection(dataset).orElseThrow(() -> {
                return new IllegalStateException("Persisting dataset not created by this flow! Fix code!");
            }).apply(BeamWriteSink.wrap(dataset.getOutputSink()));
        }
    }

    public BeamFlow withAllowedLateness(Duration duration) {
        this.allowedLateness = duration;
        return this;
    }

    public Pipeline getPipeline() {
        return (Pipeline) Objects.requireNonNull(this.pipeline);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasPipeline() {
        return this.pipeline != null;
    }
}
