package ai.eloquent.raft;

import ai.eloquent.raft.KeyValueStateMachine;
import ai.eloquent.util.Pointer;
import ai.eloquent.util.SafeTimerTask;
import ai.eloquent.util.TimerUtils;
import com.google.protobuf.InvalidProtocolBufferException;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/eloquent/raft/RaftBackedCache.class */
public abstract class RaftBackedCache<V> implements Iterable<Map.Entry<String, V>>, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(RaftBackedCache.class);
    private static final int DEFAULT_MAX_SIZE_BYTES = 1048576;
    public final Theseus raft;
    private SafeTimerTask evictionTask;
    Map<ChangeListener, KeyValueStateMachine.ChangeListener> changeListeners;
    private final Map<ChangeListener, WeakReference<Object>> changeListenerOwner;
    final Set<CompletableFuture<Boolean>> evictionTasksRunning;

    @FunctionalInterface
    /* loaded from: input_file:ai/eloquent/raft/RaftBackedCache$ChangeListener.class */
    public interface ChangeListener<V> {
        void onChange(String str, Optional<V> optional);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:ai/eloquent/raft/RaftBackedCache$Entry.class */
    public static class Entry<V> implements Map.Entry<String, V> {
        public boolean isPersisted;
        public final String key;
        public final V value;

        Entry(String str, boolean z, V v) {
            this.key = str;
            this.isPersisted = z;
            this.value = v;
        }

        public static <V> Entry<V> deserialize(String str, byte[] bArr, Function<byte[], Optional<V>> function) throws InvalidProtocolBufferException {
            byte b = bArr[0];
            byte[] bArr2 = new byte[bArr.length - 1];
            System.arraycopy(bArr, 1, bArr2, 0, bArr2.length);
            Optional<V> apply = function.apply(bArr2);
            if (apply.isPresent()) {
                return new Entry<>(str, b == 1, apply.get());
            }
            Optional<V> apply2 = function.apply(bArr);
            if (apply2.isPresent()) {
                return new Entry<>(str, false, apply2.get());
            }
            throw new InvalidProtocolBufferException("Deserialization returned Optional.empty()");
        }

        public static boolean readIsPersisted(byte[] bArr) {
            return bArr[0] == 1;
        }

        public byte[] serialize(Function<V, byte[]> function) {
            byte[] apply = function.apply(this.value);
            byte[] bArr = new byte[apply.length + 1];
            System.arraycopy(apply, 0, bArr, 1, apply.length);
            bArr[0] = (byte) (this.isPersisted ? 1 : 0);
            return bArr;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Map.Entry
        public String getKey() {
            return this.key;
        }

        @Override // java.util.Map.Entry
        public V getValue() {
            return this.value;
        }

        @Override // java.util.Map.Entry
        public V setValue(V v) {
            throw new UnsupportedOperationException();
        }
    }

    protected void onSet(V v) {
    }

    protected RaftBackedCache(final Theseus theseus, final Duration duration, final Duration duration2, final int i) {
        this.changeListeners = new IdentityHashMap();
        this.changeListenerOwner = new ConcurrentHashMap();
        this.evictionTasksRunning = ConcurrentHashMap.newKeySet();
        this.raft = theseus;
        this.evictionTask = new SafeTimerTask() { // from class: ai.eloquent.raft.RaftBackedCache.1
            @Override // ai.eloquent.util.SafeTimerTask
            public void runUnsafe() {
                RaftBackedCache.this.evictionTasksRunning.removeAll((Collection) RaftBackedCache.this.evictionTasksRunning.stream().filter(completableFuture -> {
                    return !completableFuture.isDone();
                }).collect(Collectors.toList()));
                Set<String> keysIdleSince = theseus.stateMachine.keysIdleSince(duration, theseus.node.transport.now());
                keysIdleSince.addAll(theseus.stateMachine.keysPresentSince(duration2, theseus.node.transport.now()));
                for (String str : keysIdleSince) {
                    if (str.startsWith(RaftBackedCache.this.prefix()) && !RaftBackedCache.this.valuePersistedSinceLastWrite(str)) {
                        Set<CompletableFuture<Boolean>> set = RaftBackedCache.this.evictionTasksRunning;
                        Theseus theseus2 = theseus;
                        Theseus theseus3 = theseus;
                        set.add(theseus2.withDistributedLockAsync(str, () -> {
                            Optional<byte[]> element = theseus3.getElement(str);
                            if (element.isPresent()) {
                                if (Entry.readIsPersisted(element.get())) {
                                    return CompletableFuture.completedFuture(true);
                                }
                                try {
                                    String replace = str.replace(RaftBackedCache.this.prefix(), "");
                                    byte[] bArr = element.get();
                                    RaftBackedCache raftBackedCache = RaftBackedCache.this;
                                    Entry deserialize = Entry.deserialize(replace, bArr, raftBackedCache::deserialize);
                                    try {
                                        RaftBackedCache.log.info("Persisting RaftBackedCache element with key {}", replace);
                                        RaftBackedCache.this.persist(replace, deserialize.value, false);
                                        deserialize.isPersisted = true;
                                        RaftBackedCache raftBackedCache2 = RaftBackedCache.this;
                                        return theseus3.setElementAsync(str, deserialize.serialize(raftBackedCache2::serialize), true, Duration.ofSeconds(30L));
                                    } catch (Throwable th) {
                                        RaftBackedCache.log.warn("Could not evict element from RaftBackedCache; not removing from Raft", th);
                                    }
                                } catch (InvalidProtocolBufferException e) {
                                    RaftBackedCache.log.warn("Could not deserialize Raft value for key {}", str);
                                }
                            }
                            return CompletableFuture.completedFuture(false);
                        }));
                    }
                }
                Collection<Map.Entry<String, KeyValueStateMachine.ValueWithOptionalOwner>> entries = theseus.stateMachine.entries();
                long sum = entries.stream().mapToInt(entry -> {
                    return ((KeyValueStateMachine.ValueWithOptionalOwner) entry.getValue()).value.length;
                }).sum();
                if (sum > i) {
                    ArrayList<Map.Entry> arrayList = new ArrayList(entries);
                    HashMap hashMap = new HashMap();
                    arrayList.sort(Comparator.comparingLong(entry2 -> {
                        return ((Long) hashMap.computeIfAbsent(entry2.getKey(), str2 -> {
                            return Long.valueOf(((KeyValueStateMachine.ValueWithOptionalOwner) entry2.getValue()).lastAccessed);
                        })).longValue();
                    }));
                    long j = i - sum;
                    for (Map.Entry entry3 : arrayList) {
                        if (j <= 0) {
                            return;
                        }
                        j -= ((KeyValueStateMachine.ValueWithOptionalOwner) entry3.getValue()).value.length;
                        String str2 = (String) entry3.getKey();
                        if (str2.startsWith(RaftBackedCache.this.prefix())) {
                            String replace = str2.replace(RaftBackedCache.this.prefix(), "");
                            Set<CompletableFuture<Boolean>> set2 = RaftBackedCache.this.evictionTasksRunning;
                            Theseus theseus4 = theseus;
                            Theseus theseus5 = theseus;
                            set2.add(theseus4.withDistributedLockAsync(str2, () -> {
                                Optional<byte[]> element = theseus5.getElement(str2);
                                if (!element.isPresent()) {
                                    return CompletableFuture.completedFuture(false);
                                }
                                if (!Entry.readIsPersisted(element.get())) {
                                    try {
                                        byte[] bArr = element.get();
                                        RaftBackedCache raftBackedCache = RaftBackedCache.this;
                                        Entry deserialize = Entry.deserialize(replace, bArr, raftBackedCache::deserialize);
                                        RaftBackedCache.log.info("Persisting RaftBackedCache element with key {} in preparation for eviction", replace);
                                        RaftBackedCache.this.persist(replace, deserialize.value, false);
                                    } catch (InvalidProtocolBufferException e) {
                                        RaftBackedCache.log.warn("Could not deserialize Raft value for key {}", str2);
                                    }
                                }
                                RaftBackedCache.log.info("Evicting RaftBackedCache element with key {}", replace);
                                return theseus5.removeElementAsync(str2, Duration.ofSeconds(30L));
                            }));
                        }
                    }
                }
            }
        };
        this.raft.node.transport.scheduleAtFixedRate(this.evictionTask, 1000L);
    }

    protected RaftBackedCache(Theseus theseus, Duration duration, Duration duration2) {
        this(theseus, duration, duration2, DEFAULT_MAX_SIZE_BYTES);
    }

    public CompletableFuture<Void> allOutstandingEvictionsFuture() {
        CompletableFuture<Void> allOf;
        synchronized (this.evictionTasksRunning) {
            allOf = CompletableFuture.allOf((CompletableFuture[]) this.evictionTasksRunning.toArray(new CompletableFuture[0]));
        }
        return allOf;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.evictionTask.cancel();
    }

    protected abstract String prefix();

    public abstract byte[] serialize(V v);

    public abstract Optional<V> deserialize(byte[] bArr);

    public abstract Optional<V> restore(String str);

    public abstract void persist(String str, V v, boolean z);

    public CompletableFuture<Boolean> clearCache() {
        HashSet hashSet = new HashSet();
        for (String str : this.raft.stateMachine.values.keySet()) {
            if (str.startsWith(prefix())) {
                hashSet.add(str);
            }
        }
        return this.raft.removeElementsAsync(hashSet, Duration.ofSeconds(30L));
    }

    public void addChangeListener(ChangeListener<V> changeListener, @Nullable Object obj) {
        KeyValueStateMachine.ChangeListener changeListener2 = (str, optional, map) -> {
            if (str.startsWith(prefix())) {
                String replace = str.replace(prefix(), "");
                Optional<V> empty = Optional.empty();
                if (optional.isPresent()) {
                    try {
                        empty = Optional.of(Entry.deserialize(replace, (byte[]) optional.get(), this::deserialize).value);
                    } catch (Throwable th) {
                        log.warn("Could not parse entry in change listener", th);
                    }
                }
                changeListener.onChange(replace, empty);
            }
        };
        this.changeListeners.put(changeListener, changeListener2);
        this.raft.addChangeListener(changeListener2);
        if (obj != null) {
            this.changeListenerOwner.put(changeListener, new WeakReference<>(obj));
            HashSet<ChangeListener> hashSet = new HashSet();
            for (Map.Entry<ChangeListener, WeakReference<Object>> entry : this.changeListenerOwner.entrySet()) {
                if (entry.getValue().get() == null) {
                    hashSet.add(entry.getKey());
                }
            }
            for (ChangeListener changeListener3 : hashSet) {
                log.warn("Leaked Raft change listener {} on cache {}", changeListener3, toString());
                removeChangeListener(changeListener3);
            }
        }
    }

    public void addChangeListener(ChangeListener<V> changeListener) {
        addChangeListener(changeListener, null);
    }

    public void removeChangeListener(ChangeListener changeListener) {
        KeyValueStateMachine.ChangeListener changeListener2 = this.changeListeners.get(changeListener);
        if (changeListener2 != null) {
            this.raft.removeChangeListener(changeListener2);
        } else {
            log.warn("No corresponding listener to remove from the KeyValueStateMachine. This is troubling.");
        }
        this.changeListenerOwner.remove(changeListener);
    }

    public CompletableFuture<Boolean> withElementAsync(String str, BiFunction<V, Consumer<V>, V> biFunction, @Nullable Supplier<V> supplier, boolean z) {
        log.trace("WithElement {}", str);
        Pointer pointer = new Pointer(false);
        Function<byte[], byte[]> function = bArr -> {
            try {
                Entry deserialize = Entry.deserialize(str, bArr, this::deserialize);
                Object apply = biFunction.apply(deserialize.value, obj -> {
                    try {
                        this.raft.setElementAsync(prefix() + str, new Entry(str, false, deserialize.value).serialize(this::serialize), true, Duration.ofSeconds(5L)).get(6L, TimeUnit.SECONDS);
                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
                        log.warn("Could not save intermediate state during withElementAsyc call in RaftBackedCache: ", e);
                    }
                });
                if (apply == deserialize.value) {
                    return bArr;
                }
                onSet(apply);
                boolean z2 = false;
                if (((Boolean) pointer.dereference().orElse(false)).booleanValue()) {
                    z2 = true;
                    persist(str, apply, false);
                }
                return new Entry(str, z2, apply).serialize(this::serialize);
            } catch (InvalidProtocolBufferException e) {
                log.warn("Could not deserialize value for key=" + prefix() + str);
                return bArr;
            }
        };
        Supplier<byte[]> supplier2 = () -> {
            Optional<V> restore = restore(str);
            if (restore.isPresent()) {
                return new Entry(str, true, restore.get()).serialize(this::serialize);
            }
            if (supplier == null) {
                return null;
            }
            Object obj = supplier.get();
            pointer.set((Pointer) true);
            return new Entry(str, false, obj).serialize(this::serialize);
        };
        return z ? this.raft.withElementUnlockedAsync(prefix() + str, function, supplier2, true) : this.raft.withElementAsync(prefix() + str, function, supplier2, true);
    }

    public CompletableFuture<Boolean> withElementAsync(String str, Function<V, V> function, @Nullable Supplier<V> supplier, boolean z) {
        return withElementAsync(str, (obj, consumer) -> {
            return function.apply(obj);
        }, supplier, z);
    }

    public CompletableFuture<Boolean> withElementAsync(String str, Function<V, V> function, Supplier<V> supplier) {
        return withElementAsync(str, (Function) function, (Supplier) supplier, false);
    }

    public CompletableFuture<Boolean> withElementAsync(String str, Function<V, V> function) {
        return withElementAsync(str, (Function) function, (Supplier) null, false);
    }

    public CompletableFuture<Boolean> withElementAsync(String str, BiFunction<V, Consumer<V>, V> biFunction) {
        return withElementAsync(str, (BiFunction) biFunction, (Supplier) null, false);
    }

    public Optional<V> get(String str) {
        Optional<byte[]> optional = this.raft.stateMachine.get(prefix() + str, TimerUtils.mockableNow().toEpochMilli());
        if (optional.isPresent()) {
            try {
                return Optional.of(Entry.deserialize(str, optional.get(), this::deserialize).value);
            } catch (InvalidProtocolBufferException e) {
                log.warn("Could not read Proto for Raft item");
                return Optional.empty();
            }
        }
        Optional<V> restore = restore(str);
        restore.ifPresent(obj -> {
            this.raft.setElementAsync(prefix() + str, new Entry(str, false, obj).serialize(this::serialize), true, Duration.ofSeconds(5L));
        });
        return restore;
    }

    public Optional<V> getIfPresent(String str) {
        return (Optional<V>) this.raft.stateMachine.get(prefix() + str, TimerUtils.mockableNow().toEpochMilli()).flatMap(bArr -> {
            try {
                return Optional.of(Entry.deserialize(str, bArr, this::deserialize).value);
            } catch (InvalidProtocolBufferException e) {
                return Optional.empty();
            }
        });
    }

    public CompletableFuture<Boolean> put(String str, V v, boolean z) {
        onSet(v);
        Entry entry = new Entry(str, false, v);
        if (z) {
            persist(str, v, false);
        }
        return this.raft.setElementAsync(prefix() + str, entry.serialize(this::serialize), true, Duration.ofSeconds(5L));
    }

    public CompletableFuture<Boolean> evictWithoutSaving(String str) {
        return this.raft.removeElementAsync(prefix() + str, Duration.ofSeconds(5L));
    }

    @Override // java.lang.Iterable
    @Nonnull
    public Iterator<Map.Entry<String, V>> iterator() {
        String prefix = prefix();
        return this.raft.getMap().entrySet().stream().filter(entry -> {
            return ((String) entry.getKey()).startsWith(prefix);
        }).map(entry2 -> {
            try {
                return Entry.deserialize((String) entry2.getKey(), (byte[]) entry2.getValue(), this::deserialize);
            } catch (InvalidProtocolBufferException e) {
                return null;
            }
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).iterator();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean valuePersistedSinceLastWrite(String str) {
        return this.raft.getElement(str).filter(Entry::readIsPersisted).isPresent();
    }
}
