package ai.eloquent.raft;

import ai.eloquent.monitoring.Prometheus;
import ai.eloquent.raft.EloquentRaftProto;
import ai.eloquent.raft.KeyValueStateMachineProto;
import ai.eloquent.raft.RaftLog;
import ai.eloquent.raft.RaftState;
import ai.eloquent.util.RuntimeInterruptedException;
import ai.eloquent.util.TimerUtils;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.sun.management.GcInfo;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/eloquent/raft/EloquentRaftAlgorithm.class */
public class EloquentRaftAlgorithm implements RaftAlgorithm {
    private static final Logger log;
    public static final long MACHINE_DOWN_TIMEOUT = 30000;
    private static int LEAKY_BUCKET_SIZE;
    private final RaftState state;
    public final RaftTransport transport;
    private CompletableFuture<EloquentRaftProto.RaftMessage> clusterMembershipFuture;
    private volatile long lastClusterMembershipChange;
    private final long[] lastBroadcastTimes;
    private volatile AtomicInteger lastBroadcastNextIndex;
    private final Optional<RaftLifecycle> lifecycle;
    private long drivingThreadId;
    private Consumer<Runnable> drivingThreadQueue;
    static final /* synthetic */ boolean $assertionsDisabled;

    public EloquentRaftAlgorithm(String str, RaftStateMachine raftStateMachine, RaftTransport raftTransport, int i, ExecutorService executorService, Optional<RaftLifecycle> optional) {
        this(new RaftState(str, raftStateMachine, i, executorService), raftTransport, optional);
    }

    public EloquentRaftAlgorithm(String str, RaftStateMachine raftStateMachine, RaftTransport raftTransport, Collection<String> collection, ExecutorService executorService, Optional<RaftLifecycle> optional) {
        this(new RaftState(str, raftStateMachine, collection, executorService), raftTransport, optional);
    }

    public EloquentRaftAlgorithm(RaftState raftState, RaftTransport raftTransport, Optional<RaftLifecycle> optional) {
        this.lastBroadcastTimes = new long[LEAKY_BUCKET_SIZE + 1];
        this.lastBroadcastNextIndex = new AtomicInteger(0);
        this.drivingThreadId = -1L;
        this.drivingThreadQueue = (v0) -> {
            v0.run();
        };
        this.state = raftState;
        this.transport = raftTransport;
        this.lifecycle = optional;
        this.clusterMembershipFuture = CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(raftState.serverName).build());
        this.lastClusterMembershipChange = raftTransport.now();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setDrivingThread(Consumer<Runnable> consumer) {
        this.drivingThreadId = Thread.currentThread().getId();
        this.drivingThreadQueue = consumer;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public RaftState state() {
        if ($assertionsDisabled || this.drivingThreadId < 0 || this.drivingThreadId == Thread.currentThread().getId()) {
            return this.state.copy();
        }
        throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public RaftState mutableState() {
        return this.state;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public RaftStateMachine mutableStateMachine() {
        return this.state.log.stateMachine;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public long term() {
        return this.state.currentTerm;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public String serverName() {
        return this.state.serverName;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void receiveAppendEntriesRPC(EloquentRaftProto.AppendEntriesRequest appendEntriesRequest, Consumer<EloquentRaftProto.RaftMessage> consumer, long j) {
        EloquentRaftProto.RaftMessage mkRaftMessage;
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        if (appendEntriesRequest.getEntriesCount() > 0) {
            log.trace("{} - [{}] {}; num_entries={}  prevIndex={}  leader={}", new Object[]{this.state.serverName, Long.valueOf(this.transport.now()), "AppendEntriesRPC", Integer.valueOf(appendEntriesRequest.getEntriesCount()), Long.valueOf(appendEntriesRequest.getPrevLogIndex()), appendEntriesRequest.getLeaderName()});
        }
        EloquentRaftProto.AppendEntriesReply.Builder followerName = EloquentRaftProto.AppendEntriesReply.newBuilder().setFollowerName(this.state.serverName);
        if (canPerformFollowerAction("AppendEntriesRPC", appendEntriesRequest.getTerm(), false)) {
            Optional<String> optional = this.state.leader;
            this.state.resetElectionTimeout(j, appendEntriesRequest.getLeaderName());
            if (!optional.equals(this.state.leader)) {
                log.info("{} - {}; registered new leader={}  old leader={}  time={}", new Object[]{this.state.serverName, "AppendEntriesRPC", this.state.leader.orElse("<none>"), optional.orElse("<none>"), Long.valueOf(j)});
            }
            if (this.state.log.appendEntries(appendEntriesRequest.getPrevLogIndex(), appendEntriesRequest.getPrevLogTerm(), appendEntriesRequest.getEntriesList())) {
                if (!$assertionsDisabled && this.state.log.getLastEntryIndex() < appendEntriesRequest.getPrevLogIndex() + appendEntriesRequest.getEntriesCount()) {
                    throw new AssertionError("appendEntries succeeded on " + this.state.serverName + ", but latest log index is not caught up(!?)  prevLogIndex=" + appendEntriesRequest.getPrevLogIndex() + "  entryCount=" + appendEntriesRequest.getEntriesCount() + "  new_lastIndex=" + this.state.log.getLastEntryIndex());
                }
                if (!appendEntriesRequest.getEntriesList().isEmpty()) {
                    try {
                        KeyValueStateMachineProto.Transition parseFrom = KeyValueStateMachineProto.Transition.parseFrom(appendEntriesRequest.getEntriesList().get(0).getTransition());
                        log.trace("{} - {} Appended entry @ time={} to index {} of type {}: {}", new Object[]{this.state.serverName, "AppendEntriesRPC", Long.valueOf(j), Long.valueOf(appendEntriesRequest.getEntriesList().get(0).getIndex()), parseFrom.getType(), parseFrom});
                    } catch (InvalidProtocolBufferException e) {
                    }
                }
                if (this.state.commitIndex() < appendEntriesRequest.getLeaderCommit()) {
                    if (!$assertionsDisabled && this.state.commitIndex() > appendEntriesRequest.getLeaderCommit()) {
                        throw new AssertionError("Leader has committed past our current index on " + this.state.serverName + "!  commitIndex=" + this.state.commitIndex() + "  heartbeat.commitIndex=" + appendEntriesRequest.getLeaderCommit() + "  prevLogIndex=" + appendEntriesRequest.getPrevLogIndex() + "  entryCount=" + appendEntriesRequest.getEntriesCount() + "  new_lastIndex=" + this.state.log.getLastEntryIndex());
                    }
                    this.state.commitUpTo(appendEntriesRequest.getLeaderCommit(), j);
                }
                if (this.state.leadership == RaftState.LeadershipStatus.LEADER) {
                    log.warn("{} - {} Raft got an inbound heartbeat as a leader -- this is a possible split-brain. Stepping down from leadership so we can sort it out democratically.", this.state.serverName, "AppendEntriesRPC");
                    this.state.stepDownFromElection();
                }
                if (appendEntriesRequest.getEntriesCount() > 0) {
                    log.trace("{} - {} replying success;  term={}  nextIndex={}  commitIndex={}", new Object[]{this.state.serverName, "AppendEntriesRPC", Long.valueOf(this.state.currentTerm), Long.valueOf(this.state.log.getLastEntryIndex() + 1), Long.valueOf(this.state.commitIndex())});
                }
                mkRaftMessage = RaftTransport.mkRaftMessage(this.state.serverName, followerName.setSuccess(true).setTerm(this.state.currentTerm).setNextIndex(this.state.log.getLastEntryIndex() + 1).setMissingFromQuorum(!this.state.log.latestQuorumMembers.contains(this.state.serverName)).m334build());
            } else if (appendEntriesRequest.getEntriesCount() > 0) {
                if (appendEntriesRequest.getPrevLogIndex() >= ((Long) this.state.log.snapshot.map(snapshot -> {
                    return Long.valueOf(snapshot.lastIndex);
                }).orElse(0L)).longValue()) {
                    log.warn("{} - {} replying error;  term={}  lastIndex={}  heartbeat.term={}  heartbeat.prevIndex={}", new Object[]{this.state.serverName, "AppendEntriesRPC", Long.valueOf(this.state.currentTerm), Long.valueOf(this.state.log.getLastEntryIndex()), Long.valueOf(appendEntriesRequest.getTerm()), Long.valueOf(appendEntriesRequest.getPrevLogIndex())});
                }
                mkRaftMessage = RaftTransport.mkRaftMessage(this.state.serverName, followerName.setSuccess(false).setTerm(this.state.currentTerm).setNextIndex(Math.max(((Long) this.state.log.snapshot.map(snapshot2 -> {
                    return Long.valueOf(snapshot2.lastIndex);
                }).orElse(0L)).longValue(), Math.min(appendEntriesRequest.getPrevLogIndex() - 1, this.state.log.getLastEntryIndex()))).setMissingFromQuorum(!this.state.log.latestQuorumMembers.contains(this.state.serverName)).m334build());
            } else {
                if (appendEntriesRequest.getEntriesCount() > 0) {
                    log.trace("{} - {} heartbeat has no payload;  term={}  nextIndex={}", new Object[]{this.state.serverName, "AppendEntriesRPC", Long.valueOf(this.state.currentTerm), Long.valueOf(appendEntriesRequest.getPrevLogIndex())});
                }
                mkRaftMessage = RaftTransport.mkRaftMessage(this.state.serverName, followerName.setSuccess(false).setTerm(this.state.currentTerm).setNextIndex(this.state.log.getLastEntryIndex() + 1).setMissingFromQuorum(!this.state.log.latestQuorumMembers.contains(this.state.serverName)).m334build());
            }
        } else {
            mkRaftMessage = RaftTransport.mkRaftMessage(this.state.serverName, followerName.setTerm(this.state.currentTerm).setNextIndex(this.state.log.getLastEntryIndex() + 1).setSuccess(false).setMissingFromQuorum(!this.state.log.latestQuorumMembers.contains(this.state.serverName)).m334build());
        }
        consumer.accept(mkRaftMessage);
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void receiveAppendEntriesReply(EloquentRaftProto.AppendEntriesReply appendEntriesReply, long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        String str = "AppendEntriesReply";
        long currentTimeMillis = System.currentTimeMillis();
        try {
            log.trace("{} - [{}] {} from {}. success={}  term={}  nextIndex={}", new Object[]{this.state.serverName, Long.valueOf(this.transport.now()), "AppendEntriesReply", appendEntriesReply.getFollowerName(), Boolean.valueOf(appendEntriesReply.getSuccess()), Long.valueOf(appendEntriesReply.getTerm()), Long.valueOf(appendEntriesReply.getNextIndex())});
            asLeader(appendEntriesReply.getFollowerName(), appendEntriesReply.getTerm(), j, () -> {
                if (!$assertionsDisabled && !this.state.isLeader()) {
                    throw new AssertionError("We should still be a leader if we process this reply");
                }
                if (!$assertionsDisabled && appendEntriesReply.getTerm() != this.state.currentTerm) {
                    throw new AssertionError("We should be on the correct term when getting a heartbeat reply");
                }
                if (!$assertionsDisabled && !this.state.nextIndex.isPresent()) {
                    throw new AssertionError("We should have a nextIndex map");
                }
                if (appendEntriesReply.getSuccess()) {
                    this.state.nextIndex.ifPresent(map -> {
                    });
                    this.state.matchIndex.ifPresent(map2 -> {
                    });
                    if (appendEntriesReply.getMissingFromQuorum() && this.state.log.committedQuorumMembers.contains(appendEntriesReply.getFollowerName()) && this.state.log.latestQuorumMembers.contains(appendEntriesReply.getFollowerName()) && this.clusterMembershipFuture.isDone() && this.lastClusterMembershipChange + electionTimeoutMillisRange().end < this.transport.now()) {
                        log.warn("{} - {} detected quorum mismatch on node {}. Trying to recover, but this is an error state.", new Object[]{this.state.serverName, str, appendEntriesReply.getFollowerName()});
                        receiveAddServerRPC(EloquentRaftProto.AddServerRequest.newBuilder().setNewServer(appendEntriesReply.getFollowerName()).addAllQuorum(this.state.log.latestQuorumMembers).m287build(), j);
                        return;
                    }
                    return;
                }
                log.trace("{} - {} from {} was rejected. resending with term={} and nextIndex={}", new Object[]{this.state.serverName, str, appendEntriesReply.getFollowerName(), Long.valueOf(appendEntriesReply.getTerm()), Long.valueOf(appendEntriesReply.getNextIndex())});
                long max = Math.max(0L, appendEntriesReply.getNextIndex() - 1);
                if (max > 0 && !this.state.log.getEntryAtIndex(max).isPresent() && ((!this.state.log.snapshot.isPresent() || max != this.state.log.snapshot.get().lastIndex) && ((Long) this.state.matchIndex.map(map3 -> {
                    return (Long) map3.get(appendEntriesReply.getFollowerName());
                }).orElse(0L)).longValue() < max)) {
                    this.state.matchIndex.ifPresent(map4 -> {
                    });
                    this.state.nextIndex.ifPresent(map5 -> {
                    });
                    log.warn("{} - {}  Follower asked for an index we do not have; setting index to 0 (to trigger snapshot)", this.state.serverName, str);
                    max = 0;
                }
                Optional<List<EloquentRaftProto.LogEntry>> entriesSinceInclusive = this.state.log.getEntriesSinceInclusive(max + 1);
                if (!entriesSinceInclusive.isPresent() || entriesSinceInclusive.get().size() > 0) {
                    sendAppendEntries(appendEntriesReply.getFollowerName(), max + 1);
                }
            });
            if (!$assertionsDisabled && !checkDuration("AppendEntriesReply", currentTimeMillis, System.currentTimeMillis())) {
                throw new AssertionError();
            }
        } catch (Throwable th) {
            if (!$assertionsDisabled && !checkDuration("AppendEntriesReply", currentTimeMillis, System.currentTimeMillis())) {
                throw new AssertionError();
            }
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <E> E generalizedAppendEntries(long j, BiFunction<EloquentRaftProto.AppendEntriesRequest, EloquentRaftProto.InstallSnapshotRequest, E> biFunction) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        Optional empty = Optional.empty();
        Optional empty2 = Optional.empty();
        Optional<List<EloquentRaftProto.LogEntry>> entriesSinceInclusive = this.state.log.getEntriesSinceInclusive(j);
        Optional<Long> previousEntryTerm = this.state.log.getPreviousEntryTerm(j - 1);
        if (entriesSinceInclusive.isPresent() && previousEntryTerm.isPresent()) {
            empty = Optional.of(EloquentRaftProto.AppendEntriesRequest.newBuilder().setTerm(this.state.currentTerm).setLeaderName(this.state.serverName).setPrevLogIndex(j - 1).setPrevLogTerm(previousEntryTerm.get().longValue()).addAllEntries(entriesSinceInclusive.get()).setLeaderCommit(this.state.log.getCommitIndex()).build());
            log.trace("{} - sending appendEntriesRequest; logIndex={}  logTerm={}  # entries={}", new Object[]{this.state.serverName, Long.valueOf(j - 1), previousEntryTerm.get(), Integer.valueOf(entriesSinceInclusive.get().size())});
        } else {
            empty2 = this.state.log.snapshot.map(snapshot -> {
                return EloquentRaftProto.InstallSnapshotRequest.newBuilder().setTerm(this.state.currentTerm).setLeaderName(this.state.serverName).setLastIndex(snapshot.lastIndex).setLastTerm(snapshot.lastTerm).addAllLastConfig(snapshot.lastClusterMembership).setData(ByteString.copyFrom(snapshot.serializedStateMachine)).build();
            });
        }
        return (E) biFunction.apply(empty.orElse(null), empty2.orElse(null));
    }

    private CompletableFuture<EloquentRaftProto.RaftMessage> rpcAppendEntries(String str, long j, long j2) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        String str2 = "AppendEntriesRPC";
        return ((CompletableFuture) generalizedAppendEntries(j, (appendEntriesRequest, installSnapshotRequest) -> {
            if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
            }
            if (appendEntriesRequest != null) {
                return this.transport.rpcTransportAsFuture(this.state.serverName, str, appendEntriesRequest, (raftMessage, th) -> {
                    if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                        throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                    }
                    if (raftMessage != null) {
                        return raftMessage;
                    }
                    Logger logger = log;
                    Object[] objArr = new Object[3];
                    objArr[0] = this.state.serverName;
                    objArr[1] = str2;
                    objArr[2] = th == null ? "unknown error" : th.getClass().getName() + ": " + th.getMessage();
                    logger.warn("{} - {} Failure: <{}>", objArr);
                    return RaftTransport.mkRaftMessage(this.state.serverName, EloquentRaftProto.RaftMessage.newBuilder().setAppendEntriesReply(EloquentRaftProto.AppendEntriesReply.newBuilder().setFollowerName(str).setTerm(this.state.currentTerm).setSuccess(false)).build());
                }, this.drivingThreadQueue, j2);
            }
            if (installSnapshotRequest != null) {
                return this.transport.rpcTransportAsFuture(this.state.serverName, str, installSnapshotRequest, (raftMessage2, th2) -> {
                    if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                        throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                    }
                    if (raftMessage2 != null) {
                        return raftMessage2;
                    }
                    Logger logger = log;
                    Object[] objArr = new Object[3];
                    objArr[0] = this.state.serverName;
                    objArr[1] = str2;
                    objArr[2] = th2 == null ? "unknown error" : th2.getClass().getName() + ": " + th2.getMessage();
                    logger.warn("{} - {} Failure: <{}>", objArr);
                    return RaftTransport.mkRaftMessage(this.state.serverName, EloquentRaftProto.RaftMessage.newBuilder().setInstallSnapshotReply(EloquentRaftProto.InstallSnapshotReply.newBuilder().setTerm(this.state.currentTerm).build()));
                }, this.drivingThreadQueue, j2);
            }
            log.warn("{} - We have neither log entries or a snapshot to send to {}.", this.state.serverName, str);
            return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.getDefaultInstance());
        })).thenApply(raftMessage -> {
            if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
            }
            if (raftMessage.getAppendEntriesReply() != EloquentRaftProto.AppendEntriesReply.getDefaultInstance()) {
                receiveAppendEntriesReply(raftMessage.getAppendEntriesReply(), this.transport.now());
            } else if (raftMessage.getInstallSnapshotReply() != EloquentRaftProto.InstallSnapshotReply.getDefaultInstance()) {
                receiveInstallSnapshotReply(raftMessage.getInstallSnapshotReply(), this.transport.now());
            }
            return raftMessage;
        });
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void sendAppendEntries(String str, long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        log.trace("{} - Sending appendEntries to {} with nextIndex {}", new Object[]{this.state.serverName, str, Long.valueOf(j)});
        generalizedAppendEntries(j, (appendEntriesRequest, installSnapshotRequest) -> {
            if (appendEntriesRequest != null) {
                this.transport.sendTransport(this.state.serverName, str, appendEntriesRequest);
                return null;
            }
            if (installSnapshotRequest != null) {
                this.transport.sendTransport(this.state.serverName, str, installSnapshotRequest);
                return null;
            }
            log.warn("{} - We have neither log entries or a snapshot to send to {}.", this.state.serverName, str);
            return null;
        });
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void broadcastAppendEntries(long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        broadcastAppendEntries(j, false, false);
    }

    private void broadcastAppendEntries(long j, boolean z, boolean z2) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        if (!z2) {
            int i = 0;
            for (long j2 : this.lastBroadcastTimes) {
                if (j2 > 0 && j > j2 && j - j2 < heartbeatMillis()) {
                    i++;
                }
            }
            if (i > LEAKY_BUCKET_SIZE) {
                return;
            }
        }
        this.lastBroadcastTimes[this.lastBroadcastNextIndex.getAndIncrement() % this.lastBroadcastTimes.length] = j;
        if (!this.state.isLeader()) {
            log.trace("{} - {}; Not the leader", this.state.serverName, "broadcastAppendEntries");
            return;
        }
        if (!$assertionsDisabled && !this.state.nextIndex.isPresent()) {
            throw new AssertionError("Running heartbeat as leader, but don't have a nextIndex defined");
        }
        long currentTimeMillis = System.currentTimeMillis();
        long j3 = Long.MAX_VALUE;
        long j4 = Long.MAX_VALUE;
        List<EloquentRaftProto.LogEntry> list = null;
        if (this.state.nextIndex.isPresent()) {
            Map<String, Long> map = this.state.nextIndex.get();
            for (String str : this.state.log.getQuorumMembers()) {
                if (!str.equals(this.state.serverName)) {
                    if (!map.containsKey(str)) {
                        map.put(str, Long.valueOf(this.state.log.getLastEntryIndex() + 1));
                    }
                    long longValue = map.get(str).longValue();
                    long max = Math.max(0L, longValue - 1);
                    long longValue2 = this.state.log.getPreviousEntryTerm(max).orElse(-1L).longValue();
                    Optional<List<EloquentRaftProto.LogEntry>> entriesSinceInclusive = this.state.log.getEntriesSinceInclusive(longValue);
                    if (entriesSinceInclusive.isPresent() && (longValue2 < j4 || (longValue2 == j4 && max < j3))) {
                        j4 = longValue2;
                        j3 = max;
                        list = entriesSinceInclusive.get();
                    }
                }
            }
        }
        checkDuration("finding update index", currentTimeMillis, System.currentTimeMillis());
        if (j4 == Long.MAX_VALUE || j3 == Long.MAX_VALUE || list == null) {
            j3 = this.state.log.getLastEntryIndex();
            j4 = this.state.log.getLastEntryTerm();
            list = Collections.emptyList();
            if (z && j3 > 0) {
                j3--;
                j4 = this.state.log.getPreviousEntryTerm(j3).orElse(-1L).longValue();
                list = this.state.log.getEntriesSinceInclusive(j3 + 1).orElse(Collections.emptyList());
            }
        }
        if (this.state.isLeader()) {
            updateLeaderInvariantsPostMessage(Optional.empty(), j);
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        EloquentRaftProto.AppendEntriesRequest build = EloquentRaftProto.AppendEntriesRequest.newBuilder().setTerm(this.state.currentTerm).setLeaderName(this.state.serverName).setPrevLogIndex(j3).setPrevLogTerm(j4).addAllEntries(list).setLeaderCommit(this.state.log.getCommitIndex()).build();
        checkDuration("creating heartbeat proto", currentTimeMillis2, System.currentTimeMillis());
        log.trace("{} - Broadcasting appendEntriesRequest; logIndex={}  logTerm={}  # entries={}", new Object[]{this.state.serverName, Long.valueOf(j3), Long.valueOf(j4), Integer.valueOf(list.size())});
        long currentTimeMillis3 = System.currentTimeMillis();
        try {
            this.transport.broadcastTransport(this.state.serverName, build);
            checkDuration("broadcast transport call", currentTimeMillis3, System.currentTimeMillis());
        } catch (Throwable th) {
            checkDuration("broadcast transport call", currentTimeMillis3, System.currentTimeMillis());
            throw th;
        }
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void receiveInstallSnapshotRPC(EloquentRaftProto.InstallSnapshotRequest installSnapshotRequest, Consumer<EloquentRaftProto.RaftMessage> consumer, long j) {
        EloquentRaftProto.RaftMessage mkRaftMessage;
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        log.trace("{} - [{}] {}:  term={}  lastIndex={}", new Object[]{this.state.serverName, Long.valueOf(this.transport.now()), "InstallSnapshotRPC", Long.valueOf(installSnapshotRequest.getTerm()), Long.valueOf(installSnapshotRequest.getLastIndex())});
        EloquentRaftProto.InstallSnapshotReply.Builder followerName = EloquentRaftProto.InstallSnapshotReply.newBuilder().setFollowerName(this.state.serverName);
        Optional<String> optional = this.state.leader;
        this.state.resetElectionTimeout(j, installSnapshotRequest.getLeaderName());
        if (!optional.equals(this.state.leader)) {
            log.info("{} - {}; registered new leader={}  old leader={}  term={}  lastIndex={}  time={}", new Object[]{this.state.serverName, "InstallSnapshotRPC", this.state.leader.orElse("<none>"), optional.orElse("<none>"), Long.valueOf(installSnapshotRequest.getTerm()), Long.valueOf(installSnapshotRequest.getLastIndex()), Long.valueOf(j)});
        }
        if (!canPerformFollowerAction("InstallSnapshotRPC", installSnapshotRequest.getTerm(), false)) {
            mkRaftMessage = RaftTransport.mkRaftMessage(this.state.serverName, followerName.setTerm(this.state.currentTerm).setNextIndex(this.state.log.getLastEntryIndex() + 1).build());
        } else {
            if (!$assertionsDisabled && installSnapshotRequest.getTerm() != this.state.currentTerm) {
                throw new AssertionError("Should not have been allowed to continue if the terms don't match!");
            }
            this.state.log.installSnapshot(new RaftLog.Snapshot(installSnapshotRequest.getData().toByteArray(), installSnapshotRequest.getLastIndex(), installSnapshotRequest.getLastTerm(), installSnapshotRequest.getLastConfigList()), j);
            log.trace("{} - {} success: last index in log is {}", new Object[]{this.state.serverName, "InstallSnapshotRPC", Long.valueOf(this.state.log.getLastEntryIndex())});
            mkRaftMessage = RaftTransport.mkRaftMessage(this.state.serverName, followerName.setTerm(this.state.currentTerm).setNextIndex(this.state.log.getLastEntryIndex() + 1).build());
        }
        consumer.accept(mkRaftMessage);
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void receiveInstallSnapshotReply(EloquentRaftProto.InstallSnapshotReply installSnapshotReply, long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        log.trace("{} - [{}] {} from {}. term={}  nextIndex={}", new Object[]{this.state.serverName, Long.valueOf(this.transport.now()), "InstallSnapshotReply", installSnapshotReply.getFollowerName(), Long.valueOf(installSnapshotReply.getTerm()), Long.valueOf(installSnapshotReply.getNextIndex())});
        asLeader(installSnapshotReply.getFollowerName(), installSnapshotReply.getTerm(), j, () -> {
            this.state.nextIndex.ifPresent(map -> {
            });
            this.state.matchIndex.ifPresent(map2 -> {
            });
            Optional<List<EloquentRaftProto.LogEntry>> entriesSinceInclusive = this.state.log.getEntriesSinceInclusive(installSnapshotReply.getNextIndex());
            if (!entriesSinceInclusive.isPresent() || entriesSinceInclusive.get().size() > 0) {
                sendAppendEntries(installSnapshotReply.getFollowerName(), installSnapshotReply.getNextIndex());
            }
        });
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void receiveRequestVoteRPC(EloquentRaftProto.RequestVoteRequest requestVoteRequest, Consumer<EloquentRaftProto.RaftMessage> consumer, long j) {
        EloquentRaftProto.RaftMessage mkRaftMessage;
        boolean z;
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        log.trace("{} - [{}] {};  candidate={}  term={}", new Object[]{this.state.serverName, Long.valueOf(this.transport.now()), "RequestVoteRPC", requestVoteRequest.getCandidateName(), Long.valueOf(requestVoteRequest.getTerm())});
        if (canPerformFollowerAction("RequestVoteRPC", requestVoteRequest.getTerm(), false)) {
            if (requestVoteRequest.getTerm() < this.state.currentTerm) {
                z = false;
            } else {
                z = (!this.state.votedFor.isPresent() || this.state.votedFor.get().equals(requestVoteRequest.getCandidateName())) && requestVoteRequest.getLastLogIndex() >= this.state.log.getLastEntryIndex();
                if (z) {
                    Optional<String> optional = this.state.leader;
                    this.state.voteFor(requestVoteRequest.getCandidateName());
                    if (!optional.equals(this.state.leader)) {
                        log.info("{} - {}; registered new leader={}  old leader={}  time={}", new Object[]{this.state.serverName, "RequestVoteRPC", this.state.leader.orElse("<none>"), optional.orElse("<none>"), Long.valueOf(j)});
                    }
                }
            }
            log.trace("{} - {} replying;  candidate={}  vote_granted={}  term={}  voted_for={}", new Object[]{this.state.serverName, "RequestVoteRPC", requestVoteRequest.getCandidateName(), Boolean.valueOf(z), Long.valueOf(this.state.currentTerm), this.state.votedFor.orElse("<nobody>")});
            mkRaftMessage = RaftTransport.mkRaftMessage(this.state.serverName, EloquentRaftProto.RequestVoteReply.newBuilder().setFollowerName(this.state.serverName).setTerm(requestVoteRequest.getTerm()).setFollowerTerm(this.state.currentTerm).setVoteGranted(z).build());
        } else {
            log.trace("{} - {} we're not allowed to perform this action; responding with a rejected vote", this.state.serverName, "RequestVoteRPC");
            mkRaftMessage = RaftTransport.mkRaftMessage(this.state.serverName, EloquentRaftProto.RequestVoteReply.newBuilder().setFollowerName(this.state.serverName).setTerm(requestVoteRequest.getTerm()).setFollowerTerm(this.state.currentTerm).setVoteGranted(false).build());
        }
        consumer.accept(mkRaftMessage);
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void receiveRequestVotesReply(EloquentRaftProto.RequestVoteReply requestVoteReply, long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        log.trace("{} - [{}] {} from {};  term={}  follower_term={}  vote_granted={}", new Object[]{this.state.serverName, Long.valueOf(this.transport.now()), "RequestVoteReply", requestVoteReply.getFollowerName(), Long.valueOf(requestVoteReply.getTerm()), Long.valueOf(requestVoteReply.getFollowerTerm()), Boolean.valueOf(requestVoteReply.getVoteGranted())});
        if (canPerformFollowerAction("RequestVoteReply", requestVoteReply.getTerm(), true) && requestVoteReply.getVoteGranted()) {
            this.state.receiveVoteFrom(requestVoteReply.getFollowerName());
            log.trace("{} - {} received vote; have {} votes total ({})", new Object[]{this.state.serverName, "RequestVoteReply", Integer.valueOf(this.state.votesReceived.size()), this.state.votesReceived});
            if (this.state.votesReceived.size() > this.state.log.getQuorumMembers().size() / 2) {
                if (this.state.leadership == RaftState.LeadershipStatus.LEADER) {
                    log.trace("{} - {} already elected to term {}", new Object[]{this.state.serverName, "RequestVoteReply", Long.valueOf(this.state.currentTerm)});
                    return;
                }
                this.state.elect(j);
                log.info("{} - Raft won an election for term {} (time={}  members={})", new Object[]{this.state.serverName, Long.valueOf(this.state.currentTerm), Long.valueOf(j), this.state.log.getQuorumMembers()});
                broadcastAppendEntries(j, false, true);
            }
        }
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void triggerElection(long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        if (!this.state.isCandidate()) {
            this.state.becomeCandidate();
        }
        if (!$assertionsDisabled && !this.state.isCandidate()) {
            throw new AssertionError("Should not be requesting votes if we're not a candidate");
        }
        log.info("{} - Raft triggered an election with term {} (time={} chkpt={} cluster={} leader={})", new Object[]{this.state.serverName, Long.valueOf(this.state.currentTerm), Long.valueOf(j), Long.valueOf(this.state.electionTimeoutCheckpoint), this.state.log.getQuorumMembers(), this.state.leader.orElse("<unknown>")});
        this.state.setCurrentTerm(this.state.currentTerm + 1);
        this.state.voteFor(this.state.serverName);
        Optional<String> optional = this.state.leader;
        this.state.resetElectionTimeout(j, this.state.serverName);
        if (!optional.equals(this.state.leader)) {
            log.info("{} - {}; registered new leader={}  old leader={}  time={}", new Object[]{this.state.serverName, "triggerElection", this.state.leader.orElse("<none>"), optional.orElse("<none>"), Long.valueOf(j)});
        }
        EloquentRaftProto.RequestVoteRequest build = EloquentRaftProto.RequestVoteRequest.newBuilder().setTerm(this.state.currentTerm).setCandidateName(this.state.serverName).setLastLogIndex(this.state.log.getLastEntryIndex()).setLastLogTerm(this.state.log.getLastEntryTerm()).build();
        if (this.state.votesReceived.size() > this.state.log.getQuorumMembers().size() / 2) {
            if (this.state.leadership != RaftState.LeadershipStatus.LEADER) {
                this.state.elect(j);
                log.info("{} - Raft won an election (by default) for term {} (time={}; cluster={})", new Object[]{this.state.serverName, Long.valueOf(this.state.currentTerm), Long.valueOf(j), this.state.log.latestQuorumMembers});
            } else {
                log.trace("{} - already elected to term {}", this.state.serverName, Long.valueOf(this.state.currentTerm));
            }
        }
        log.trace("{} - Broadcasting requestVote", this.state.serverName);
        this.transport.broadcastTransport(this.state.serverName, build);
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public CompletableFuture<EloquentRaftProto.RaftMessage> receiveAddServerRPC(EloquentRaftProto.AddServerRequest addServerRequest, long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        String str = "AddServerRPC";
        log.info("{} - [{}] {};  is_leader={}  new_server={}", new Object[]{this.state.serverName, Long.valueOf(this.transport.now()), "AddServerRPC", Boolean.valueOf(this.state.isLeader()), addServerRequest.getNewServer()});
        if (!this.state.isLeader()) {
            if (!((Boolean) this.state.leader.map(str2 -> {
                return Boolean.valueOf(!str2.equals(this.state.serverName));
            }).orElse(false)).booleanValue()) {
                log.info("{} - got {}; we're not the leader and don't know who the leader is", this.state.serverName, "AddServerRPC");
                return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build());
            }
            if (addServerRequest.mo253getForwardedByList().contains(this.state.serverName)) {
                log.info("{} - {}; we've been forwarded our own message back to us in a loop. Failing the message", this.state.serverName, "AddServerRPC");
                return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build());
            }
            log.trace("{} - got {}; forwarding request to {}", new Object[]{this.state.serverName, "AddServerRPC", this.state.leader.orElse("<unknown>")});
            return this.transport.rpcTransportAsFuture(this.state.serverName, this.state.leader.orElse("<unknown>"), addServerRequest.m249toBuilder().addForwardedBy(this.state.serverName).m287build(), (raftMessage, th) -> {
                if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                    throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                }
                if (raftMessage != null) {
                    return raftMessage;
                }
                Logger logger = log;
                Object[] objArr = new Object[3];
                objArr[0] = this.state.serverName;
                objArr[1] = str;
                objArr[2] = th == null ? "unknown error" : th.getClass().getName() + ": " + th.getMessage();
                logger.warn("{} - {} Failure: <{}>", objArr);
                return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build();
            }, this.drivingThreadQueue, electionTimeoutMillisRange().end);
        }
        this.clusterMembershipFuture = this.clusterMembershipFuture.thenApply(raftMessage2 -> {
            if ($assertionsDisabled || this.drivingThreadId < 0 || this.drivingThreadId == Thread.currentThread().getId()) {
                return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).build();
            }
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        });
        this.state.observeLifeFrom(addServerRequest.getNewServer(), j);
        if (this.state.matchIndex.orElse(new HashMap()).getOrDefault(addServerRequest.getNewServer(), 0L).longValue() < this.state.log.getLastEntryIndex()) {
            for (int i = 0; i < 3; i++) {
                long j2 = i + 1;
                long max = Math.max(5000L, Math.min(1000L, (electionTimeoutMillisRange().begin * (3 - i)) / 2));
                long longValue = this.state.matchIndex.orElse(new HashMap()).getOrDefault(addServerRequest.getNewServer(), 0L).longValue();
                if (longValue == this.state.log.getLastEntryIndex()) {
                    break;
                }
                this.clusterMembershipFuture = this.clusterMembershipFuture.thenCompose(raftMessage3 -> {
                    if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                        throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                    }
                    log.trace("{} - {}.2;  round={}  is_leader={}", new Object[]{this.state.serverName, str, Long.valueOf(j2), Boolean.valueOf(this.state.isLeader())});
                    if (raftMessage3.getAddServerReply() != EloquentRaftProto.AddServerReply.getDefaultInstance()) {
                        return CompletableFuture.completedFuture(raftMessage3);
                    }
                    if (!this.state.isLeader()) {
                        return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build());
                    }
                    try {
                        log.trace("{} - {}.2 sending_append_entries  round={}  term={}  nextIndex={}", new Object[]{this.state.serverName, str, Long.valueOf(j2), addServerRequest.getNewServer(), Long.valueOf(raftMessage3.getAppendEntriesReply().getTerm()), Long.valueOf(raftMessage3.getAppendEntriesReply().getNextIndex())});
                        return rpcAppendEntries(addServerRequest.getNewServer(), longValue + 1, max);
                    } catch (Exception e) {
                        e.printStackTrace();
                        log.warn("{} - {}.2 Could not execute Raft update for server {}; is the server up? Exception: <{}>", new Object[]{this.state.serverName, str, addServerRequest.getNewServer(), e.getClass() + ": " + e.getMessage()});
                        return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.OTHER_ERROR)).build());
                    }
                });
            }
        } else {
            log.trace("{} - {}.2;  no update needed, so skipping updating the new server", this.state.serverName, "AddServerRPC");
        }
        this.clusterMembershipFuture = this.clusterMembershipFuture.thenCompose(raftMessage4 -> {
            if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
            }
            log.trace("{} - {}.3;  is_leader={}", new Object[]{this.state.serverName, str, Boolean.valueOf(this.state.isLeader())});
            if (raftMessage4.getAddServerReply() != EloquentRaftProto.AddServerReply.getDefaultInstance()) {
                return CompletableFuture.completedFuture(raftMessage4);
            }
            if (!this.state.isLeader()) {
                return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build());
            }
            Optional<RaftLogEntryLocation> lastConfigurationEntryLocation = this.state.log.lastConfigurationEntryLocation();
            if (!lastConfigurationEntryLocation.isPresent()) {
                return CompletableFuture.completedFuture(raftMessage4);
            }
            log.trace("{} - {}.3 waiting for commit", this.state.serverName, str);
            broadcastAppendEntries(this.transport.now(), false, true);
            return this.state.log.createCommitFuture(lastConfigurationEntryLocation.get().index, lastConfigurationEntryLocation.get().term, true).thenApply(bool -> {
                if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                    throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                }
                log.trace("{} - {}.3 committed;  success={}", new Object[]{this.state.serverName, str, bool});
                if (!bool.booleanValue()) {
                    return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.OTHER_ERROR)).build();
                }
                broadcastAppendEntries(this.transport.now(), false, true);
                return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.OK)).build();
            });
        });
        this.clusterMembershipFuture = this.clusterMembershipFuture.thenCompose(raftMessage5 -> {
            if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
            }
            log.trace("{} - {}.4;  is_leader={}", new Object[]{this.state.serverName, str, Boolean.valueOf(this.state.isLeader())});
            if (raftMessage5.getAddServerReply().getStatus() != EloquentRaftProto.MembershipChangeStatus.OK) {
                return CompletableFuture.completedFuture(raftMessage5);
            }
            if (!this.state.isLeader()) {
                return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build());
            }
            if (this.transport.now() - j > MACHINE_DOWN_TIMEOUT) {
                return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.TIMEOUT)).build());
            }
            HashSet hashSet = new HashSet(this.state.log.getQuorumMembers());
            boolean z = !hashSet.add(addServerRequest.getNewServer());
            if (!z && !addServerRequest.mo254getQuorumList().isEmpty() && !hashSet.equals(new HashSet((Collection) addServerRequest.mo254getQuorumList()))) {
                log.warn("{} - {} consistency error (forcing consistency);  adding={}  newMembership={}  presumedMembership={}", new Object[]{this.state.serverName, str, addServerRequest.getNewServer(), hashSet, addServerRequest.mo254getQuorumList()});
                hashSet = new HashSet((Collection) addServerRequest.mo254getQuorumList());
                z = true;
            }
            RaftLogEntryLocation reconfigure = this.state.reconfigure(hashSet, z, j);
            log.info("{} - {} submitted new configuration to log: {}", new Object[]{this.state.serverName, str, hashSet});
            log.trace("{} - {}.4 waiting for commit", this.state.serverName, str);
            broadcastAppendEntries(this.transport.now(), false, true);
            return this.state.log.createCommitFuture(reconfigure.index, reconfigure.term, true).thenApply(bool -> {
                if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                    throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                }
                log.trace("{} - {}.4 committed;  success={}", new Object[]{this.state.serverName, str, bool});
                if (!bool.booleanValue()) {
                    return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.OTHER_ERROR)).build();
                }
                log.info("{} - {} committed new configuration to log: {}", new Object[]{this.state.serverName, str, this.state.log.committedQuorumMembers});
                broadcastAppendEntries(this.transport.now(), false, true);
                return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.OK)).build();
            });
        });
        CompletableFuture<EloquentRaftProto.RaftMessage> completableFuture = new CompletableFuture<>();
        this.clusterMembershipFuture.handle((raftMessage6, th2) -> {
            Logger logger = log;
            Object[] objArr = new Object[4];
            objArr[0] = this.state.serverName;
            objArr[1] = str;
            objArr[2] = raftMessage6 == null ? "<exception>" : raftMessage6.getAddServerReply().getStatus();
            objArr[3] = Boolean.valueOf(th2 != null);
            logger.trace("{} - {}.5 complete;  result={}  have_exception={}", objArr);
            if (th2 != null) {
                log.warn("Exception on addServer", th2);
            }
            this.lastClusterMembershipChange = this.transport.now();
            if (raftMessage6 != null) {
                completableFuture.complete(raftMessage6);
            } else {
                completableFuture.complete(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setAddServerReply(EloquentRaftProto.AddServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.TIMEOUT)).build());
            }
            return raftMessage6;
        });
        return completableFuture;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public CompletableFuture<EloquentRaftProto.RaftMessage> receiveRemoveServerRPC(EloquentRaftProto.RemoveServerRequest removeServerRequest, long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        String str = "RemoveServerRPC";
        log.info("{} - [{}] {};  is_leader={}  to_remove={}", new Object[]{this.state.serverName, Long.valueOf(this.transport.now()), "RemoveServerRPC", Boolean.valueOf(this.state.isLeader()), removeServerRequest.getOldServer()});
        if (this.state.isLeader()) {
            this.clusterMembershipFuture = this.clusterMembershipFuture.thenApply(raftMessage -> {
                if ($assertionsDisabled || this.drivingThreadId < 0 || this.drivingThreadId == Thread.currentThread().getId()) {
                    return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).build();
                }
                throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
            });
            this.clusterMembershipFuture = this.clusterMembershipFuture.thenCompose(raftMessage2 -> {
                if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                    throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                }
                log.trace("{} - {}.2;  is_leader={}", new Object[]{this.state.serverName, str, Boolean.valueOf(this.state.isLeader())});
                if (raftMessage2.getAddServerReply().getStatus() != EloquentRaftProto.MembershipChangeStatus.OK) {
                    return CompletableFuture.completedFuture(raftMessage2);
                }
                if (!this.state.isLeader()) {
                    return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build());
                }
                Optional<RaftLogEntryLocation> lastConfigurationEntryLocation = this.state.log.lastConfigurationEntryLocation();
                if (!lastConfigurationEntryLocation.isPresent()) {
                    return CompletableFuture.completedFuture(raftMessage2);
                }
                log.trace("{} - {}.2 waiting for commit", this.state.serverName, str);
                broadcastAppendEntries(this.transport.now(), false, true);
                return this.state.log.createCommitFuture(lastConfigurationEntryLocation.get().index, lastConfigurationEntryLocation.get().term, true).thenApply(bool -> {
                    if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                        throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                    }
                    log.trace("{} - {}.2 committed;  success={}", new Object[]{this.state.serverName, str, bool});
                    if (!bool.booleanValue()) {
                        return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.OTHER_ERROR)).build();
                    }
                    broadcastAppendEntries(this.transport.now(), false, true);
                    return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.OK)).build();
                });
            });
            this.clusterMembershipFuture = this.clusterMembershipFuture.thenCompose(raftMessage3 -> {
                if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                    throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                }
                log.trace("{} - {}.3;  is_leader={}", new Object[]{this.state.serverName, str, Boolean.valueOf(this.state.isLeader())});
                if (raftMessage3.getAddServerReply().getStatus() != EloquentRaftProto.MembershipChangeStatus.OK) {
                    return CompletableFuture.completedFuture(raftMessage3);
                }
                if (!this.state.isLeader()) {
                    return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build());
                }
                if (this.transport.now() - j > MACHINE_DOWN_TIMEOUT) {
                    return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.TIMEOUT)).build());
                }
                HashSet hashSet = new HashSet(this.state.log.getQuorumMembers());
                boolean z = !hashSet.remove(removeServerRequest.getOldServer());
                if (!z && !removeServerRequest.getQuorumList().isEmpty() && !hashSet.equals(new HashSet((Collection) removeServerRequest.getQuorumList()))) {
                    log.warn("{} - {} consistency error (forcing consistency);  adding={}  newMembership={}  presumedMembership={}", new Object[]{this.state.serverName, str, removeServerRequest.getOldServer(), hashSet, removeServerRequest.getQuorumList()});
                    hashSet = new HashSet((Collection) removeServerRequest.getQuorumList());
                    z = true;
                }
                RaftLogEntryLocation reconfigure = this.state.reconfigure(hashSet, z, j);
                log.info("{} - {} submitted new configuration to log: {}", new Object[]{this.state.serverName, str, hashSet});
                log.trace("{} - {}.3 waiting for commit", this.state.serverName, str);
                broadcastAppendEntries(this.transport.now(), false, true);
                return this.state.log.createCommitFuture(reconfigure.index, reconfigure.term, true).thenApply(bool -> {
                    if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                        throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                    }
                    log.trace("{} - {}.3 committed;  success={}", new Object[]{this.state.serverName, str, bool});
                    if (!bool.booleanValue()) {
                        return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.OTHER_ERROR)).build();
                    }
                    log.info("{} - {} committed new configuration to log: {}", new Object[]{this.state.serverName, str, this.state.log.committedQuorumMembers});
                    this.state.lastMessageTimestamp.ifPresent(map -> {
                    });
                    broadcastAppendEntries(this.transport.now(), true, true);
                    return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.OK)).build();
                });
            });
            CompletableFuture<EloquentRaftProto.RaftMessage> completableFuture = new CompletableFuture<>();
            this.clusterMembershipFuture.handle((raftMessage4, th) -> {
                Logger logger = log;
                Object[] objArr = new Object[4];
                objArr[0] = this.state.serverName;
                objArr[1] = str;
                objArr[2] = raftMessage4 == null ? "<exception>" : raftMessage4.getAddServerReply().getStatus();
                objArr[3] = Boolean.valueOf(th != null);
                logger.trace("{} - {}.5 complete;  result={}  have_exception={}", objArr);
                if (th != null) {
                    log.warn("Exception on removeServer", th);
                }
                this.lastClusterMembershipChange = this.transport.now();
                if (raftMessage4 != null) {
                    if (removeServerRequest.getOldServer().equals(this.state.serverName) && raftMessage4.getRemoveServerReply().getStatus() == EloquentRaftProto.MembershipChangeStatus.OK) {
                        this.state.stepDownFromElection();
                    }
                    completableFuture.complete(raftMessage4);
                } else {
                    completableFuture.complete(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.TIMEOUT)).build());
                }
                return raftMessage4;
            });
            return completableFuture;
        }
        if (!((Boolean) this.state.leader.map(str2 -> {
            return Boolean.valueOf(!str2.equals(this.state.serverName));
        }).orElse(false)).booleanValue()) {
            log.info("{} - got {}; we're not the leader and don't know who the leader is", this.state.serverName, "RemoveServerRPC");
            return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build());
        }
        if (removeServerRequest.getForwardedByList().contains(this.state.serverName)) {
            log.info("{} - {}; we've been forwarded our own message back to us in a loop. Failing the message", this.state.serverName, "RemoveServerRPC");
            return CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build());
        }
        log.trace("{} - got {}; forwarding request to {}", new Object[]{this.state.serverName, "RemoveServerRPC", this.state.leader.orElse("<unknown>")});
        return this.transport.rpcTransportAsFuture(this.state.serverName, this.state.leader.orElse("<unknown>"), removeServerRequest.toBuilder().addForwardedBy(this.state.serverName).build(), (raftMessage5, th2) -> {
            if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
            }
            if (raftMessage5 != null) {
                return raftMessage5;
            }
            Logger logger = log;
            Object[] objArr = new Object[3];
            objArr[0] = this.state.serverName;
            objArr[1] = str;
            objArr[2] = th2 == null ? "unknown error" : th2.getClass().getName() + ": " + th2.getMessage();
            logger.warn("{} - {} Failure: <{}>", objArr);
            return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setRemoveServerReply(EloquentRaftProto.RemoveServerReply.newBuilder().setStatus(EloquentRaftProto.MembershipChangeStatus.NOT_LEADER)).build();
        }, this.drivingThreadQueue, electionTimeoutMillisRange().end + 100);
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public CompletableFuture<EloquentRaftProto.RaftMessage> receiveApplyTransitionRPC(EloquentRaftProto.ApplyTransitionRequest applyTransitionRequest, long j) {
        CompletableFuture<EloquentRaftProto.RaftMessage> completedFuture;
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        String str = "ReceiveApplyTransition";
        log.trace("{} - [{}] {}; is_leader={}", new Object[]{this.state.serverName, Long.valueOf(this.transport.now()), "ReceiveApplyTransition", Boolean.valueOf(this.state.isLeader())});
        if (this.state.isLeader()) {
            RaftLogEntryLocation transition = this.state.transition(applyTransitionRequest.getTransition().isEmpty() ? Optional.empty() : Optional.of(applyTransitionRequest.getTransition().toByteArray()), "".equals(applyTransitionRequest.getNewHospiceMember()) ? Optional.empty() : Optional.of(applyTransitionRequest.getNewHospiceMember()));
            completedFuture = this.state.log.createCommitFuture(transition.index, transition.term, true).handle((bool, th) -> {
                if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                    throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                }
                EloquentRaftProto.ApplyTransitionReply.Builder success = EloquentRaftProto.ApplyTransitionReply.newBuilder().setTerm(this.state.currentTerm).setNewEntryIndex(-1L).setSuccess(bool != null ? bool.booleanValue() : false);
                if (bool == null || !bool.booleanValue()) {
                    log.info("{} - {}; failed transition (on leader; could not commit) @ time={}", new Object[]{this.state.serverName, str, Long.valueOf(j), th});
                } else {
                    success.setNewEntryIndex(transition.index).setNewEntryTerm(transition.term);
                }
                return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setApplyTransitionReply(success).build();
            });
            broadcastAppendEntries(j, this.state.log.latestQuorumMembers.size() <= 1, false);
            updateLeaderInvariantsPostMessage(Optional.empty(), j);
        } else if (!((Boolean) this.state.leader.map(str2 -> {
            return Boolean.valueOf(!str2.equals(this.state.serverName));
        }).orElse(false)).booleanValue()) {
            log.info("{} - {}; we're not the leader and don't know who the leader is", this.state.serverName, "ReceiveApplyTransition");
            completedFuture = CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setApplyTransitionReply(EloquentRaftProto.ApplyTransitionReply.newBuilder().setTerm(this.state.currentTerm).setNewEntryIndex(-5L).setSuccess(false)).build());
        } else if (applyTransitionRequest.getForwardedByList().contains(this.state.serverName)) {
            log.info("{} - {}; we've been forwarded our own message back to us in a loop. Failing the message", this.state.serverName, "ReceiveApplyTransition");
            completedFuture = CompletableFuture.completedFuture(EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setApplyTransitionReply(EloquentRaftProto.ApplyTransitionReply.newBuilder().setTerm(this.state.currentTerm).setNewEntryIndex(-2L).setSuccess(false)).build());
        } else {
            if (!$assertionsDisabled && ((Boolean) this.state.leader.map(str3 -> {
                return Boolean.valueOf(str3.equals(serverName()));
            }).orElse(false)).booleanValue()) {
                throw new AssertionError("We are sending a message to the leader, but we think we're the leader!");
            }
            log.trace("{} - [{}] {}; forwarding request to {}", new Object[]{this.state.serverName, Long.valueOf(this.transport.now()), "ReceiveApplyTransition", this.state.leader.orElse("<unknown>")});
            completedFuture = this.transport.rpcTransportAsFuture(this.state.serverName, this.state.leader.orElse("<unknown>"), applyTransitionRequest.toBuilder().addForwardedBy(this.state.serverName).build(), (raftMessage, th2) -> {
                if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                    throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                }
                if (raftMessage != null) {
                    log.trace("{} - {}; received reply to forwarded message", this.state.serverName, str);
                    return raftMessage;
                }
                Logger logger = log;
                Object[] objArr = new Object[6];
                objArr[0] = this.state.serverName;
                objArr[1] = Long.valueOf(this.transport.now());
                objArr[2] = str;
                objArr[3] = th2 == null ? "unknown error" : th2.getClass().getName() + ": " + th2.getMessage();
                objArr[4] = Boolean.valueOf(this.state.isLeader());
                objArr[5] = this.state.leader.orElse("<unknown>");
                logger.warn("{} - [{}] {} Failure: <{}>;  is_leader={}  leader={}", objArr);
                return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setApplyTransitionReply(EloquentRaftProto.ApplyTransitionReply.newBuilder().setTerm(this.state.currentTerm).setNewEntryIndex(-3L).setSuccess(false)).build();
            }, this.drivingThreadQueue, electionTimeoutMillisRange().end).thenCompose(raftMessage2 -> {
                if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                    throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                }
                if (!raftMessage2.getApplyTransitionReply().getSuccess()) {
                    log.info("{} - {}; failed to apply transition (reply proto had failure marked); returning failure", this.state.serverName, str);
                    return CompletableFuture.completedFuture(raftMessage2);
                }
                log.trace("{} - {}; waiting for commit", this.state.serverName, str);
                long newEntryIndex = raftMessage2.getApplyTransitionReply().getNewEntryIndex();
                long newEntryTerm = raftMessage2.getApplyTransitionReply().getNewEntryTerm();
                return this.state.log.createCommitFuture(newEntryIndex, newEntryTerm, true).thenApply(bool2 -> {
                    if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                        throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                    }
                    log.trace("{} - {}; Entry @ {} (term={}) is committed in the local log", new Object[]{this.state.serverName, str, Long.valueOf(newEntryIndex), Long.valueOf(newEntryTerm)});
                    EloquentRaftProto.ApplyTransitionReply.Builder success = EloquentRaftProto.ApplyTransitionReply.newBuilder().setTerm(this.state.currentTerm).setNewEntryIndex(-4L).setSuccess(bool2.booleanValue());
                    if (bool2.booleanValue()) {
                        success.setNewEntryIndex(newEntryIndex).setNewEntryTerm(newEntryTerm);
                    } else {
                        log.info("{} - {}; failed transition (on follower) @ time={}", new Object[]{this.state.serverName, str, Long.valueOf(this.transport.now())});
                    }
                    return EloquentRaftProto.RaftMessage.newBuilder().setSender(this.state.serverName).setApplyTransitionReply(success).build();
                });
            });
        }
        return completedFuture;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void heartbeat(long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        long now = this.transport.now();
        Object startTimer = Prometheus.startTimer(summaryTiming, "heartbeat");
        try {
            if (!this.state.isLeader()) {
                log.trace("{} - heartbeat (follower) @ time={}  cluster={}", new Object[]{this.state.serverName, Long.valueOf(j), this.state.log.getQuorumMembers()});
                if (this.state.shouldTriggerElection(j, electionTimeoutMillisRange())) {
                    triggerElection(j);
                }
            } else {
                if (!$assertionsDisabled && !checkDuration("leadership check", now, this.transport.now())) {
                    throw new AssertionError();
                }
                long now2 = this.transport.now();
                log.trace("{} - heartbeat (leader) @ time={}  cluster={}", new Object[]{this.state.serverName, Long.valueOf(j), this.state.log.getQuorumMembers()});
                if (!$assertionsDisabled && !checkDuration("get quorum members", now, this.transport.now())) {
                    throw new AssertionError();
                }
                broadcastAppendEntries(j, false, true);
                if (Thread.interrupted()) {
                    throw new RuntimeInterruptedException();
                }
                if (!$assertionsDisabled && !checkDuration("broadcast", now2, this.transport.now())) {
                    throw new AssertionError();
                }
                if (this.state.targetClusterSize >= 0) {
                    long now3 = this.transport.now();
                    if (this.clusterMembershipFuture.getNow(null) != null) {
                        Optional<String> serverToRemove = this.state.serverToRemove(j, MACHINE_DOWN_TIMEOUT);
                        if (serverToRemove.isPresent()) {
                            log.info("{} - Detected Raft cluster is too large ({} > {}) or we have a delinquent server; scaling down by removing {} (latency {})  current_time={}", new Object[]{this.state.serverName, Integer.valueOf(this.state.log.getQuorumMembers().size()), Integer.valueOf(this.state.targetClusterSize), serverToRemove.get(), Long.valueOf(this.transport.now() - ((Long) serverToRemove.flatMap(str -> {
                                return this.state.lastMessageTimestamp.map(map -> {
                                    return (Long) map.get(str);
                                });
                            }).orElse(-1L)).longValue()), Long.valueOf(j)});
                            receiveRPC(RaftTransport.mkRaftRPC(this.state.serverName, EloquentRaftProto.RemoveServerRequest.newBuilder().setOldServer(serverToRemove.get()).build()), j);
                            if (!$assertionsDisabled && !checkDuration("remove server", now3, this.transport.now())) {
                                throw new AssertionError();
                            }
                        } else {
                            Optional<String> serverToAdd = this.state.serverToAdd(j, electionTimeoutMillisRange().begin);
                            if (serverToAdd.isPresent()) {
                                log.info("{} - Detected Raft cluster is too small ({} < {}); scaling up by adding {} (latency {})  current_time={}", new Object[]{this.state.serverName, Integer.valueOf(this.state.log.getQuorumMembers().size()), Integer.valueOf(this.state.targetClusterSize), serverToAdd.get(), Long.valueOf(this.transport.now() - ((Long) serverToAdd.flatMap(str2 -> {
                                    return this.state.lastMessageTimestamp.map(map -> {
                                        return (Long) map.get(str2);
                                    });
                                }).orElse(-1L)).longValue()), Long.valueOf(j)});
                                receiveRPC(RaftTransport.mkRaftRPC(this.state.serverName, EloquentRaftProto.AddServerRequest.newBuilder().setNewServer(serverToAdd.get()).m287build()), j);
                                if (!$assertionsDisabled && !checkDuration("add server", now3, this.transport.now())) {
                                    throw new AssertionError();
                                }
                            }
                        }
                    }
                    if (Thread.interrupted()) {
                        throw new RuntimeInterruptedException();
                    }
                    if (!$assertionsDisabled && !checkDuration("size checks", now3, this.transport.now())) {
                        throw new AssertionError();
                    }
                }
                long now4 = this.transport.now();
                if (this.state.log.stateMachine instanceof KeyValueStateMachine) {
                    Set<String> killNodes = this.state.killNodes(j, MACHINE_DOWN_TIMEOUT);
                    CompletableFuture completedFuture = CompletableFuture.completedFuture(null);
                    for (String str3 : killNodes) {
                        completedFuture = completedFuture.thenCompose(raftMessage -> {
                            if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
                                throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
                            }
                            log.info("{} - Clearing transient data for node {}", this.state.serverName, str3);
                            return receiveApplyTransitionRPC(EloquentRaftProto.ApplyTransitionRequest.newBuilder().setTransition(ByteString.copyFrom(KeyValueStateMachine.createClearTransition(str3))).build(), j).handle((raftMessage, th) -> {
                                if (th == null && raftMessage != null && raftMessage.getApplyTransitionReply().getSuccess()) {
                                    return null;
                                }
                                this.state.revive(str3);
                                return null;
                            });
                        });
                    }
                }
                if (!$assertionsDisabled && !checkDuration("offline check", now4, this.transport.now())) {
                    throw new AssertionError();
                }
            }
            Prometheus.observeDuration(startTimer);
            if (!$assertionsDisabled && !checkDuration("total", now, this.transport.now())) {
                throw new AssertionError();
            }
        } catch (Throwable th) {
            Prometheus.observeDuration(startTimer);
            if (!$assertionsDisabled && !checkDuration("total", now, this.transport.now())) {
                throw new AssertionError();
            }
            throw th;
        }
    }

    private boolean checkDuration(String str, long j, long j2) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        long j3 = j2 - j;
        if (j3 <= 50) {
            return true;
        }
        long j4 = -1;
        try {
            Iterator it = ManagementFactory.getGarbageCollectorMXBeans().iterator();
            while (it.hasNext()) {
                GcInfo lastGcInfo = ((GarbageCollectorMXBean) it.next()).getLastGcInfo();
                if (lastGcInfo != null) {
                    j4 = lastGcInfo.getStartTime();
                }
            }
        } catch (Throwable th) {
            log.warn("Could not get GC info -- are you running on a non-Sun JVM?");
        }
        boolean z = false;
        if (j4 > j && j4 < j2) {
            z = true;
        }
        log.warn("{} - Raft took >50ms ({}) on step '{}';  leadership={}  system_load={}  interrupted_by_gc={}", new Object[]{this.state.serverName, TimerUtils.formatTimeDifference(j3), str, this.state.leadership, Double.valueOf(ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class).getSystemLoadAverage()), Boolean.valueOf(z)});
        return true;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public boolean bootstrap(boolean z) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        if (!z && (this.state.targetClusterSize < 0 || !this.state.log.getQuorumMembers().isEmpty())) {
            return false;
        }
        this.state.bootstrap(z);
        return true;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void stop(boolean z) {
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public boolean isRunning() {
        return true;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public void receiveBadRequest(EloquentRaftProto.RaftMessage raftMessage) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        if (raftMessage.getApplyTransitionReply() != EloquentRaftProto.ApplyTransitionReply.getDefaultInstance()) {
            log.warn("{} - Got an apply transition reply -- likely the dangling reply of a timeout somewhere", this.state.serverName);
        } else {
            log.warn("{} - Bad Raft message {}", this.state.serverName, raftMessage.toString());
        }
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public Optional<RaftLifecycle> lifecycle() {
        return this.lifecycle;
    }

    @Override // ai.eloquent.raft.RaftAlgorithm
    public RaftTransport getTransport() {
        return this.transport;
    }

    public List<String> errors() {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        ArrayList arrayList = new ArrayList();
        if (this.clusterMembershipFuture.getNow(null) == null) {
            arrayList.add("Membership change in progress (future is not empty). My state: " + this.state.leadership + "; leader=" + this.state.leader.orElse("<unknown>"));
        }
        if (((Integer) this.state.lastMessageTimestamp.map((v0) -> {
            return v0.size();
        }).orElse(0)).intValue() > 1000) {
            arrayList.add("Keeping track of >1000 heartbeat (" + this.state.lastMessageTimestamp.map((v0) -> {
                return v0.size();
            }).orElse(0) + ")");
        }
        if (this.state.electionTimeoutCheckpoint < 0) {
            arrayList.add("No election timeout present (perhaps means heartbeats are not propagating?): " + Instant.ofEpochMilli(this.state.electionTimeoutCheckpoint));
        }
        long now = this.transport.now() - Arrays.stream(this.lastBroadcastTimes).max().orElse(0L);
        if (this.state.isLeader() && now > heartbeatMillis() * 10) {
            arrayList.add("Last broadcast was " + TimerUtils.formatTimeDifference(now) + " ago");
        }
        return arrayList;
    }

    private boolean canPerformFollowerAction(String str, long j, boolean z) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        if (j < this.state.currentTerm && !z) {
            log.trace("{} - {} cannot perform action: remoteTerm={} < currentTerm={}", new Object[]{this.state.serverName, str, Long.valueOf(j), Long.valueOf(this.state.currentTerm)});
            return false;
        }
        if (j <= this.state.currentTerm) {
            return true;
        }
        log.trace("{} - {} detected remoteTerm={} > currentTerm={} -- forced into follower state but otherwise continuing", new Object[]{this.state.serverName, str, Long.valueOf(j), Long.valueOf(this.state.currentTerm)});
        this.state.stepDownFromElection();
        this.state.setCurrentTerm(j);
        if ($assertionsDisabled || this.state.leadership == RaftState.LeadershipStatus.OTHER) {
            return true;
        }
        throw new AssertionError();
    }

    private void asLeader(String str, long j, long j2, Runnable runnable) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        if (!canPerformLeaderAction(str, j)) {
            log.trace("{} - Leader prereqs failed.", this.state.serverName);
            return;
        }
        try {
            if (!$assertionsDisabled && !this.state.isLeader()) {
                throw new AssertionError("We should be a leader if we get this far performing a leader action");
            }
            runnable.run();
            updateLeaderInvariantsPostMessage(Optional.of(str), j2);
        } catch (Throwable th) {
            updateLeaderInvariantsPostMessage(Optional.of(str), j2);
            throw th;
        }
    }

    private boolean canPerformLeaderAction(String str, long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        if (!this.state.isLeader()) {
            log.trace("{} - presumed leader is not a leader", this.state.serverName);
            return false;
        }
        if (str.isEmpty()) {
            log.warn("{} - Got a message with no follower name. This is an error!", this.state.serverName);
            return false;
        }
        if (j < this.state.currentTerm) {
            log.warn("{} - leader cannot perform action. remoteTerm={} < currentTerm={}", new Object[]{this.state.serverName, Long.valueOf(j), Long.valueOf(this.state.currentTerm)});
            return false;
        }
        if (j <= this.state.currentTerm) {
            return true;
        }
        log.trace("{} - detected remoteTerm={} > currentTerm={} -- forced into follower state", new Object[]{this.state.serverName, Long.valueOf(j), Long.valueOf(this.state.currentTerm)});
        this.state.stepDownFromElection();
        this.state.setCurrentTerm(j);
        if ($assertionsDisabled || this.state.leadership == RaftState.LeadershipStatus.OTHER) {
            return false;
        }
        throw new AssertionError();
    }

    private void updateLeaderInvariantsPostMessage(Optional<String> optional, long j) {
        if (!$assertionsDisabled && this.drivingThreadId >= 0 && this.drivingThreadId != Thread.currentThread().getId()) {
            throw new AssertionError("Eloquent Raft Algorithm should only be run from the driving thread (" + this.drivingThreadId + ") but is being driven by " + Thread.currentThread());
        }
        if (this.state.isLeader()) {
            optional.ifPresent(str -> {
                this.state.observeLifeFrom(str, j);
            });
            this.state.matchIndex.ifPresent(map -> {
                Set<String> quorumMembers = this.state.log.getQuorumMembers();
                if (quorumMembers.isEmpty()) {
                    return;
                }
                long[] array = quorumMembers.stream().mapToLong(str2 -> {
                    if (this.state.serverName.equals(str2)) {
                        return this.state.log.getLastEntryIndex();
                    }
                    if ($assertionsDisabled || map.containsKey(str2)) {
                        return ((Long) map.get(str2)).longValue();
                    }
                    throw new AssertionError("no match index for member " + str2);
                }).toArray();
                Arrays.sort(array);
                long j2 = array.length % 2 == 0 ? array[(array.length / 2) - 1] : array[array.length / 2];
                if (j2 <= this.state.commitIndex() || this.state.log.getLastEntryTerm() != this.state.currentTerm) {
                    return;
                }
                log.trace("{} - Committing up to index {}; matchIndex={} -> {}", new Object[]{this.state.serverName, Long.valueOf(j2), map, array});
                this.state.commitUpTo(j2, j);
                if (optional.isPresent()) {
                    broadcastAppendEntries(j, false, false);
                }
            });
        }
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return Objects.equals(this.state, ((EloquentRaftAlgorithm) obj).state);
    }

    public int hashCode() {
        return Objects.hash(this.state);
    }

    static {
        $assertionsDisabled = !EloquentRaftAlgorithm.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(EloquentRaftAlgorithm.class);
        LEAKY_BUCKET_SIZE = 10;
    }
}
