package io.activej.datastream;

import io.activej.common.Utils;
import io.activej.csp.ChannelSupplier;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/activej/datastream/StreamSuppliers.class */
final class StreamSuppliers {

    /* loaded from: input_file:io/activej/datastream/StreamSuppliers$Closing.class */
    static final class Closing<T> extends AbstractStreamSupplier<T> {
        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onInit() {
            sendEndOfStream();
        }
    }

    /* loaded from: input_file:io/activej/datastream/StreamSuppliers$ClosingWithError.class */
    static final class ClosingWithError<T> extends AbstractStreamSupplier<T> {
        private Exception error;

        /* JADX INFO: Access modifiers changed from: package-private */
        public ClosingWithError(Exception exc) {
            this.error = exc;
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onInit() {
            this.error = (Exception) Utils.nullify(this.error, this::closeEx);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/activej/datastream/StreamSuppliers$Concat.class */
    public static final class Concat<T> extends AbstractStreamSupplier<T> {
        private ChannelSupplier<StreamSupplier<T>> iterator;
        private Concat<T>.InternalConsumer internalConsumer = new InternalConsumer();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/activej/datastream/StreamSuppliers$Concat$InternalConsumer.class */
        public class InternalConsumer extends AbstractStreamConsumer<T> {
            private InternalConsumer() {
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Concat(ChannelSupplier<StreamSupplier<T>> channelSupplier) {
            this.iterator = channelSupplier;
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onStarted() {
            next();
        }

        private void next() {
            this.internalConsumer.acknowledge();
            this.internalConsumer = new InternalConsumer();
            resume();
            this.iterator.get().whenResult(streamSupplier -> {
                if (streamSupplier == null) {
                    sendEndOfStream();
                } else {
                    streamSupplier.getEndOfStream().whenResult(this::next).whenException(this::closeEx);
                    streamSupplier.streamTo(this.internalConsumer);
                }
            }).whenException(this::closeEx);
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            this.internalConsumer.resume(getDataAcceptor());
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onSuspended() {
            this.internalConsumer.suspend();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onError(Exception exc) {
            this.internalConsumer.closeEx(exc);
            this.iterator.closeEx(exc);
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onCleanup() {
            this.iterator = null;
        }
    }

    /* loaded from: input_file:io/activej/datastream/StreamSuppliers$Idle.class */
    static final class Idle<T> extends AbstractStreamSupplier<T> {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/activej/datastream/StreamSuppliers$OfAnotherEventloop.class */
    public static final class OfAnotherEventloop<T> extends AbstractStreamSupplier<T> {
        private static final int MAX_BUFFER_SIZE = 100;
        private static final Iterator<?> END_OF_STREAM = Collections.emptyIterator();
        private volatile Iterator<T> iterator;
        private volatile boolean isReady;
        private final StreamSupplier<T> anotherEventloopSupplier;
        private final OfAnotherEventloop<T>.InternalConsumer internalConsumer;
        private volatile boolean wakingUp;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/activej/datastream/StreamSuppliers$OfAnotherEventloop$InternalConsumer.class */
        public final class InternalConsumer extends AbstractStreamConsumer<T> {
            private List<T> list = new ArrayList();
            private final StreamDataAcceptor<T> toList = obj -> {
                this.list.add(obj);
                if (this.list.size() == OfAnotherEventloop.MAX_BUFFER_SIZE) {
                    flush();
                    suspend();
                }
            };
            volatile boolean wakingUp;

            InternalConsumer() {
            }

            void execute(Runnable runnable) {
                this.eventloop.execute(runnable);
            }

            void wakeUp() {
                if (this.wakingUp) {
                    return;
                }
                this.wakingUp = true;
                execute(this::onWakeUp);
            }

            void onWakeUp() {
                if (isComplete()) {
                    return;
                }
                this.wakingUp = false;
                flush();
                if (OfAnotherEventloop.this.isReady) {
                    resume(this.toList);
                } else {
                    suspend();
                }
            }

            @Override // io.activej.datastream.AbstractStreamConsumer
            protected void onInit() {
                this.eventloop.startExternalTask();
            }

            @Override // io.activej.datastream.AbstractStreamConsumer
            protected void onEndOfStream() {
                flush();
            }

            private void flush() {
                if (OfAnotherEventloop.this.iterator != null) {
                    return;
                }
                if (isEndOfStream() && this.list.isEmpty()) {
                    OfAnotherEventloop.this.iterator = OfAnotherEventloop.END_OF_STREAM;
                } else {
                    if (this.list.isEmpty()) {
                        return;
                    }
                    OfAnotherEventloop.this.iterator = this.list.iterator();
                    this.list = new ArrayList();
                }
                OfAnotherEventloop.this.wakeUp();
            }

            @Override // io.activej.datastream.AbstractStreamConsumer
            protected void onError(Exception exc) {
                OfAnotherEventloop.this.execute(() -> {
                    OfAnotherEventloop.this.closeEx(exc);
                });
            }

            @Override // io.activej.datastream.AbstractStreamConsumer
            protected void onComplete() {
                this.eventloop.completeExternalTask();
            }

            @Override // io.activej.datastream.AbstractStreamConsumer
            protected void onCleanup() {
                this.list = null;
            }
        }

        public OfAnotherEventloop(@NotNull Eventloop eventloop, @NotNull StreamSupplier<T> streamSupplier) {
            this.anotherEventloopSupplier = streamSupplier;
            this.internalConsumer = (InternalConsumer) Eventloop.initWithEventloop(eventloop, () -> {
                return new InternalConsumer();
            });
        }

        void execute(Runnable runnable) {
            this.eventloop.execute(runnable);
        }

        void wakeUp() {
            if (this.wakingUp) {
                return;
            }
            this.wakingUp = true;
            execute(this::onWakeUp);
        }

        void onWakeUp() {
            if (isComplete()) {
                return;
            }
            this.wakingUp = false;
            flush();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onInit() {
            this.eventloop.startExternalTask();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onStarted() {
            this.internalConsumer.execute(() -> {
                this.anotherEventloopSupplier.streamTo(this.internalConsumer);
            });
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            this.isReady = true;
            flush();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onSuspended() {
            this.isReady = false;
            this.internalConsumer.wakeUp();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onComplete() {
            this.eventloop.completeExternalTask();
        }

        private void flush() {
            if (this.iterator == null) {
                this.internalConsumer.wakeUp();
                return;
            }
            Iterator<T> it = this.iterator;
            while (isReady() && it.hasNext()) {
                send(it.next());
            }
            if (it == END_OF_STREAM) {
                sendEndOfStream();
            } else {
                if (it.hasNext()) {
                    return;
                }
                this.iterator = null;
                this.internalConsumer.wakeUp();
            }
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onAcknowledge() {
            OfAnotherEventloop<T>.InternalConsumer internalConsumer = this.internalConsumer;
            OfAnotherEventloop<T>.InternalConsumer internalConsumer2 = this.internalConsumer;
            Objects.requireNonNull(internalConsumer2);
            internalConsumer.execute(internalConsumer2::acknowledge);
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onError(Exception exc) {
            this.internalConsumer.execute(() -> {
                this.internalConsumer.closeEx(exc);
            });
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onCleanup() {
            this.iterator = null;
        }
    }

    /* loaded from: input_file:io/activej/datastream/StreamSuppliers$OfChannelSupplier.class */
    static final class OfChannelSupplier<T> extends AbstractStreamSupplier<T> {
        private final ChannelSupplier<T> supplier;

        public OfChannelSupplier(ChannelSupplier<T> channelSupplier) {
            this.supplier = channelSupplier;
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            asyncBegin();
            this.supplier.get().run((obj, exc) -> {
                if (exc != null) {
                    closeEx(exc);
                } else if (obj == 0) {
                    sendEndOfStream();
                } else {
                    send(obj);
                    asyncResume();
                }
            });
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onError(Exception exc) {
            this.supplier.closeEx(exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/activej/datastream/StreamSuppliers$OfIterator.class */
    public static final class OfIterator<T> extends AbstractStreamSupplier<T> {
        private final Iterator<T> iterator;

        public OfIterator(@NotNull Iterator<T> it) {
            this.iterator = it;
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            while (isReady() && this.iterator.hasNext()) {
                send(this.iterator.next());
            }
            if (this.iterator.hasNext()) {
                return;
            }
            sendEndOfStream();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/activej/datastream/StreamSuppliers$OfPromise.class */
    public static final class OfPromise<T> extends AbstractStreamSupplier<T> {
        private Promise<? extends StreamSupplier<T>> promise;
        private final OfPromise<T>.InternalConsumer internalConsumer = new InternalConsumer();

        /* loaded from: input_file:io/activej/datastream/StreamSuppliers$OfPromise$InternalConsumer.class */
        private class InternalConsumer extends AbstractStreamConsumer<T> {
            private InternalConsumer() {
            }
        }

        public OfPromise(Promise<? extends StreamSupplier<T>> promise) {
            this.promise = promise;
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onInit() {
            this.promise.whenResult(streamSupplier -> {
                getEndOfStream().whenException(exc -> {
                    streamSupplier.closeEx(exc);
                });
                streamSupplier.getEndOfStream().whenResult(this::sendEndOfStream).whenException(this::closeEx);
                streamSupplier.streamTo(this.internalConsumer);
            }).whenException(this::closeEx);
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            this.internalConsumer.resume(getDataAcceptor());
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onSuspended() {
            this.internalConsumer.suspend();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onAcknowledge() {
            this.internalConsumer.acknowledge();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onCleanup() {
            this.promise = null;
        }
    }

    StreamSuppliers() {
    }
}
