package io.activej.datastream.processor;

import io.activej.async.AsyncAccumulator;
import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.promise.Promise;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/datastream/processor/StreamSorter.class */
public final class StreamSorter<K, T> implements StreamTransformer<T, T>, WithInitializer<StreamSorter<K, T>> {
    private static final Logger logger = LoggerFactory.getLogger(StreamSorter.class);
    private final AsyncAccumulator<? extends List<Integer>> temporaryStreamsAccumulator;
    private final StreamSorterStorage<T> storage;
    private final Function<T, K> keyFunction;
    private final Comparator<K> keyComparator;
    private final Comparator<T> itemComparator;
    private final boolean distinct;
    private final int itemsInMemory;
    private final StreamSorter<K, T>.Input input;
    private final StreamSupplier<T> output;
    private Executor sortingExecutor = (v0) -> {
        v0.run();
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/datastream/processor/StreamSorter$DistinctIterator.class */
    public static final class DistinctIterator<K, T> implements Iterator<T> {
        private final ArrayList<T> sortedList;
        private final Function<T, K> keyFunction;
        private final Comparator<K> keyComparator;
        int i;

        private DistinctIterator(ArrayList<T> arrayList, Function<T, K> function, Comparator<K> comparator) {
            this.i = 0;
            this.sortedList = arrayList;
            this.keyFunction = function;
            this.keyComparator = comparator;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.i < this.sortedList.size();
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.util.Iterator
        public T next() {
            ArrayList<T> arrayList = this.sortedList;
            int i = this.i;
            this.i = i + 1;
            T t = arrayList.get(i);
            K apply = this.keyFunction.apply(t);
            while (this.i < this.sortedList.size() && this.keyComparator.compare(apply, this.keyFunction.apply(this.sortedList.get(this.i))) == 0) {
                this.i++;
            }
            return t;
        }
    }

    /* loaded from: input_file:io/activej/datastream/processor/StreamSorter$Input.class */
    private final class Input extends AbstractStreamConsumer<T> implements StreamDataAcceptor<T> {
        private final List<Integer> partitionIds;
        private ArrayList<T> list;
        private Promise<Void> cleanupPromise;

        private Input(List<Integer> list) {
            this.list = new ArrayList<>();
            this.partitionIds = list;
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            resume(this);
        }

        @Override // io.activej.datastream.StreamDataAcceptor
        public void accept(T t) {
            this.list.add(t);
            if (this.list.size() < StreamSorter.this.itemsInMemory) {
                return;
            }
            ArrayList<T> arrayList = this.list;
            this.list = new ArrayList<>(StreamSorter.this.itemsInMemory);
            StreamSorter.this.temporaryStreamsAccumulator.addPromise(Promise.ofBlocking(StreamSorter.this.sortingExecutor, () -> {
                arrayList.sort(StreamSorter.this.itemComparator);
            }).then(r9 -> {
                Iterator distinctIterator = StreamSorter.this.distinct ? new DistinctIterator(arrayList, StreamSorter.this.keyFunction, StreamSorter.this.keyComparator) : arrayList.iterator();
                return StreamSorter.this.storage.newPartitionId().then(num -> {
                    return StreamSorter.this.storage.write(num.intValue()).then(streamConsumer -> {
                        return StreamSupplier.ofIterator(distinctIterator).streamTo(streamConsumer);
                    }).map(r3 -> {
                        return num;
                    });
                });
            }).whenResult(this::suspendOrResume).whenException(this::closeEx), (v0, v1) -> {
                v0.add(v1);
            });
            suspendOrResume();
        }

        private void suspendOrResume() {
            if (StreamSorter.this.temporaryStreamsAccumulator.getActivePromises() > 2) {
                suspend();
            } else {
                resume(this);
            }
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            StreamSorter.this.temporaryStreamsAccumulator.run();
            StreamSorter.this.output.getAcknowledgement().then((r5, exc) -> {
                return cleanup().then((r5, exc) -> {
                    return Promise.of(r5, exc);
                });
            }).whenResult(this::acknowledge).whenException(this::closeEx);
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onError(Exception exc) {
            StreamSorter.this.temporaryStreamsAccumulator.closeEx(exc);
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onCleanup() {
            this.list = null;
            cleanup();
        }

        private Promise<Void> cleanup() {
            if (this.cleanupPromise != null) {
                return this.cleanupPromise;
            }
            Promise<Void> complete = this.partitionIds.isEmpty() ? Promise.complete() : StreamSorter.this.storage.cleanup(this.partitionIds);
            this.cleanupPromise = complete;
            return complete;
        }
    }

    private StreamSorter(StreamSorterStorage<T> streamSorterStorage, Function<T, K> function, Comparator<K> comparator, boolean z, int i) {
        this.storage = streamSorterStorage;
        this.keyFunction = function;
        this.keyComparator = comparator;
        this.itemComparator = (obj, obj2) -> {
            return comparator.compare(function.apply(obj), function.apply(obj2));
        };
        this.distinct = z;
        this.itemsInMemory = i;
        ArrayList arrayList = new ArrayList();
        this.input = new Input(arrayList);
        AsyncAccumulator<? extends List<Integer>> create = AsyncAccumulator.create(arrayList);
        this.temporaryStreamsAccumulator = create;
        this.output = StreamSupplier.ofPromise(create.get().then(list -> {
            ArrayList arrayList2 = ((Input) this.input).list;
            ((Input) this.input).list = null;
            return Promise.ofBlocking(this.sortingExecutor, () -> {
                arrayList2.sort(this.itemComparator);
            }).map(r13 -> {
                StreamSupplier ofIterator = StreamSupplier.ofIterator(z ? new DistinctIterator(arrayList2, function, comparator) : arrayList2.iterator());
                logger.info("Items in memory: {}, files: {}", Integer.valueOf(arrayList2.size()), Integer.valueOf(list.size()));
                if (list.isEmpty()) {
                    return ofIterator;
                }
                StreamReducer create2 = StreamReducer.create(comparator);
                ofIterator.streamTo(create2.newInput(function, z ? StreamReducers.deduplicateReducer() : StreamReducers.mergeReducer()));
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    StreamSupplier.ofPromise(streamSorterStorage.read(((Integer) it.next()).intValue())).streamTo(create2.newInput(function, z ? StreamReducers.deduplicateReducer() : StreamReducers.mergeReducer()));
                }
                return create2.getOutput();
            });
        }));
    }

    public StreamSorter<K, T> withSortingExecutor(Executor executor) {
        this.sortingExecutor = executor;
        return this;
    }

    public static <K, T> StreamSorter<K, T> create(StreamSorterStorage<T> streamSorterStorage, Function<T, K> function, Comparator<K> comparator, boolean z, int i) {
        return new StreamSorter<>(streamSorterStorage, function, comparator, z, i);
    }

    @Override // io.activej.datastream.dsl.HasStreamInput
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override // io.activej.datastream.dsl.HasStreamOutput
    public StreamSupplier<T> getOutput() {
        return this.output;
    }
}
