package ai.eloquent.raft;

import ai.eloquent.data.UDPBroadcastProtos;
import ai.eloquent.data.UDPTransport;
import ai.eloquent.raft.EloquentRaftProto;
import ai.eloquent.raft.RaftGrpc;
import ai.eloquent.util.ConcurrencyUtils;
import ai.eloquent.util.IdentityHashSet;
import ai.eloquent.util.SafeTimerTask;
import ai.eloquent.util.Span;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/eloquent/raft/NetRaftTransport.class */
/**
 * A {@link RaftTransport} that talks to other nodes over the network.
 *
 * <p>Ordinary Raft messages are sent and received through {@link UDPTransport#DEFAULT}, tagged with
 * {@link UDPBroadcastProtos.MessageType#RAFT}. RPCs (request/response exchanges) use gRPC over a
 * plaintext channel: this class starts a gRPC server on {@code rpcListenPort} in its constructor and
 * keeps one cached {@link ManagedChannel} per destination server name.
 *
 * <p>Server names must begin with a dotted IPv4-style address, optionally followed by an underscore
 * and a free-form descriptor (for example {@code 127.0.0.1_foobar}). The part before the first
 * underscore is used as the network address of the node.
 */
public class NetRaftTransport implements RaftTransport {
    private static final Logger log = LoggerFactory.getLogger(NetRaftTransport.class);

    /** The gRPC port used by {@link #NetRaftTransport(String)}. */
    private static final int DEFAULT_RPC_LISTEN_PORT = 42889;

    /** The name of this server; UDP messages whose sender equals this name are ignored. */
    public final String serverName;

    /**
     * The flag handed to {@code UDPTransport.doAction} as its first argument when an inbound RPC is
     * dispatched. NOTE(review): presumably selects whether the work runs on a separate thread; confirm
     * against {@code UDPTransport}.
     */
    public final boolean thread;

    /** The algorithms registered through {@link #bind(RaftAlgorithm)}. */
    private final Set<RaftAlgorithm> boundAlgorithms = new IdentityHashSet<>();

    /** The port our gRPC server listens on, and the port we dial on other nodes. */
    private final int rpcListenPort;

    /** Cached gRPC channels, keyed by the full destination server name. Guarded by {@code this}. */
    private final Map<String, ManagedChannel> channel = new HashMap<>();

    /**
     * Creates the transport and immediately starts a gRPC server for inbound RPCs.
     *
     * @param serverName the name of this server; must start with an IPv4-style address, optionally
     *     followed by {@code _} and a descriptor
     * @param rpcListenPort the port to listen on for RPCs, and to dial on other nodes
     * @param thread passed through to {@code UDPTransport.doAction} when handling inbound RPCs
     * @throws IllegalArgumentException if {@code serverName} does not have the required shape
     * @throws IOException if the gRPC server cannot be started
     */
    public NetRaftTransport(String serverName, int rpcListenPort, final boolean thread) throws IOException {
        this.serverName = serverName;
        this.rpcListenPort = rpcListenPort;
        this.thread = thread;
        if (!serverName.matches("[12]?[0-9]?[0-9]\\.[12]?[0-9]?[0-9]\\.[12]?[0-9]?[0-9]\\.[12]?[0-9]?[0-9](_.*)?")) {
            throw new IllegalArgumentException("Invalid server name \"" + serverName + "\". Server name must start with an IPv4 address, followed by an optional underscore and custom descriptor. For example, \"127.0.0.1_foobar\".");
        }
        ServerBuilder.forPort(rpcListenPort).addService(new RaftGrpc.RaftImplBase() {
            @Override
            public void rpc(EloquentRaftProto.RaftMessage request, StreamObserver<EloquentRaftProto.RaftMessage> responseObserver) {
                log.trace("Got an RPC request");
                // A unary call may be answered exactly once. Several algorithms can be bound, and each
                // one completes independently, so only the first outcome is allowed to close the call.
                final AtomicBoolean open = new AtomicBoolean(true);
                try {
                    UDPTransport.DEFAULT.get().doAction(thread, "handle inbound RPC", () -> {
                        if (NetRaftTransport.this.boundAlgorithms.isEmpty()) {
                            // Nobody can answer: fail now rather than leaving the caller to hit its deadline.
                            if (open.getAndSet(false)) {
                                responseObserver.onError(new IllegalStateException("No Raft algorithm is bound to this transport"));
                            }
                        } else {
                            for (RaftAlgorithm algorithm : NetRaftTransport.this.boundAlgorithms) {
                                algorithm.receiveRPC(request, NetRaftTransport.this.now()).whenComplete((response, failure) -> {
                                    if (open.getAndSet(false)) {
                                        if (failure != null) {
                                            responseObserver.onError(failure);
                                        } else if (response == null) {
                                            responseObserver.onError(new RuntimeException("Raft algorithm completed the RPC without a response"));
                                        } else {
                                            responseObserver.onNext(response);
                                            responseObserver.onCompleted();
                                        }
                                    }
                                });
                            }
                        }
                    });
                } catch (Throwable t) {
                    if (open.getAndSet(false)) {
                        responseObserver.onError(t);
                    }
                }
            }
        }).build().start();
    }

    /**
     * Creates the transport on the default RPC port, passing {@code false} as the {@code thread} flag.
     *
     * @param serverName the name of this server; see {@link #NetRaftTransport(String, int, boolean)}
     * @throws IOException if the gRPC server cannot be started
     */
    public NetRaftTransport(String serverName) throws IOException {
        this(serverName, DEFAULT_RPC_LISTEN_PORT, false);
    }

    /**
     * Schedules {@code task} on the global Raft lifecycle timer with an initial delay of zero.
     *
     * @param task the task to run repeatedly
     * @param period the period handed to the timer (units are whatever the timer expects)
     */
    @Override
    public void scheduleAtFixedRate(SafeTimerTask task, long period) {
        RaftLifecycle.global.timer.get().scheduleAtFixedRate(task, 0L, period);
    }

    /**
     * Schedules {@code task} once on the global Raft lifecycle timer.
     *
     * @param task the task to run
     * @param delay the delay handed to the timer (units are whatever the timer expects)
     */
    @Override
    public void schedule(SafeTimerTask task, long delay) {
        RaftLifecycle.global.timer.get().schedule(task, delay);
    }

    /**
     * Registers {@code listener} to receive inbound Raft UDP messages and inbound RPCs. Binding the
     * same instance a second time is a no-op.
     *
     * @param listener the algorithm to deliver messages to
     */
    @Override
    public synchronized void bind(RaftAlgorithm listener) {
        if (this.boundAlgorithms.contains(listener)) {
            return;
        }
        UDPTransport.DEFAULT.get().bind(UDPBroadcastProtos.MessageType.RAFT, bytes -> {
            log.trace("Received a UDP message");
            try {
                EloquentRaftProto.RaftMessage message = EloquentRaftProto.RaftMessage.parseFrom(bytes);
                // Skip messages carrying our own server name as the sender.
                if (!message.getSender().equals(this.serverName)) {
                    assert ConcurrencyUtils.ensureNoLocksHeld();
                    listener.receiveMessage(message, reply -> {
                        sendTransport(listener.serverName(), message.getSender(), reply);
                    }, now());
                }
            } catch (InvalidProtocolBufferException e) {
                log.warn("Not a Raft message: ", e);
            }
        });
        this.boundAlgorithms.add(listener);
    }

    /**
     * Returns the algorithms bound to this transport. This is the live backing set, not a copy.
     */
    @Override
    public Collection<RaftAlgorithm> boundAlgorithms() {
        return this.boundAlgorithms;
    }

    /**
     * Sends {@code message} to {@code destination} as a gRPC call and reports the outcome exactly once.
     *
     * @param sender the name of the sending server (not used by this transport)
     * @param destination the server name of the receiver; the part before the first {@code _} is dialed
     * @param message the request to send
     * @param onResponseReceived invoked with the response, if one arrives first
     * @param onFailure invoked instead if the call errors or completes without a response
     * @param timeoutMillis the gRPC deadline for the call, in milliseconds
     */
    @Override
    public void rpcTransport(String sender, String destination, EloquentRaftProto.RaftMessage message, final Consumer<EloquentRaftProto.RaftMessage> onResponseReceived, final Runnable onFailure, long timeoutMillis) {
        assert ConcurrencyUtils.ensureNoLocksHeld();
        final String host = hostOf(destination);
        final ManagedChannel rpcChannel = channelFor(destination, host);
        // Flipped to false by whichever callback fires first, so the caller hears back only once.
        final AtomicBoolean pending = new AtomicBoolean(true);
        log.trace("Sending RPC request to {} @ ip {}", destination, host);
        RaftGrpc.newStub(rpcChannel).withDeadlineAfter(timeoutMillis, TimeUnit.MILLISECONDS).rpc(message, new StreamObserver<EloquentRaftProto.RaftMessage>() {
            @Override
            public void onNext(EloquentRaftProto.RaftMessage response) {
                log.trace("Got an RPC response from {}", host);
                if (pending.getAndSet(false)) {
                    assert ConcurrencyUtils.ensureNoLocksHeld();
                    onResponseReceived.accept(response);
                }
            }

            @Override
            public void onError(Throwable t) {
                reportFailure();
            }

            @Override
            public void onCompleted() {
                // Only reaches the caller when the stream closed without ever delivering a response.
                reportFailure();
            }

            /** Runs the failure callback unless an outcome was already reported. */
            private void reportFailure() {
                if (pending.getAndSet(false)) {
                    assert ConcurrencyUtils.ensureNoLocksHeld();
                    onFailure.run();
                }
            }
        });
    }

    /**
     * Sends {@code message} to a single node over UDP.
     *
     * @param sender the name of the sending server (not used by this transport)
     * @param destination the server name of the receiver; the part before the first {@code _} is used
     *     as the address
     * @param message the message to send
     */
    @Override
    public void sendTransport(String sender, String destination, EloquentRaftProto.RaftMessage message) {
        log.trace("Sending a UDP message to {}", destination);
        assert ConcurrencyUtils.ensureNoLocksHeld();
        UDPTransport.DEFAULT.get().sendTransport(hostOf(destination), UDPBroadcastProtos.MessageType.RAFT, message.toByteArray());
    }

    /**
     * Broadcasts {@code message} over UDP.
     *
     * @param sender the name of the sending server (not used by this transport)
     * @param message the message to broadcast
     */
    @Override
    public void broadcastTransport(String sender, EloquentRaftProto.RaftMessage message) {
        log.trace("Broadcasting a UDP message");
        assert ConcurrencyUtils.ensureNoLocksHeld();
        UDPTransport.DEFAULT.get().broadcastTransport(UDPBroadcastProtos.MessageType.RAFT, message.toByteArray());
    }

    /**
     * Returns the fixed span {@code (10, 100)} as the expected network delay.
     */
    @Override
    public Span expectedNetworkDelay() {
        return new Span(10L, 100L);
    }

    @Override
    public String toString() {
        return "Raft:" + UDPTransport.DEFAULT.get().toString();
    }

    /**
     * Returns the address part of a server name: everything before the first underscore, or the whole
     * name when there is no underscore (or it is the very first character).
     */
    private static String hostOf(String name) {
        int separator = name.indexOf('_');
        return separator > 0 ? name.substring(0, separator) : name;
    }

    /**
     * Returns the cached channel for {@code destination}, creating one to {@code host} if needed.
     * Channels that report shutdown or terminated are dropped from the cache first.
     */
    private synchronized ManagedChannel channelFor(String destination, String host) {
        Iterator<ManagedChannel> cached = this.channel.values().iterator();
        while (cached.hasNext()) {
            ManagedChannel candidate = cached.next();
            if (candidate.isTerminated() || candidate.isShutdown()) {
                candidate.shutdown();
                cached.remove();
            }
        }
        return this.channel.computeIfAbsent(destination, key -> ManagedChannelBuilder.forAddress(host, this.rpcListenPort).usePlaintext().build());
    }
}
