package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.NonNull;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/PendingTopicFutures.class */
public class PendingTopicFutures {
    private final RequestStats requestStats;
    private CompletableFuture<TopicThrowablePair> currentTopicFuture;
    private final AtomicInteger count = new AtomicInteger(0);
    private final long enqueueTimestamp = MathUtils.nowInNano();

    public PendingTopicFutures(RequestStats requestStats) {
        this.requestStats = requestStats;
    }

    private void registerQueueLatency(boolean z) {
        if (this.requestStats != null) {
            if (z) {
                this.requestStats.getMessageQueuedLatencyStats().registerSuccessfulEvent(MathUtils.elapsedNanos(this.enqueueTimestamp), TimeUnit.NANOSECONDS);
            } else {
                this.requestStats.getMessageQueuedLatencyStats().registerFailedEvent(MathUtils.elapsedNanos(this.enqueueTimestamp), TimeUnit.NANOSECONDS);
            }
        }
    }

    public void addListener(CompletableFuture<Optional<PersistentTopic>> completableFuture, @NonNull Consumer<Optional<PersistentTopic>> consumer, @NonNull Consumer<Throwable> consumer2) {
        if (consumer == null) {
            throw new NullPointerException("persistentTopicConsumer is marked @NonNull but is null");
        }
        if (consumer2 == null) {
            throw new NullPointerException("exceptionConsumer is marked @NonNull but is null");
        }
        if (this.count.compareAndSet(0, 1)) {
            this.currentTopicFuture = completableFuture.thenApply(optional -> {
                registerQueueLatency(true);
                consumer.accept(optional);
                this.count.decrementAndGet();
                return TopicThrowablePair.withTopic(optional);
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                registerQueueLatency(false);
                consumer2.accept(th.getCause());
                this.count.decrementAndGet();
                return TopicThrowablePair.withThrowable(th.getCause());
            });
        } else {
            this.currentTopicFuture = this.currentTopicFuture.thenApply(topicThrowablePair -> {
                if (topicThrowablePair.getThrowable() == null) {
                    registerQueueLatency(true);
                    consumer.accept(topicThrowablePair.getPersistentTopicOpt());
                } else {
                    registerQueueLatency(false);
                    consumer2.accept(topicThrowablePair.getThrowable());
                }
                this.count.decrementAndGet();
                return topicThrowablePair;
            }).exceptionally((Function<Throwable, ? extends U>) th2 -> {
                registerQueueLatency(false);
                consumer2.accept(th2.getCause());
                this.count.decrementAndGet();
                return TopicThrowablePair.withThrowable(th2.getCause());
            });
            this.count.incrementAndGet();
        }
    }

    @VisibleForTesting
    public int waitAndGetSize() throws ExecutionException, InterruptedException {
        this.currentTopicFuture.get();
        return this.count.get();
    }

    @VisibleForTesting
    public int size() {
        return this.count.get();
    }
}
