package edu.stanford.futuredata.macrobase.operator;

import edu.stanford.futuredata.macrobase.datamodel.DataFrame;
import edu.stanford.futuredata.macrobase.util.ArrayUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:edu/stanford/futuredata/macrobase/operator/WindowedOperator.class */
public class WindowedOperator<O> implements Operator<DataFrame, O> {
    private String timeColumn = "time";
    private double windowLength = 60.0d;
    private double slideLength = 10.0d;
    private double maxWindowTime;
    private IncrementalOperator<O> op;
    private ArrayList<DataFrame> batchBuffer;

    public WindowedOperator(IncrementalOperator incrementalOperator) {
        this.op = incrementalOperator;
    }

    public WindowedOperator<O> initialize() {
        this.maxWindowTime = 0.0d;
        this.batchBuffer = new ArrayList<>();
        this.op.setWindowSize((int) Math.ceil(this.windowLength / this.slideLength));
        return this;
    }

    @Override // edu.stanford.futuredata.macrobase.operator.Operator
    public void process(DataFrame dataFrame) throws Exception {
        Iterator<DataFrame> it = addToBuffer(dataFrame).iterator();
        while (it.hasNext()) {
            this.op.process(it.next());
        }
    }

    public double flushBuffer() throws Exception {
        DataFrame unionAll = DataFrame.unionAll(this.batchBuffer);
        this.maxWindowTime += this.slideLength;
        this.op.process(unionAll);
        this.batchBuffer.clear();
        return this.maxWindowTime;
    }

    protected List<DataFrame> addToBuffer(DataFrame dataFrame) {
        if (dataFrame.getNumRows() == 0) {
            return Collections.emptyList();
        }
        double max = ArrayUtils.max(dataFrame.getDoubleColumnByName(this.timeColumn));
        DataFrame dataFrame2 = dataFrame;
        ArrayList arrayList = new ArrayList(1);
        while (max >= this.maxWindowTime + this.slideLength) {
            double d = this.maxWindowTime + this.slideLength;
            DataFrame filter = dataFrame2.filter(this.timeColumn, d2 -> {
                return d2 < d;
            });
            dataFrame2 = dataFrame2.filter(this.timeColumn, d3 -> {
                return d3 >= d;
            });
            this.batchBuffer.add(filter);
            DataFrame unionAll = DataFrame.unionAll(this.batchBuffer);
            this.maxWindowTime = d;
            this.batchBuffer.clear();
            arrayList.add(unionAll);
        }
        this.batchBuffer.add(dataFrame2);
        return arrayList;
    }

    @Override // edu.stanford.futuredata.macrobase.operator.Operator
    public O getResults() {
        return this.op.getResults();
    }

    public String getTimeColumn() {
        return this.timeColumn;
    }

    public void setTimeColumn(String str) {
        this.timeColumn = str;
    }

    public double getWindowLength() {
        return this.windowLength;
    }

    public void setWindowLength(double d) {
        this.windowLength = d;
    }

    public double getSlideLength() {
        return this.slideLength;
    }

    public void setSlideLength(double d) {
        this.slideLength = d;
    }

    public double getMaxWindowTime() {
        return this.maxWindowTime;
    }

    public int getBufferSize() {
        return this.batchBuffer.size();
    }

    public int getNumBufferedRows() {
        int i = 0;
        Iterator<DataFrame> it = this.batchBuffer.iterator();
        while (it.hasNext()) {
            i += it.next().getNumRows();
        }
        return i;
    }
}
