package cz.o2.proxima.storage.pubsub.io;

import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.BulkLogObserver;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.ObserveHandle;
import cz.o2.proxima.storage.commitlog.Offset;
import cz.o2.proxima.storage.pubsub.AttributeData;
import cz.o2.proxima.util.Pair;
import cz.seznam.euphoria.beam.io.KryoCoder;
import cz.seznam.euphoria.core.annotation.stability.Experimental;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.DateTimeConstants;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Beam {@link UnboundedSource} that emits {@link AttributeData} observed on a
 * {@link CommitLogReader}.
 *
 * <p>Every reader created by {@link #createReader} registers a {@link BulkLogObserver} on the
 * configured commit log reader. Observed elements are converted with
 * {@link #toData(StreamElement)} and handed over to the Beam reader through a bounded
 * in-memory queue owned by this source instance. Each source returned from {@link #split}
 * is a fresh instance and therefore owns a queue of its own.
 */
@Experimental
public class CommitLogSource
        extends UnboundedSource<AttributeData, CommitLogSource.WatermarkCommitCheckpoint> {

    private static final Logger log = LoggerFactory.getLogger(CommitLogSource.class);

    /** Capacity of the hand-over queue between the commit log observer and the Beam reader. */
    private static final int QUEUE_CAPACITY = 1000;

    /** How long the observer waits for free space in the queue before nacking an element. */
    private static final long OFFER_TIMEOUT_MS = 200L;

    /** How long a single {@code advance()} call waits for an element to show up in the queue. */
    private static final long POLL_TIMEOUT_MS = 10L;

    /** Number of consecutive empty polls after which the watermark is moved forward. */
    private static final int EMPTY_POLLS_PER_WATERMARK_UPDATE = 20;

    /** Name passed to {@link CommitLogReader#observeBulk} when the observer is registered. */
    private final String name;

    /** Commit log reader the elements are observed on. */
    private final CommitLogReader reader;

    /** Offset (in milliseconds) subtracted from wall-clock time when the watermark is advanced. */
    private final long allowedLatenessMs;

    /** Elements waiting to be picked up by the Beam reader, paired with their committers. */
    private final BlockingQueue<Pair<BulkLogObserver.OffsetCommitter, AttributeData>> batch;

    /**
     * Checkpoint mark that carries only the watermark reached by the reader.
     */
    public static class WatermarkCommitCheckpoint implements UnboundedSource.CheckpointMark {

        private final long watermark;

        /**
         * Creates a checkpoint holding the given watermark.
         *
         * @param watermark watermark in epoch milliseconds
         * @return the new checkpoint
         */
        static WatermarkCommitCheckpoint of(long watermark) {
            return new WatermarkCommitCheckpoint(watermark);
        }

        WatermarkCommitCheckpoint(long watermark) {
            this.watermark = watermark;
        }

        /**
         * Does nothing; the offsets are confirmed by the reader when the checkpoint is created.
         *
         * @throws IOException never thrown by this implementation
         */
        public void finalizeCheckpoint() throws IOException {
            // intentionally empty
        }

        /**
         * @return the watermark stored in this checkpoint, in epoch milliseconds
         */
        public long getWatermark() {
            return this.watermark;
        }
    }

    /**
     * Creates a source observing the given commit log reader.
     *
     * @param reader commit log reader to observe
     * @param name name handed to {@link CommitLogReader#observeBulk}
     * @param allowedLatenessMs milliseconds subtracted from the current time whenever the
     *     watermark is advanced
     * @return the new source
     */
    public static CommitLogSource of(CommitLogReader reader, String name, long allowedLatenessMs) {
        return new CommitLogSource(reader, name, allowedLatenessMs);
    }

    private CommitLogSource(CommitLogReader reader, String name, long allowedLatenessMs) {
        this.reader = reader;
        this.name = name;
        this.allowedLatenessMs = allowedLatenessMs;
        this.batch = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    }

    /** Copies the configuration of {@code other}; the copy gets its own, empty queue. */
    private CommitLogSource(CommitLogSource other) {
        this(other.reader, other.name, other.allowedLatenessMs);
    }

    /**
     * Splits this source into {@code desiredNumSplits} sources sharing the same configuration.
     *
     * @param desiredNumSplits number of sources to return
     * @param pipelineOptions ignored
     * @return list of {@code desiredNumSplits} new sources
     * @throws Exception declared by the overridden method; not thrown here
     */
    public List<? extends UnboundedSource<AttributeData, WatermarkCommitCheckpoint>> split(
            int desiredNumSplits, PipelineOptions pipelineOptions) throws Exception {
        return IntStream.range(0, desiredNumSplits)
                .mapToObj(index -> new CommitLogSource(this))
                .collect(Collectors.toList());
    }

    /**
     * Registers a bulk observer on the commit log and returns a reader draining the queue the
     * observer fills.
     *
     * @param pipelineOptions ignored
     * @param watermarkCommitCheckpoint checkpoint to take the initial watermark from, or
     *     {@code null} to start from {@link Long#MIN_VALUE}
     * @return reader over the observed elements
     * @throws IOException declared by the overridden method; not thrown here
     */
    public UnboundedSource.UnboundedReader<AttributeData> createReader(
            PipelineOptions pipelineOptions, WatermarkCommitCheckpoint watermarkCommitCheckpoint)
            throws IOException {
        // Set once the observer reports (re)start; until then the watermark is held back.
        final AtomicBoolean observerStarted = new AtomicBoolean();
        final AtomicLong watermark = new AtomicLong(
                watermarkCommitCheckpoint == null
                        ? Long.MIN_VALUE
                        : watermarkCommitCheckpoint.getWatermark());
        log.info("Creating reader from watermark {}", watermark);

        final ObserveHandle handle = this.reader.observeBulk(this.name, new BulkLogObserver() {

            public boolean onNext(
                    StreamElement streamElement, BulkLogObserver.OffsetCommitter offsetCommitter) {
                try {
                    boolean accepted = batch.offer(
                            Pair.of(offsetCommitter, toData(streamElement)),
                            OFFER_TIMEOUT_MS,
                            TimeUnit.MILLISECONDS);
                    if (!accepted) {
                        // The reader is not keeping up; hand the element back to the commit log.
                        log.warn("Nacking incoming element {} due to write timeout.", streamElement);
                        offsetCommitter.nack();
                    }
                    return true;
                } catch (InterruptedException e) {
                    log.warn("Interrupted while inserting element into queue.", e);
                    Thread.currentThread().interrupt();
                    return false;
                }
            }

            public void onRestart(List<Offset> list) {
                observerStarted.set(true);
                log.info(
                        "Successfully initialized bulk observer from watermark {}",
                        watermark.get());
            }

            public boolean onError(Throwable th) {
                log.error("Error during observing commit log", th);
                throw new RuntimeException(th);
            }
        });

        // Counts consecutive empty polls; wraps back to zero every
        // EMPTY_POLLS_PER_WATERMARK_UPDATE polls.
        final AtomicInteger emptyPolls = new AtomicInteger();

        return new UnboundedSource.UnboundedReader<AttributeData>() {

            /** Element returned by the last successful advance, {@code null} otherwise. */
            AttributeData current;

            /** Committer of the most recently polled element that was not confirmed yet. */
            BulkLogObserver.OffsetCommitter storedCommitter = null;

            public boolean start() throws IOException {
                return advance();
            }

            public boolean advance() throws IOException {
                try {
                    Pair<BulkLogObserver.OffsetCommitter, AttributeData> polled =
                            batch.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                    if (polled != null) {
                        this.storedCommitter = polled.getFirst();
                        this.current = polled.getSecond();
                        emptyPolls.set(0);
                        return true;
                    }
                    // The counter is only touched once the observer has started (short-circuit).
                    // Whenever it wraps to zero, the watermark jumps to "now - allowed lateness".
                    if (observerStarted.get()
                            && emptyPolls.updateAndGet(count ->
                                    count + 1 >= EMPTY_POLLS_PER_WATERMARK_UPDATE
                                            ? 0
                                            : count + 1) == 0) {
                        watermark.set(System.currentTimeMillis() - allowedLatenessMs);
                    }
                    this.current = null;
                    return false;
                } catch (InterruptedException e) {
                    log.warn("Interrupted while polling queue", e);
                    Thread.currentThread().interrupt();
                    return false;
                }
            }

            public Instant getWatermark() {
                return new Instant(watermark.get());
            }

            public UnboundedSource.CheckpointMark getCheckpointMark() {
                // NOTE(review): the committer is confirmed when the checkpoint is created, not in
                // WatermarkCommitCheckpoint#finalizeCheckpoint - confirm this is intended.
                if (this.storedCommitter != null) {
                    this.storedCommitter.confirm();
                    this.storedCommitter = null;
                }
                return WatermarkCommitCheckpoint.of(watermark.get());
            }

            public UnboundedSource<AttributeData, ?> getCurrentSource() {
                return CommitLogSource.this;
            }

            public AttributeData getCurrent() {
                return this.current;
            }

            public Instant getCurrentTimestamp() {
                return this.current == null
                        ? new Instant(Long.MIN_VALUE)
                        : new Instant(this.current.getStamp());
            }

            public void close() throws IOException {
                log.info("Closing handle {}", handle);
                handle.cancel();
            }
        };
    }

    /**
     * @return a new {@link KryoCoder} for checkpoint marks
     */
    public Coder<WatermarkCommitCheckpoint> getCheckpointMarkCoder() {
        return new KryoCoder<>();
    }

    /**
     * @return a new {@link KryoCoder} for emitted elements
     */
    public Coder<AttributeData> getOutputCoder() {
        return new KryoCoder<>();
    }

    /**
     * Converts a commit log element into the {@link AttributeData} emitted by this source.
     *
     * <p>The "delete" flag of the result is set only for non-wildcard deletes; wildcard deletes
     * are reported through the separate "delete wildcard" flag.
     *
     * @param streamElement element read from the commit log
     * @return the converted element
     */
    public static AttributeData toData(StreamElement streamElement) {
        boolean deleteWildcard = streamElement.isDeleteWildcard();
        boolean plainDelete = streamElement.isDelete() && !deleteWildcard;
        return new AttributeData(
                streamElement.getKey(),
                streamElement.getAttribute(),
                streamElement.getValue(),
                plainDelete,
                deleteWildcard,
                streamElement.getStamp());
    }
}
