package io.eventuate.local.common;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/local/common/OffsetProcessor.class */
/**
 * Saves offsets that become available asynchronously.
 *
 * <p>Callers hand in futures through {@link #saveOffset}. Whenever one of them completes, the
 * completed futures at the head of the internal queue are drained and the last non-empty offset
 * found among them is passed to {@link GenericOffsetStore#save}. Draining stops at the first
 * future that is not yet done, so an offset is only saved once everything queued before it has
 * completed (assuming the queue hands elements back in insertion order; verify against
 * {@code ConcurrentCountedLinkedQueue}).
 *
 * <p>Draining is guarded by an {@link AtomicBoolean} so that only one thread at a time collects
 * and saves offsets.
 *
 * @param <OFFSET> type of the offset handed to the offset store
 */
public class OffsetProcessor<OFFSET> {
    /** Destination for the collected offsets. */
    protected GenericOffsetStore<OFFSET> offsetStore;

    /** Receives any exception thrown while collecting or saving offsets. */
    protected Consumer<Exception> offsetSavingExceptionHandler;

    protected Logger logger = LoggerFactory.getLogger(getClass());

    /** {@code true} while some thread is inside {@link #collectAndSaveOffsets()}. */
    private final AtomicBoolean processingOffsets = new AtomicBoolean(false);

    /** Pending offset futures, appended by {@link #saveOffset} and drained from the head. */
    protected ConcurrentCountedLinkedQueue<Optional<OFFSET>> offsets = new ConcurrentCountedLinkedQueue<>();

    /** Runs the completion callbacks registered in {@link #saveOffset}. */
    private final Executor executor = Executors.newCachedThreadPool();

    /**
     * Creates a processor that writes to the given store.
     *
     * @param genericOffsetStore store that receives the collected offsets
     * @param consumer handler invoked with any exception raised while processing offsets
     */
    public OffsetProcessor(GenericOffsetStore<OFFSET> genericOffsetStore, Consumer<Exception> consumer) {
        this.offsetStore = genericOffsetStore;
        this.offsetSavingExceptionHandler = consumer;
    }

    /**
     * Queues an offset future and schedules offset processing for when it completes.
     *
     * @param completableFuture future that yields the offset to save, or an empty
     *     {@link Optional} when there is nothing to save for this entry
     */
    public void saveOffset(CompletableFuture<Optional<OFFSET>> completableFuture) {
        this.offsets.add(completableFuture);
        completableFuture.whenCompleteAsync(this::processOffsetsWithExceptionHandling, this.executor);
    }

    /**
     * Completion callback: processes the queue and routes any {@link Exception} to
     * {@link #offsetSavingExceptionHandler}.
     *
     * <p>Both arguments are ignored; the outcome of each future is read again from the queue.
     *
     * @param optional result of the completed future (unused)
     * @param th failure of the completed future (unused)
     */
    private void processOffsetsWithExceptionHandling(Optional<OFFSET> optional, Throwable th) {
        try {
            processOffsets();
        } catch (Exception e) {
            this.offsetSavingExceptionHandler.accept(e);
        }
    }

    /**
     * Drains the queue while holding the {@link #processingOffsets} flag.
     *
     * <p>A thread that loses the compare-and-set returns immediately; the thread that holds the
     * flag re-checks the queue after releasing it and loops if more completed futures are waiting.
     */
    private void processOffsets() {
        while (this.processingOffsets.compareAndSet(false, true)) {
            try {
                collectAndSaveOffsets();
            } finally {
                // Release the flag even when collecting or saving throws. Otherwise it would stay
                // set forever and every later call would fail the compare-and-set above, so no
                // further offset would ever be saved.
                this.processingOffsets.set(false);
            }
            // A future may have completed after the drain finished but before the flag was
            // released; its callback would have lost the compare-and-set, so look again here.
            if (!isDone(this.offsets.peek())) {
                return;
            }
        }
    }

    /**
     * Removes every completed future from the head of the queue and saves the last non-empty
     * offset among them, if any.
     */
    protected void collectAndSaveOffsets() {
        Optional<OFFSET> latestOffset = Optional.empty();
        while (isDone(this.offsets.peek())) {
            Optional<OFFSET> offset = getOffset(this.offsets.poll());
            if (offset.isPresent()) {
                latestOffset = offset;
            }
        }
        // The bound method reference checks offsetStore for null eagerly, even when there is
        // nothing to save.
        latestOffset.ifPresent(this.offsetStore::save);
    }

    /**
     * Extracts the result of a future via {@code CompletableFutureUtil.get}.
     *
     * @param completableFuture future to read
     * @return the value produced by the future
     */
    @SuppressWarnings("unchecked") // the future is declared to produce Optional<OFFSET>
    protected Optional<OFFSET> getOffset(CompletableFuture<Optional<OFFSET>> completableFuture) {
        return (Optional<OFFSET>) CompletableFutureUtil.get(completableFuture);
    }

    /**
     * Tells whether a queue head exists and has completed.
     *
     * @param completableFuture future to test; {@code null} when the queue is empty
     * @return {@code true} if the future is non-null and done
     */
    protected boolean isDone(CompletableFuture<Optional<OFFSET>> completableFuture) {
        return completableFuture != null && completableFuture.isDone();
    }

    /**
     * Returns the queue's {@code size} counter.
     *
     * <p>This is the queue's own {@link AtomicInteger} instance, not a copy (presumably it tracks
     * the number of futures still queued; confirm against {@code ConcurrentCountedLinkedQueue}).
     *
     * @return the counter object held by the offsets queue
     */
    public AtomicInteger getUnprocessedOffsetCount() {
        return this.offsets.size;
    }
}
