package cz.o2.proxima.source;

import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.batch.BatchLogObservable;
import cz.o2.proxima.storage.batch.BatchLogObserver;
import cz.seznam.euphoria.core.client.io.BoundedDataSource;
import cz.seznam.euphoria.core.client.io.BoundedReader;
import cz.seznam.euphoria.core.client.io.UnsplittableBoundedSource;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.hbase.shaded.org.apache.zookeeper.server.quorum.QuorumStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/source/BatchSource.class */
/**
 * Bounded data source that exposes the elements observed from a {@link BatchLogObservable}
 * within the time range {@code [startStamp, endStamp]} passed to {@code getPartitions}.
 *
 * <p>The source itself cannot be read directly; {@link #split(long)} produces one unsplittable
 * source per partition reported by the observable, and each of those can open a reader.
 */
public class BatchSource implements BoundedDataSource<StreamElement> {
    private static final Logger log = LoggerFactory.getLogger(BatchSource.class);

    /** How long a producer waits for the reader before re-checking whether it was stopped. */
    private static final long OFFER_TIMEOUT_MS = 100L;

    private final BatchLogObservable observable;
    private final List<AttributeDescriptor<?>> attributes;
    private final long startStamp;
    private final long endStamp;

    /**
     * Observer that hands every observed element over to a reader through a
     * {@link SynchronousQueue}. An empty {@link Optional} in the queue marks the end of the
     * stream (either regular completion or an error, see {@link #getError()}).
     *
     * <p>The callbacks are invoked by the observable while {@link #stop()} and the queue are
     * used by the reader, so the shared flags are {@code volatile}.
     */
    public static class Observer implements BatchLogObserver {
        BlockingQueue<Optional<StreamElement>> queue;

        // Written by the reader (close), read by the producer (onNext): must be volatile.
        volatile boolean stop;

        // Failure reported through onError, published to the reader thread.
        @Nullable
        private volatile Throwable error;

        private Observer() {
            this.queue = new SynchronousQueue<>();
            this.stop = false;
        }

        /**
         * Forwards the element to the reader.
         *
         * @param streamElement the observed element
         * @return {@code true} if observation should continue, {@code false} once the observer
         *     was stopped or the calling thread was interrupted
         */
        @Override // cz.o2.proxima.storage.batch.BatchLogObserver
        public boolean onNext(StreamElement streamElement) {
            try {
                handOver(Optional.of(streamElement));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                BatchSource.log.warn("Interrupted while forwarding element to queue.");
                // The element was dropped and the interrupt flag is set again, so any further
                // hand-over would fail the same way; ask the observable to stop.
                return false;
            }
            return !this.stop;
        }

        /** Requests the observation to stop; takes effect on the next (or pending) hand-over. */
        public void stop() {
            this.stop = true;
        }

        /** Signals the end of the stream to the reader. */
        @Override // cz.o2.proxima.storage.batch.BatchLogObserver
        public void onCompleted() {
            try {
                handOver(Optional.empty());
            } catch (InterruptedException e) {
                BatchSource.log.warn("Interrupted while forwarding EOS.");
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Records the failure and wakes the reader up so that it does not wait forever, then
         * rethrows the failure wrapped in a {@link RuntimeException}.
         *
         * @param th the reported failure
         * @throws RuntimeException always, wrapping {@code th}
         */
        @Override // cz.o2.proxima.storage.batch.BatchLogObserver
        public void onError(Throwable th) {
            this.error = th;
            try {
                handOver(Optional.empty());
            } catch (InterruptedException e) {
                BatchSource.log.warn("Interrupted while forwarding error.");
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(th);
        }

        /**
         * @return the queue through which elements are handed over to the reader
         */
        public BlockingQueue<Optional<StreamElement>> getQueue() {
            return this.queue;
        }

        /**
         * @return the failure passed to {@link #onError(Throwable)}, or {@code null} if none
         */
        @Nullable
        public Throwable getError() {
            return this.error;
        }

        /**
         * Offers the item to the reader, giving up as soon as the observer is stopped.
         *
         * <p>A plain {@code put} on a {@link SynchronousQueue} would block forever once the
         * reader has been closed, because nobody takes from the queue any more.
         */
        private void handOver(Optional<StreamElement> item) throws InterruptedException {
            while (!this.stop) {
                if (this.queue.offer(item, OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                    return;
                }
            }
        }
    }

    /**
     * Creates a source reading the given attributes.
     *
     * @param batchLogObservable observable to read data from
     * @param list attributes to read
     * @param j start of the time range (stored as {@code startStamp})
     * @param j2 end of the time range (stored as {@code endStamp})
     * @return the new source
     */
    public static BatchSource of(BatchLogObservable batchLogObservable, List<AttributeDescriptor<?>> list, long j, long j2) {
        return new BatchSource(batchLogObservable, list, j, j2);
    }

    /**
     * Creates a source reading all attributes of the given attribute family.
     *
     * @param batchLogObservable observable to read data from
     * @param attributeFamilyDescriptor family whose attributes should be read
     * @param j start of the time range (stored as {@code startStamp})
     * @param j2 end of the time range (stored as {@code endStamp})
     * @return the new source
     */
    public static BatchSource of(BatchLogObservable batchLogObservable, AttributeFamilyDescriptor attributeFamilyDescriptor, long j, long j2) {
        return new BatchSource(batchLogObservable, attributeFamilyDescriptor.getAttributes(), j, j2);
    }

    private BatchSource(BatchLogObservable batchLogObservable, List<AttributeDescriptor<?>> list, long j, long j2) {
        this.observable = batchLogObservable;
        this.attributes = list;
        this.startStamp = j;
        this.endStamp = j2;
    }

    /**
     * @return a single-element set holding {@code QuorumStats.Provider.UNKNOWN_STATE}
     */
    @Override // cz.seznam.euphoria.core.client.io.BoundedDataSource
    public Set<String> getLocations() {
        return Collections.singleton(QuorumStats.Provider.UNKNOWN_STATE);
    }

    /**
     * Not supported on the unsplit source.
     *
     * @throws UnsupportedOperationException always; use {@link #split(long)} first
     */
    @Override // cz.seznam.euphoria.core.client.io.BoundedDataSource
    public BoundedReader<StreamElement> openReader() throws IOException {
        throw new UnsupportedOperationException("Not supported. Call `split` first.");
    }

    /**
     * Splits this source into one unsplittable source per partition returned by the
     * observable for the configured time range.
     *
     * @param j not used by this implementation
     * @return the per-partition sources
     */
    @Override // cz.seznam.euphoria.core.client.io.BoundedDataSource
    public List<BoundedDataSource<StreamElement>> split(long j) {
        return this.observable.getPartitions(this.startStamp, this.endStamp).stream()
                // Explicit type witness so that the collected list has the declared element type.
                .<BoundedDataSource<StreamElement>>map(partition -> {
                    return new UnsplittableBoundedSource<StreamElement>() {
                        @Override // cz.seznam.euphoria.core.client.io.BoundedDataSource
                        public Set<String> getLocations() {
                            return Collections.singleton(QuorumStats.Provider.UNKNOWN_STATE);
                        }

                        @Override // cz.seznam.euphoria.core.client.io.BoundedDataSource
                        public BoundedReader<StreamElement> openReader() throws IOException {
                            final Observer observer = new Observer();
                            BatchSource.this.observable.observe(Arrays.asList(partition), BatchSource.this.attributes, observer);
                            return BatchSource.newReader(observer);
                        }
                    };
                })
                .collect(Collectors.toList());
    }

    /** Creates a reader that pulls elements handed over by the given observer. */
    private static BoundedReader<StreamElement> newReader(final Observer observer) {
        return new BoundedReader<StreamElement>() {

            // Element fetched by hasNext() and not yet returned by next().
            @Nullable
            private StreamElement current = null;

            // Set once the end of the stream was seen or the reader was closed; prevents
            // waiting on a queue that will never receive another item.
            private boolean finished = false;

            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                this.finished = true;
                observer.stop();
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.current != null) {
                    return true;
                }
                if (this.finished) {
                    return endOfStream();
                }
                try {
                    this.current = observer.getQueue().take().orElse(null);
                } catch (InterruptedException e) {
                    BatchSource.log.warn("Interrupted while trying to retrieve next element.");
                    Thread.currentThread().interrupt();
                    this.current = null;
                    return false;
                }
                if (this.current == null) {
                    this.finished = true;
                    return endOfStream();
                }
                return true;
            }

            @Override // java.util.Iterator
            public StreamElement next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                StreamElement streamElement = this.current;
                this.current = null;
                return streamElement;
            }

            // Reports end of data; surfaces a failure reported by the observable, if any.
            private boolean endOfStream() {
                Throwable failure = observer.getError();
                if (failure != null) {
                    throw new RuntimeException(failure);
                }
                return false;
            }
        };
    }
}
