package cz.seznam.euphoria.core.client.flow;

import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Datasets;
import cz.seznam.euphoria.core.client.functional.ExtractEventTime;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.AssignEventTime;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.util.Settings;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/seznam/euphoria/core/client/flow/Flow.class */
/**
 * A named registry of operators together with the datasets they consume and produce.
 *
 * <p>Input datasets are registered through {@link #createInput(DataSource)}. Operators are
 * registered through {@link #add(Operator)} and may only consume datasets the flow already
 * knows about, i.e. a registered source or the output of a previously added operator.
 */
public class Flow implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(Flow.class);

    /** Settings associated with this flow; a separate instance built from the one given at construction. */
    private final Settings settings;

    /** Flow name; never {@code null} (a {@code null} name is stored as the empty string). */
    private final String name;

    /** Generated display name of every registered operator. */
    private final Map<Operator<?, ?>, String> operatorNames = new HashMap<>();

    /** Registered operators in the order they were added. */
    private final List<Operator<?, ?>> operators = new ArrayList<>();

    /** Output datasets of all registered operators. */
    private final Set<Dataset<?>> outputs = new HashSet<>();

    /** Input datasets created through {@code createInput}. */
    private final Set<Dataset<?>> sources = new HashSet<>();

    /** For each dataset, the registered operators that list it among their inputs. */
    private final Map<Dataset<?>, Set<Operator<?, ?>>> datasetConsumers = new HashMap<>();

    /**
     * An output stream that discards everything written to it and only keeps track of how many
     * bytes it was asked to write.
     */
    public static class CountingOutputStream extends OutputStream {
        /** Number of bytes written so far. */
        long count;

        CountingOutputStream() {
        }

        /** Counts a single byte; the value itself is dropped. */
        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            this.count++;
        }

        /** Counts {@code i2} bytes; the buffer content is never read. */
        @Override // java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            this.count += i2;
        }
    }

    /**
     * Creates a flow.
     *
     * @param str the flow name; {@code null} is replaced by the empty string
     * @param settings the settings to base this flow's own settings instance on
     */
    protected Flow(@Nullable String str, Settings settings) {
        this.name = str == null ? "" : str;
        this.settings = cloneSettings(settings);
    }

    /**
     * Creates a flow with an empty name and fresh settings.
     *
     * @return the new flow
     */
    public static Flow create() {
        return create(null);
    }

    /**
     * Creates a flow with the given name and fresh settings.
     *
     * @param str the flow name; {@code null} yields an empty name
     * @return the new flow
     */
    public static Flow create(@Nullable String str) {
        return new Flow(str, new Settings());
    }

    /**
     * Creates a flow with the given name whose settings are built from the given ones.
     *
     * @param str the flow name; {@code null} yields an empty name
     * @param settings the settings to base the flow's settings on
     * @return the new flow
     */
    public static Flow create(String str, Settings settings) {
        return new Flow(str, settings);
    }

    /**
     * Registers an operator without a name suffix.
     *
     * @param t the operator to register
     * @return the same operator instance
     * @throws IllegalStateException if the operator cannot be serialized
     * @throws IllegalArgumentException if any input of the operator is unknown to this flow
     */
    public <IN, OUT, T extends Operator<IN, OUT>> T add(T t) {
        return add(t, null);
    }

    /**
     * Callback taking a dataset; this implementation does nothing.
     *
     * @param dataset the dataset the callback refers to
     */
    public <T> void onPersisted(Dataset<T> dataset) {
    }

    /**
     * Registers an operator in this flow.
     *
     * <p>All checks run before any of the flow's collections is modified, so a rejected
     * operator leaves the flow exactly as it was.
     *
     * @param t the operator to register
     * @param str optional suffix appended to the generated operator name
     * @return the same operator instance
     * @throws IllegalStateException if the operator cannot be serialized
     * @throws IllegalArgumentException if any input of the operator is neither a source of
     *     this flow nor the output of an operator that has already been added
     */
    <IN, OUT, T extends Operator<IN, OUT>> T add(T t, @Nullable String str) {
        validateSerializable(t);

        // Check every input up front; the operator's own output is not registered yet,
        // so an operator cannot satisfy its dependencies with itself.
        List<Dataset<IN>> inputs = new ArrayList<>();
        for (Dataset<IN> input : t.listInputs()) {
            if (!this.sources.contains(input) && !this.outputs.contains(input)) {
                throw new IllegalArgumentException("Invalid input: All dependencies must already be present in the flow!");
            }
            inputs.add(input);
        }

        // Nothing below is expected to fail, so the registration is applied as a whole.
        this.operatorNames.put(t, buildOperatorName(t, str));
        this.operators.add(t);
        this.outputs.add(t.output());
        for (Dataset<IN> input : inputs) {
            Set<Operator<?, ?>> consumers = this.datasetConsumers.get(input);
            if (consumers == null) {
                consumers = new HashSet<>();
                this.datasetConsumers.put(input, consumers);
            }
            consumers.add(t);
        }
        return t;
    }

    /**
     * Verifies that the operator survives Java serialization by writing it to a byte-counting
     * sink, and logs the serialized size at debug level.
     *
     * @param operator the operator to check
     * @throws IllegalStateException if serialization fails with an {@link IOException}
     */
    private void validateSerializable(Operator<?, ?> operator) {
        CountingOutputStream sink = new CountingOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(sink)) {
            out.writeObject(operator);
        } catch (IOException e) {
            throw new IllegalStateException("Operator " + operator + " not serializable!", e);
        }
        // Logged only after the stream is closed so that buffered bytes are included in the count.
        LOG.debug("Serialized operator {} ({}) into {} bytes", new Object[]{operator.toString(), operator.getClass(), Long.valueOf(sink.count)});
    }

    /**
     * Builds the display name of an operator that is about to be registered, in the form
     * {@code <operator name>@<n>} or {@code <operator name>@<n>#<suffix>}, where {@code n} is
     * one more than the number of operator names currently stored.
     *
     * @param operator the operator being named
     * @param str optional suffix; omitted when {@code Util.trimToNull} returns {@code null} for it
     * @return the generated name
     */
    private String buildOperatorName(Operator<?, ?> operator, @Nullable String str) {
        StringBuilder sb = new StringBuilder(64);
        sb.append(operator.getName()).append('@').append(this.operatorNames.size() + 1);
        String suffix = Util.trimToNull(str);
        if (suffix != null) {
            sb.append('#').append(suffix);
        }
        return sb.toString();
    }

    /**
     * Looks up the generated name of a registered operator.
     *
     * @param operator the operator to look up
     * @return the generated name, or {@code null} if the operator is not registered
     */
    String getOperatorName(Operator<?, ?> operator) {
        return this.operatorNames.get(operator);
    }

    /**
     * @return a read-only view of the registered operators in insertion order
     */
    public Collection<Operator<?, ?>> operators() {
        return Collections.unmodifiableList(this.operators);
    }

    /**
     * @return a read-only view of the input datasets created through {@code createInput}
     */
    public Collection<Dataset<?>> sources() {
        // Read-only, like operators(): sources may only be added via createInput.
        return Collections.unmodifiableSet(this.sources);
    }

    /**
     * @return the number of registered operators
     */
    public int size() {
        return this.operators.size();
    }

    /**
     * Returns the registered operators that consume the given dataset.
     *
     * @param dataset the dataset to look up
     * @return a read-only view of the consumers; empty if no registered operator reads the dataset
     */
    public Collection<Operator<?, ?>> getConsumersOf(Dataset<?> dataset) {
        Set<Operator<?, ?>> consumers = this.datasetConsumers.get(dataset);
        if (consumers == null) {
            return Collections.<Operator<?, ?>>emptyList();
        }
        return Collections.unmodifiableSet(consumers);
    }

    /**
     * @return the flow name; never {@code null}
     */
    public String getName() {
        return this.name;
    }

    @Override
    public String toString() {
        return "Flow{name='" + this.name + "', size=" + size() + '}';
    }

    /**
     * @return the settings instance held by this flow
     */
    public Settings getSettings() {
        return this.settings;
    }

    /**
     * Creates an input dataset for the given data source and registers it as a source of this flow.
     *
     * @param dataSource the data source to read from
     * @return the newly created input dataset
     */
    public <T> Dataset<T> createInput(DataSource<T> dataSource) {
        Dataset<T> input = Datasets.createInputFromSource(this, dataSource);
        this.sources.add(input);
        return input;
    }

    /**
     * Creates an input dataset for the given data source and passes it through an
     * {@link AssignEventTime} operator configured with the given extractor.
     *
     * @param dataSource the data source to read from
     * @param extractEventTime the event-time extractor; must not be {@code null}
     * @return the output dataset of the {@code AssignEventTime} operator
     * @throws NullPointerException if {@code extractEventTime} is {@code null}
     */
    public <T> Dataset<T> createInput(DataSource<T> dataSource, ExtractEventTime<T> extractEventTime) {
        // Reject a null extractor before the source is registered, so a failed call
        // does not leave a dangling source in the flow.
        Objects.requireNonNull(extractEventTime);
        Dataset<T> input = createInput(dataSource);
        return AssignEventTime.of(input).using(extractEventTime).output();
    }

    /**
     * Builds the flow's own settings instance from the given one via the {@code Settings}
     * constructor taking another {@code Settings}.
     */
    private Settings cloneSettings(Settings settings) {
        return new Settings(settings);
    }
}
