package io.streamnative.pulsar.handlers.kop.utils.delayed;

import com.google.common.base.Preconditions;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.ShutdownableThread;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import io.streamnative.pulsar.handlers.kop.utils.timer.Timer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/utils/delayed/DelayedOperationPurgatory.class */
public class DelayedOperationPurgatory<T extends DelayedOperation> {
    private static final Logger log = LoggerFactory.getLogger(DelayedOperationPurgatory.class);
    private final String purgatoryName;
    private final boolean ownTimer;
    private final Timer timeoutTimer;
    private final int purgeInterval;
    private final boolean reaperEnabled;
    private final boolean timerEnabled;
    private final ShutdownableThread expirationReaper;
    private final ReentrantReadWriteLock removeWatchersLock = new ReentrantReadWriteLock();
    private final AtomicInteger estimatedTotalOperations = new AtomicInteger(0);
    private final ConcurrentMap<Object, DelayedOperationPurgatory<T>.Watchers> watchersForKey = new ConcurrentHashMap();

    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/utils/delayed/DelayedOperationPurgatory$Builder.class */
    public static class Builder<T extends DelayedOperation> {
        private String purgatoryName;
        private Timer timer;
        private int purgeInterval;
        private boolean reaperEnabled;
        private boolean timerEnabled;

        private Builder() {
            this.purgeInterval = 1000;
            this.reaperEnabled = true;
            this.timerEnabled = true;
        }

        public Builder<T> purgatoryName(String str) {
            this.purgatoryName = str;
            return this;
        }

        public Builder<T> timeoutTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        public Builder<T> purgeInterval(int i) {
            this.purgeInterval = i;
            return this;
        }

        public Builder<T> reaperEnabled(boolean z) {
            this.reaperEnabled = z;
            return this;
        }

        public Builder<T> timerEnabled(boolean z) {
            this.timerEnabled = z;
            return this;
        }

        public DelayedOperationPurgatory<T> build() {
            boolean z;
            if (null == this.timer) {
                z = true;
                this.timer = SystemTimer.builder().executorName(this.purgatoryName).build();
            } else {
                z = false;
            }
            return new DelayedOperationPurgatory<>(this.purgatoryName, this.timer, z, this.purgeInterval, this.reaperEnabled, this.timerEnabled);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/utils/delayed/DelayedOperationPurgatory$Watchers.class */
    public class Watchers {
        private final Object key;
        private final ConcurrentLinkedQueue<T> operations = new ConcurrentLinkedQueue<>();

        Watchers(Object obj) {
            this.key = obj;
        }

        public int countWatched() {
            return this.operations.size();
        }

        public boolean isEmpty() {
            return this.operations.isEmpty();
        }

        public void watch(T t) {
            this.operations.add(t);
        }

        public int tryCompleteWatched() {
            int i = 0;
            Iterator<T> it = this.operations.iterator();
            while (it.hasNext()) {
                T next = it.next();
                if (next.isCompleted()) {
                    it.remove();
                } else if (next.maybeTryComplete()) {
                    it.remove();
                    i++;
                }
            }
            if (this.operations.isEmpty()) {
                DelayedOperationPurgatory.this.removeKeyIfEmpty(this.key, this);
            }
            return i;
        }

        public List<T> cancel() {
            Iterator<T> it = this.operations.iterator();
            ArrayList arrayList = new ArrayList();
            while (it.hasNext()) {
                T next = it.next();
                next.cancel();
                it.remove();
                arrayList.add(next);
            }
            return arrayList;
        }

        int purgeCompleted() {
            int i = 0;
            Iterator<T> it = this.operations.iterator();
            while (it.hasNext()) {
                if (it.next().isCompleted()) {
                    it.remove();
                    i++;
                }
            }
            if (this.operations.isEmpty()) {
                DelayedOperationPurgatory.this.removeKeyIfEmpty(this.key, this);
            }
            return i;
        }
    }

    public static <T extends DelayedOperation> Builder<T> builder() {
        return new Builder<>();
    }

    public DelayedOperationPurgatory(String str, Timer timer, boolean z, int i, boolean z2, boolean z3) {
        this.purgatoryName = str;
        this.timeoutTimer = timer;
        this.ownTimer = z;
        this.purgeInterval = i;
        this.reaperEnabled = z2;
        this.timerEnabled = z3;
        this.expirationReaper = new ShutdownableThread(String.format("ExpirationReaper-%s", str)) { // from class: io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory.1
            @Override // io.streamnative.pulsar.handlers.kop.utils.ShutdownableThread
            protected void doWork() {
                DelayedOperationPurgatory.this.advanceClock(200L);
            }
        };
        if (z2) {
            this.expirationReaper.start();
        }
    }

    public boolean tryCompleteElseWatch(T t, List<Object> list) {
        Preconditions.checkArgument(!list.isEmpty(), "The watch key list can't be empty");
        if (t.tryComplete()) {
            return true;
        }
        boolean z = false;
        for (Object obj : list) {
            if (t.isCompleted()) {
                return false;
            }
            watchForOperation(obj, t);
            if (!z) {
                z = true;
                this.estimatedTotalOperations.incrementAndGet();
            }
        }
        if (t.maybeTryComplete()) {
            return true;
        }
        if (t.isCompleted()) {
            return false;
        }
        if (this.timerEnabled) {
            this.timeoutTimer.add(t);
        }
        if (!t.isCompleted()) {
            return false;
        }
        t.cancel();
        return false;
    }

    public int checkAndComplete(Object obj) {
        Watchers watchers = (Watchers) CoreUtils.inReadLock(this.removeWatchersLock, () -> {
            return this.watchersForKey.get(obj);
        });
        if (null == watchers) {
            return 0;
        }
        return watchers.tryCompleteWatched();
    }

    public int watched() {
        return allWatchers().stream().mapToInt((v0) -> {
            return v0.countWatched();
        }).sum();
    }

    public int delayed() {
        return this.timeoutTimer.size();
    }

    public List<T> cancelForKey(Object obj) {
        return (List) CoreUtils.inWriteLock(this.removeWatchersLock, () -> {
            DelayedOperationPurgatory<T>.Watchers remove = this.watchersForKey.remove(obj);
            return remove != null ? remove.cancel() : Collections.emptyList();
        });
    }

    private Collection<DelayedOperationPurgatory<T>.Watchers> allWatchers() {
        return (Collection) CoreUtils.inReadLock(this.removeWatchersLock, () -> {
            return this.watchersForKey.values();
        });
    }

    private void watchForOperation(Object obj, T t) {
        CoreUtils.inReadLock(this.removeWatchersLock, () -> {
            this.watchersForKey.computeIfAbsent(obj, obj2 -> {
                return new Watchers(obj2);
            }).watch(t);
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeKeyIfEmpty(Object obj, DelayedOperationPurgatory<T>.Watchers watchers) {
        CoreUtils.inWriteLock(this.removeWatchersLock, () -> {
            if (this.watchersForKey.get(obj) != watchers || watchers == null || !watchers.isEmpty()) {
                return null;
            }
            this.watchersForKey.remove(obj);
            return null;
        });
    }

    public void shutdown() {
        if (this.reaperEnabled) {
            try {
                this.expirationReaper.shutdown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Interrupted at shutting down expiration reaper for {}", this.purgatoryName);
            }
        }
        if (this.ownTimer) {
            this.timeoutTimer.shutdown();
        }
    }

    public void advanceClock(long j) {
        this.timeoutTimer.advanceClock(j);
        if (this.estimatedTotalOperations.get() - delayed() > this.purgeInterval) {
            this.estimatedTotalOperations.getAndSet(delayed());
            if (log.isDebugEnabled()) {
                log.debug("{} Begin purging watch lists", this.purgatoryName);
            }
            int sum = allWatchers().stream().mapToInt((v0) -> {
                return v0.purgeCompleted();
            }).sum();
            if (log.isDebugEnabled()) {
                log.debug("{} Purged {} elements from watch lists.", this.purgatoryName, Integer.valueOf(sum));
            }
        }
    }
}
