package cz.o2.proxima.source;

import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.BulkLogObserver;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.ObserveHandle;
import cz.o2.proxima.storage.commitlog.Offset;
import cz.o2.proxima.storage.commitlog.Position;
import cz.seznam.euphoria.core.client.io.UnboundedDataSource;
import cz.seznam.euphoria.core.client.io.UnboundedPartition;
import cz.seznam.euphoria.core.client.io.UnboundedReader;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/source/UnboundedStreamSource.class */
public class UnboundedStreamSource implements UnboundedDataSource<StreamElement, Offset> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) UnboundedStreamSource.class);
    final CommitLogReader reader;
    final Position position;

    @Nullable
    final String consumer;

    public static UnboundedStreamSource of(CommitLogReader commitLogReader, Position position) {
        return new UnboundedStreamSource(null, commitLogReader, position);
    }

    public static UnboundedStreamSource of(@Nullable String str, CommitLogReader commitLogReader, Position position) {
        return new UnboundedStreamSource(str, commitLogReader, position);
    }

    UnboundedStreamSource(@Nullable String str, CommitLogReader commitLogReader, Position position) {
        this.consumer = str;
        this.reader = commitLogReader;
        this.position = position;
    }

    @Override // cz.seznam.euphoria.core.client.io.UnboundedDataSource
    public List<UnboundedPartition<StreamElement, Offset>> getPartitions() {
        return (List) this.reader.getPartitions().stream().map(this::asUnboundedPartition).collect(Collectors.toList());
    }

    private UnboundedPartition<StreamElement, Offset> asUnboundedPartition(Partition partition) {
        return () -> {
            final ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
            final LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();
            final AtomicReference atomicReference = new AtomicReference();
            atomicReference.set(this.reader.observeBulkPartitions(this.consumer, Arrays.asList(partition), this.position, partitionObserver(arrayBlockingQueue, linkedBlockingDeque)));
            return new UnboundedReader<StreamElement, Offset>() { // from class: cz.o2.proxima.source.UnboundedStreamSource.1

                @Nullable
                StreamElement current = null;

                @Override // java.io.Closeable, java.lang.AutoCloseable
                public void close() throws IOException {
                    ((ObserveHandle) atomicReference.get()).cancel();
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    try {
                        if (this.current != null) {
                            return true;
                        }
                        Optional optional = (Optional) arrayBlockingQueue.take();
                        if (!optional.isPresent()) {
                            return false;
                        }
                        this.current = (StreamElement) optional.get();
                        return true;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        this.current = null;
                        return false;
                    }
                }

                @Override // java.util.Iterator
                public StreamElement next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    StreamElement streamElement = this.current;
                    this.current = null;
                    return streamElement;
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // cz.seznam.euphoria.core.client.io.UnboundedReader
                public Offset getCurrentOffset() {
                    return ((ObserveHandle) atomicReference.get()).getCurrentOffsets().get(0);
                }

                @Override // cz.seznam.euphoria.core.client.io.UnboundedReader
                public void reset(Offset offset) {
                    ((ObserveHandle) atomicReference.get()).resetOffsets(Arrays.asList(offset));
                }

                @Override // cz.seznam.euphoria.core.client.io.UnboundedReader
                public void commitOffset(Offset offset) {
                    ArrayList arrayList = new ArrayList();
                    linkedBlockingDeque.drainTo(arrayList);
                    arrayList.forEach((v0) -> {
                        v0.confirm();
                    });
                }
            };
        };
    }

    private BulkLogObserver partitionObserver(final BlockingQueue<Optional<StreamElement>> blockingQueue, final BlockingQueue<BulkLogObserver.OffsetCommitter> blockingQueue2) {
        return new BulkLogObserver() { // from class: cz.o2.proxima.source.UnboundedStreamSource.2
            @Override // cz.o2.proxima.storage.commitlog.BulkLogObserver
            public boolean onNext(StreamElement streamElement, BulkLogObserver.OffsetCommitter offsetCommitter) {
                try {
                    return UnboundedStreamSource.this.enqueue(blockingQueue, blockingQueue2, offsetCommitter, streamElement);
                } catch (Exception e) {
                    offsetCommitter.fail(e);
                    throw new RuntimeException(e);
                }
            }

            @Override // cz.o2.proxima.storage.commitlog.LogObserverBase
            public boolean onError(Throwable th) {
                onCompleted();
                throw new RuntimeException(th);
            }

            @Override // cz.o2.proxima.storage.commitlog.LogObserverBase
            public void onCompleted() {
                try {
                    blockingQueue.put(Optional.empty());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean enqueue(BlockingQueue<Optional<StreamElement>> blockingQueue, BlockingQueue<BulkLogObserver.OffsetCommitter> blockingQueue2, BulkLogObserver.OffsetCommitter offsetCommitter, StreamElement streamElement) {
        try {
            blockingQueue2.add(offsetCommitter);
            blockingQueue.put(Optional.of(streamElement));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            offsetCommitter.fail(e);
            return false;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 282727544:
                if (implMethodName.equals("lambda$asUnboundedPartition$7c6136a3$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/io/UnboundedPartition") && serializedLambda.getFunctionalInterfaceMethodName().equals("openReader") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lcz/seznam/euphoria/core/client/io/UnboundedReader;") && serializedLambda.getImplClass().equals("cz/o2/proxima/source/UnboundedStreamSource") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/storage/Partition;)Lcz/seznam/euphoria/core/client/io/UnboundedReader;")) {
                    UnboundedStreamSource unboundedStreamSource = (UnboundedStreamSource) serializedLambda.getCapturedArg(0);
                    Partition partition = (Partition) serializedLambda.getCapturedArg(1);
                    return () -> {
                        final BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
                        final BlockingQueue linkedBlockingDeque = new LinkedBlockingDeque();
                        final AtomicReference atomicReference = new AtomicReference();
                        atomicReference.set(this.reader.observeBulkPartitions(this.consumer, Arrays.asList(partition), this.position, partitionObserver(arrayBlockingQueue, linkedBlockingDeque)));
                        return new UnboundedReader<StreamElement, Offset>() { // from class: cz.o2.proxima.source.UnboundedStreamSource.1

                            @Nullable
                            StreamElement current = null;

                            @Override // java.io.Closeable, java.lang.AutoCloseable
                            public void close() throws IOException {
                                ((ObserveHandle) atomicReference.get()).cancel();
                            }

                            @Override // java.util.Iterator
                            public boolean hasNext() {
                                try {
                                    if (this.current != null) {
                                        return true;
                                    }
                                    Optional optional = (Optional) arrayBlockingQueue.take();
                                    if (!optional.isPresent()) {
                                        return false;
                                    }
                                    this.current = (StreamElement) optional.get();
                                    return true;
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    this.current = null;
                                    return false;
                                }
                            }

                            @Override // java.util.Iterator
                            public StreamElement next() {
                                if (!hasNext()) {
                                    throw new NoSuchElementException();
                                }
                                StreamElement streamElement = this.current;
                                this.current = null;
                                return streamElement;
                            }

                            /* JADX WARN: Can't rename method to resolve collision */
                            @Override // cz.seznam.euphoria.core.client.io.UnboundedReader
                            public Offset getCurrentOffset() {
                                return ((ObserveHandle) atomicReference.get()).getCurrentOffsets().get(0);
                            }

                            @Override // cz.seznam.euphoria.core.client.io.UnboundedReader
                            public void reset(Offset offset) {
                                ((ObserveHandle) atomicReference.get()).resetOffsets(Arrays.asList(offset));
                            }

                            @Override // cz.seznam.euphoria.core.client.io.UnboundedReader
                            public void commitOffset(Offset offset) {
                                ArrayList arrayList = new ArrayList();
                                linkedBlockingDeque.drainTo(arrayList);
                                arrayList.forEach((v0) -> {
                                    v0.confirm();
                                });
                            }
                        };
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
