package ai.eloquent.data;

import ai.eloquent.data.UDPBroadcastProtos;
import ai.eloquent.io.IOUtils;
import ai.eloquent.monitoring.Prometheus;
import ai.eloquent.raft.RaftLifecycle;
import ai.eloquent.util.IdentityHashSet;
import ai.eloquent.util.Lazy;
import ai.eloquent.util.SafeTimerTask;
import ai.eloquent.util.StringUtils;
import ai.eloquent.util.SystemUtils;
import ai.eloquent.util.TimerUtils;
import ai.eloquent.util.ZipUtils;
import com.google.protobuf.ByteString;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/eloquent/data/UDPTransport.class */
public class UDPTransport implements Transport {
    private static final int DEFAULT_UDP_LISTEN_PORT = 42888;
    private static final int DEFAULT_TCP_LISTEN_PORT = 42888;
    private static final int MAX_UDP_PACKET_SIZE = 65000;
    public final InetAddress serverName;
    private final String serverAddress;
    public final boolean thread;
    public final boolean zip;
    private InetAddress[] broadcastAddrs;
    private final boolean isRealBroadcast;
    private long lastMessageReceived;
    private long lastMessageSent;
    private final int udpListenPort;
    private final int tcpListenPort;
    private DatagramSocket socket;
    private DatagramSocket serverSocket;
    private final Map<UDPBroadcastProtos.MessageType, IdentityHashSet<Consumer<byte[]>>> listeners;
    private final Map<byte[], Long> pingsReceived;
    public final int mtu;
    public final boolean allowJumboPackets;
    private final Queue<Runnable> sendQueue;
    Object summaryTimer;
    final Map<InetAddress, Socket> tcpSocketCache;
    private static final Logger log = LoggerFactory.getLogger(UDPTransport.class);
    private static final Pattern IP_REGEX = Pattern.compile("^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$");
    private static boolean networkSeemsDown = false;
    public static Lazy<Transport> DEFAULT = Lazy.of(() -> {
        try {
            return new UDPTransport();
        } catch (IOException e) {
            log.warn("UDPTransport already bound to address! Returning mock implementation");
            return new Transport() { // from class: ai.eloquent.data.UDPTransport.3
                @Override // ai.eloquent.data.Transport
                public void bind(UDPBroadcastProtos.MessageType messageType, Consumer<byte[]> consumer) {
                    UDPTransport.log.warn("UDP address is in use: cannot bind to transport");
                }

                @Override // ai.eloquent.data.Transport
                public boolean sendTransport(String str, UDPBroadcastProtos.MessageType messageType, byte[] bArr) {
                    UDPTransport.log.warn("UDP address is in use: cannot send on transport");
                    return false;
                }

                @Override // ai.eloquent.data.Transport
                public boolean broadcastTransport(UDPBroadcastProtos.MessageType messageType, byte[] bArr) {
                    UDPTransport.log.warn("UDP address is in use: cannot broadcast on transport");
                    return false;
                }
            };
        }
    });

    private UDPTransport(int i, int i2, boolean z, boolean z2, boolean z3) throws IOException {
        int mtu;
        this.lastMessageReceived = 0L;
        this.lastMessageSent = 0L;
        this.listeners = new HashMap();
        this.pingsReceived = new HashMap();
        this.sendQueue = new ArrayDeque();
        this.summaryTimer = Prometheus.summaryBuild("udp_transport", "Statistics on the UDP Transport calls", "operation");
        this.tcpSocketCache = new HashMap();
        this.udpListenPort = i;
        this.tcpListenPort = i2;
        this.serverName = InetAddress.getLocalHost();
        this.serverAddress = this.serverName.getHostAddress();
        this.socket = new DatagramSocket();
        this.socket.setSendBufferSize(MAX_UDP_PACKET_SIZE);
        this.socket.setReceiveBufferSize(MAX_UDP_PACKET_SIZE);
        this.serverSocket = new DatagramSocket(i);
        this.thread = z;
        this.zip = z2;
        this.allowJumboPackets = z3;
        InetAddress inetAddress = null;
        Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
        int i3 = MAX_UDP_PACKET_SIZE;
        while (networkInterfaces.hasMoreElements()) {
            NetworkInterface nextElement = networkInterfaces.nextElement();
            if (nextElement.isUp() && !nextElement.isLoopback()) {
                List<InterfaceAddress> interfaceAddresses = nextElement.getInterfaceAddresses();
                if (!interfaceAddresses.isEmpty() && (mtu = nextElement.getMTU() - 20) < i3) {
                    i3 = mtu;
                }
                for (InterfaceAddress interfaceAddress : interfaceAddresses) {
                    if (interfaceAddress.getBroadcast() != null) {
                        inetAddress = interfaceAddress.getBroadcast();
                        log.info("Found broadcast address: {}", interfaceAddress.getBroadcast());
                    }
                }
            }
        }
        this.mtu = i3;
        log.info("Setting MTU to {}", Integer.valueOf(this.mtu));
        if (inetAddress == null || inetAddress.getHostAddress().equals("0.0.0.0")) {
            log.info("Using Kubernetes state to broadcast");
            this.isRealBroadcast = false;
            this.broadcastAddrs = readKubernetesState();
            RaftLifecycle.global.timer.get().scheduleAtFixedRate(new SafeTimerTask() { // from class: ai.eloquent.data.UDPTransport.1
                @Override // ai.eloquent.util.SafeTimerTask
                public void runUnsafe() throws Throwable {
                    InetAddress[] inetAddressArr = UDPTransport.this.broadcastAddrs;
                    UDPTransport.this.broadcastAddrs = (InetAddress[]) Arrays.stream(UDPTransport.readKubernetesState()).filter(inetAddress2 -> {
                        try {
                            return inetAddress2.isReachable(100);
                        } catch (IOException e) {
                            return false;
                        }
                    }).toArray(i4 -> {
                        return new InetAddress[i4];
                    });
                    if (UDPTransport.this.broadcastAddrs.length == 0) {
                        UDPTransport.log.warn("Could not read Kubernetes configuration (no nodes present)");
                        UDPTransport.this.broadcastAddrs = inetAddressArr;
                    }
                    if (!Arrays.equals(inetAddressArr, UDPTransport.this.broadcastAddrs)) {
                        UDPTransport.log.info("detected change in online nodes: {}  (from {})", StringUtils.join(UDPTransport.this.broadcastAddrs, ","), StringUtils.join(inetAddressArr, ","));
                    }
                    for (InetAddress inetAddress3 : UDPTransport.this.broadcastAddrs) {
                        UDPTransport.this.getTcpSocket(inetAddress3, true);
                    }
                    UDPTransport.this.broadcastTransport(UDPBroadcastProtos.MessageType.PING, UDPTransport.this.serverName.getAddress());
                }
            }, Duration.ofSeconds(10L), Duration.ofSeconds(10L));
        } else {
            log.info("Using real broadcast address to broadcast");
            this.broadcastAddrs = new InetAddress[]{inetAddress};
            this.isRealBroadcast = true;
        }
        log.info("Broadcast addresses are [{}] (is_inferred={})", StringUtils.join(this.broadcastAddrs, ", "), Boolean.valueOf(inetAddress != null));
        if (this.isRealBroadcast) {
            this.socket.setBroadcast(true);
        }
        this.socket.setTrafficClass(16);
        log.info("Binding UDP to " + i);
        Thread thread = new Thread(() -> {
            Object startTimer;
            byte[] bArr;
            UDPBroadcastProtos.UDPPacket parseFrom;
            IdentityHashSet<Consumer<byte[]>> identityHashSet;
            try {
                byte[] bArr2 = new byte[MAX_UDP_PACKET_SIZE];
                DatagramPacket datagramPacket = new DatagramPacket(bArr2, bArr2.length);
                log.info("Started UDP listener thread");
                while (true) {
                    try {
                        try {
                            if (this.serverSocket.isClosed()) {
                                log.info("UDP socket was closed -- reopening");
                                this.serverSocket = new DatagramSocket(i);
                            }
                            this.serverSocket.receive(datagramPacket);
                            startTimer = Prometheus.startTimer(this.summaryTimer, "parse_upd_packet");
                            byte[] bArr3 = new byte[datagramPacket.getLength()];
                            System.arraycopy(datagramPacket.getData(), datagramPacket.getOffset(), bArr3, 0, bArr3.length);
                            if (this.zip) {
                                try {
                                    bArr = ZipUtils.gunzip(bArr3);
                                } catch (Throwable th) {
                                    bArr = bArr3;
                                }
                            } else {
                                bArr = bArr3;
                            }
                            parseFrom = UDPBroadcastProtos.UDPPacket.parseFrom(bArr);
                        } catch (Throwable th2) {
                            this.lastMessageReceived = System.currentTimeMillis();
                            throw th2;
                        }
                    } catch (IOException e) {
                        log.warn("IOException receiving packet from UDP: " + e.getClass().getSimpleName() + ": " + e.getMessage());
                        this.lastMessageReceived = System.currentTimeMillis();
                    } catch (Throwable th3) {
                        log.warn("Caught throwable in UDP RPC receive method with packet length " + datagramPacket.getLength() + " (at offset " + datagramPacket.getOffset() + " in the buffer of size " + datagramPacket.getData().length + ") and with source " + datagramPacket.getAddress() + ": ", th3);
                        this.lastMessageReceived = System.currentTimeMillis();
                    }
                    if (parseFrom == null) {
                        this.lastMessageReceived = System.currentTimeMillis();
                    } else {
                        Prometheus.observeDuration(startTimer);
                        if (parseFrom.getType() == UDPBroadcastProtos.MessageType.PING) {
                            this.pingsReceived.put(parseFrom.getContents().toByteArray(), Long.valueOf(System.currentTimeMillis()));
                        } else if (!parseFrom.getIsBroadcast() || !parseFrom.getSender().equals(this.serverAddress)) {
                            synchronized (this.listeners) {
                                identityHashSet = this.listeners.get(parseFrom.getType());
                            }
                            if (identityHashSet != null) {
                                Iterator<Consumer<byte[]>> it = identityHashSet.iterator();
                                while (it.hasNext()) {
                                    Consumer<byte[]> next = it.next();
                                    doAction(z, "inbound message of type " + parseFrom.getType(), () -> {
                                        next.accept(parseFrom.getContents().toByteArray());
                                    });
                                }
                            }
                        }
                        this.lastMessageReceived = System.currentTimeMillis();
                    }
                }
            } catch (Throwable th4) {
                log.error("Why did we shut down the transport listener thread?");
                throw th4;
            }
        });
        thread.setDaemon(true);
        thread.setName("udp-listener");
        thread.setUncaughtExceptionHandler((thread2, th) -> {
            log.warn("Uncaught exception on thread {}: ", thread2, th);
        });
        thread.setPriority(9);
        thread.start();
        log.info("Binding TCP to " + i2);
        Thread thread3 = new Thread(() -> {
            try {
                try {
                    ServerSocket serverSocket = new ServerSocket(i2);
                    log.info("Started TCP listener thread");
                    while (true) {
                        try {
                            try {
                                Socket accept = serverSocket.accept();
                                Thread thread4 = new Thread(() -> {
                                    UDPBroadcastProtos.UDPPacket parseDelimitedFrom;
                                    IdentityHashSet identityHashSet;
                                    while (!accept.isClosed() && (parseDelimitedFrom = UDPBroadcastProtos.UDPPacket.parseDelimitedFrom(accept.getInputStream())) != null) {
                                        try {
                                            try {
                                                Object startTimer = Prometheus.startTimer(this.summaryTimer, "parse_tcp_packet");
                                                synchronized (this.listeners) {
                                                    identityHashSet = new IdentityHashSet(this.listeners.get(parseDelimitedFrom.getType()));
                                                }
                                                byte[] byteArray = parseDelimitedFrom.getContents().toByteArray();
                                                Prometheus.observeDuration(startTimer);
                                                Iterator it = identityHashSet.iterator();
                                                while (it.hasNext()) {
                                                    Consumer consumer = (Consumer) it.next();
                                                    doAction(z, "inbound TCP message of type " + parseDelimitedFrom.getType(), () -> {
                                                        consumer.accept(byteArray);
                                                    });
                                                }
                                            } finally {
                                                this.lastMessageReceived = System.currentTimeMillis();
                                                try {
                                                    accept.close();
                                                } catch (IOException e) {
                                                    log.warn("Could not close TCP socket: ", e);
                                                }
                                            }
                                        } catch (IOException e2) {
                                            log.warn("IOException receiving packet from TCP: " + e2.getClass().getSimpleName() + ": " + e2.getMessage());
                                            this.lastMessageReceived = System.currentTimeMillis();
                                            try {
                                                accept.close();
                                                return;
                                            } catch (IOException e3) {
                                                log.warn("Could not close TCP socket: ", e3);
                                                return;
                                            }
                                        } catch (Throwable th2) {
                                            log.warn("Caught throwable in TCP RPC receive method: ", th2);
                                            this.lastMessageReceived = System.currentTimeMillis();
                                            try {
                                                accept.close();
                                                return;
                                            } catch (IOException e4) {
                                                log.warn("Could not close TCP socket: ", e4);
                                                return;
                                            }
                                        }
                                    }
                                });
                                thread4.setDaemon(true);
                                thread4.setName("tcp-listener-" + accept.getInetAddress().getHostName());
                                thread4.setUncaughtExceptionHandler((thread5, th2) -> {
                                    log.warn("Uncaught exception on {}: ", thread5, th2);
                                });
                                thread4.setPriority(Math.max(5, 9));
                                thread4.start();
                                this.lastMessageReceived = System.currentTimeMillis();
                            } finally {
                            }
                        } catch (IOException e) {
                            log.warn("IOException receiving new socket on TCP: " + e.getClass().getSimpleName() + ": " + e.getMessage());
                            this.lastMessageReceived = System.currentTimeMillis();
                        } catch (Throwable th3) {
                            log.warn("Caught throwable in TCP receive method: ", th3);
                            this.lastMessageReceived = System.currentTimeMillis();
                        }
                    }
                } catch (Throwable th4) {
                    log.error("Could not establish TCP socket -- this is an error!");
                    log.error("Why did we shut down the transport listener thread?");
                }
            } catch (Throwable th5) {
                log.error("Why did we shut down the transport listener thread?");
                throw th5;
            }
        });
        thread3.setDaemon(true);
        thread3.setName("tcp-listener");
        thread3.setUncaughtExceptionHandler((thread4, th2) -> {
            log.warn("Uncaught exception on thread {}: ", thread4, th2);
        });
        thread3.setPriority(9);
        thread3.start();
        log.info("Starting sender thread" + i2);
        final AtomicLong atomicLong = new AtomicLong(-1L);
        final Thread thread5 = new Thread(() -> {
            while (true) {
                try {
                    synchronized (this.sendQueue) {
                        while (this.sendQueue.isEmpty()) {
                            try {
                                this.sendQueue.wait(100L);
                            } catch (InterruptedException e) {
                            }
                        }
                    }
                    Runnable poll = this.sendQueue.poll();
                    if (poll != null) {
                        try {
                            atomicLong.set(System.currentTimeMillis());
                            poll.run();
                            atomicLong.set(-1L);
                        } catch (Exception e2) {
                            log.warn("Could not send message: {}: {}", e2.getClass().getSimpleName(), e2.getMessage());
                        }
                    }
                } catch (Throwable th3) {
                    log.warn("Caught exception sending message on transport: ", th3);
                }
            }
        });
        thread5.setDaemon(true);
        thread5.setName("transport-sender");
        thread5.setUncaughtExceptionHandler((thread6, th3) -> {
            log.warn("Uncaught exception on thread {}: ", thread6, th3);
        });
        thread5.setPriority(9);
        thread5.start();
        RaftLifecycle.global.timer.get().scheduleAtFixedRate(new SafeTimerTask() { // from class: ai.eloquent.data.UDPTransport.2
            @Override // ai.eloquent.util.SafeTimerTask
            public void runUnsafe() {
                if (System.currentTimeMillis() - atomicLong.get() > 100) {
                    thread5.interrupt();
                }
            }
        }, Duration.ofMillis(100L));
    }

    public UDPTransport() throws IOException {
        this(42888, 42888, false, false, false);
    }

    public static InetAddress[] readKubernetesState() throws IOException {
        String str = System.getenv("ELOQUENT_RAFT_MEMBERS");
        if (str == null || "".equals(str)) {
            throw new FileNotFoundException("Could not find Kubernetes file variable ELOQUENT_RAFT_MEMBERS");
        }
        return (InetAddress[]) Arrays.stream(IOUtils.slurpReader(IOUtils.readerFromString(str)).split("\n")).filter(str2 -> {
            return str2.trim().length() > 0;
        }).map(str3 -> {
            return str3.split("\\t|\\s{2,}");
        }).filter(strArr -> {
            return strArr.length > 0 && IP_REGEX.matcher(strArr[0]).matches();
        }).map(strArr2 -> {
            try {
                return InetAddress.getByName(strArr2[0]);
            } catch (UnknownHostException e) {
                log.warn("Unknown host: " + strArr2[0]);
                return null;
            }
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).toArray(i -> {
            return new InetAddress[i];
        });
    }

    @Override // ai.eloquent.data.Transport
    public void bind(UDPBroadcastProtos.MessageType messageType, Consumer<byte[]> consumer) {
        synchronized (this.listeners) {
            this.listeners.computeIfAbsent(messageType, messageType2 -> {
                return new IdentityHashSet();
            }).add(consumer);
        }
    }

    @Override // ai.eloquent.data.Transport
    public boolean sendTransport(String str, UDPBroadcastProtos.MessageType messageType, byte[] bArr) {
        try {
            try {
                try {
                    UDPBroadcastProtos.UDPPacket m185build = UDPBroadcastProtos.UDPPacket.newBuilder().setIsBroadcast(false).setType(messageType).setSender(this.serverAddress).setContents(ByteString.copyFrom(bArr)).m185build();
                    byte[] gzip = this.zip ? ZipUtils.gzip(m185build.toByteArray()) : m185build.toByteArray();
                    if ((!this.allowJumboPackets || gzip.length >= MAX_UDP_PACKET_SIZE) && (this.allowJumboPackets || gzip.length >= this.mtu)) {
                        log.debug("Message is too long to send as a single packet ({}); sending over TCP", Integer.valueOf(gzip.length));
                        getTcpSocket(InetAddress.getByName(str), false).ifPresent(socket -> {
                            safeWrite(m185build, socket, "sendTransport");
                        });
                        this.lastMessageSent = System.currentTimeMillis();
                        return false;
                    }
                    DatagramPacket datagramPacket = new DatagramPacket(gzip, gzip.length, InetAddress.getByName(str), this.udpListenPort);
                    if (this.socket.isClosed()) {
                        synchronized (this) {
                            if (this.socket.isClosed()) {
                                log.info("UDP socket was closed (in send call) -- reopening");
                                this.socket = new DatagramSocket();
                                this.socket.setSendBufferSize(MAX_UDP_PACKET_SIZE);
                                if (this.isRealBroadcast) {
                                    this.socket.setBroadcast(true);
                                }
                                this.socket.setTrafficClass(16);
                            }
                        }
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    try {
                        this.socket.send(datagramPacket);
                        long currentTimeMillis2 = System.currentTimeMillis();
                        if (currentTimeMillis2 > currentTimeMillis + 10) {
                            log.warn("Sending a direct message on the UDP socket took {}", TimerUtils.formatTimeDifference(currentTimeMillis2 - currentTimeMillis));
                        }
                        if (networkSeemsDown) {
                            log.info("Network is back up");
                        }
                        networkSeemsDown = false;
                        this.lastMessageSent = System.currentTimeMillis();
                        return true;
                    } catch (Throwable th) {
                        long currentTimeMillis3 = System.currentTimeMillis();
                        if (currentTimeMillis3 > currentTimeMillis + 10) {
                            log.warn("Sending a direct message on the UDP socket took {}", TimerUtils.formatTimeDifference(currentTimeMillis3 - currentTimeMillis));
                        }
                        throw th;
                    }
                } catch (UnknownHostException e) {
                    log.warn("No such destination: ", e);
                    this.lastMessageSent = System.currentTimeMillis();
                    return false;
                }
            } catch (SocketException e2) {
                log.warn("Could not create datagram socket: ", e2);
                this.lastMessageSent = System.currentTimeMillis();
                return false;
            } catch (IOException e3) {
                if (e3.getMessage() == null || !(e3.getMessage().equals("Network is unreachable") || e3.getMessage().equals("Network is down"))) {
                    log.warn("Could not send message on datagram socket: ", e3);
                } else {
                    if (!networkSeemsDown) {
                        log.warn("Network seems to have disconnected! UDPTransport is not sending messages");
                    }
                    networkSeemsDown = true;
                }
                this.lastMessageSent = System.currentTimeMillis();
                return false;
            }
        } catch (Throwable th2) {
            this.lastMessageSent = System.currentTimeMillis();
            throw th2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Optional<Socket> getTcpSocket(InetAddress inetAddress, boolean z) {
        synchronized (this.tcpSocketCache) {
            Socket socket = this.tcpSocketCache.get(inetAddress);
            Socket socket2 = socket;
            if (socket == null && !z && this.tcpSocketCache.containsKey(inetAddress)) {
                return Optional.empty();
            }
            if (socket2 == null || socket2.isClosed()) {
                try {
                    if (!inetAddress.isReachable(100)) {
                        synchronized (this.tcpSocketCache) {
                            this.tcpSocketCache.put(inetAddress, null);
                        }
                        log.warn("{} is reachable: ", inetAddress);
                        return Optional.empty();
                    }
                    CompletableFuture completableFuture = new CompletableFuture();
                    Thread thread = new Thread(() -> {
                        try {
                            Socket socket3 = new Socket(inetAddress, this.tcpListenPort);
                            synchronized (completableFuture) {
                                completableFuture.complete(socket3);
                            }
                        } catch (IOException e) {
                            completableFuture.completeExceptionally(e);
                        }
                    });
                    thread.setName("socket-creator-" + inetAddress.getHostAddress());
                    thread.setDaemon(true);
                    thread.setUncaughtExceptionHandler((thread2, th) -> {
                        log.warn("Uncaught exception on {}: ", thread2, th);
                    });
                    thread.start();
                    try {
                        try {
                            socket2 = (Socket) completableFuture.get(5L, TimeUnit.SECONDS);
                            synchronized (this.tcpSocketCache) {
                                Socket socket3 = this.tcpSocketCache.get(inetAddress);
                                if (socket3 == null || socket3.isClosed()) {
                                    this.tcpSocketCache.put(inetAddress, socket2);
                                } else {
                                    try {
                                        socket2.close();
                                    } catch (IOException e) {
                                    }
                                }
                            }
                            log.info("refreshed TCP socket for {}", inetAddress);
                            if (thread.isAlive()) {
                                thread.interrupt();
                            }
                        } catch (Throwable th2) {
                            if (thread.isAlive()) {
                                thread.interrupt();
                            }
                            throw th2;
                        }
                    } catch (InterruptedException | ExecutionException | TimeoutException e2) {
                        log.warn("Could not create TCP socket to {} -- exception on future: ", inetAddress, e2);
                        Optional<Socket> empty = Optional.empty();
                        if (thread.isAlive()) {
                            thread.interrupt();
                        }
                        return empty;
                    }
                } catch (IOException e3) {
                    log.warn("Could not check if {} is reachable: ", inetAddress, e3);
                    synchronized (this.tcpSocketCache) {
                        this.tcpSocketCache.put(inetAddress, null);
                        return Optional.empty();
                    }
                }
            }
            return Optional.of(socket2);
        }
    }

    @Override // ai.eloquent.data.Transport
    public boolean broadcastTransport(UDPBroadcastProtos.MessageType messageType, byte[] bArr) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            UDPBroadcastProtos.UDPPacket m185build = UDPBroadcastProtos.UDPPacket.newBuilder().setIsBroadcast(true).setType(messageType).setSender(this.serverAddress).setContents(ByteString.copyFrom(bArr)).m185build();
            byte[] gzip = this.zip ? ZipUtils.gzip(m185build.toByteArray()) : m185build.toByteArray();
            boolean z = true;
            if ((!this.allowJumboPackets || gzip.length >= MAX_UDP_PACKET_SIZE) && (this.allowJumboPackets || gzip.length >= this.mtu)) {
                if (this.isRealBroadcast) {
                    log.debug("Message is too long to send as a single packet ({}), and we only have a broadcast address", Integer.valueOf(gzip.length));
                } else {
                    log.debug("Message is too long to send as a single packet ({}) -- broadcasting over TCP", Integer.valueOf(gzip.length));
                    for (InetAddress inetAddress : this.broadcastAddrs) {
                        try {
                            getTcpSocket(inetAddress, false).ifPresent(socket -> {
                                safeWrite(m185build, socket, "broadcastTransport");
                            });
                        } catch (Throwable th) {
                            log.warn("Unhandled exception sending message on TCP socket", th);
                        }
                    }
                }
                this.lastMessageSent = System.currentTimeMillis();
                log.trace("Sending UDP broadcast took {}", TimerUtils.formatTimeSince(currentTimeMillis));
                return false;
            }
            for (InetAddress inetAddress2 : this.broadcastAddrs) {
                long currentTimeMillis2 = System.currentTimeMillis();
                try {
                    try {
                        if (!inetAddress2.equals(this.serverName)) {
                            DatagramPacket datagramPacket = new DatagramPacket(gzip, gzip.length, inetAddress2, this.udpListenPort);
                            if (this.socket.isClosed()) {
                                synchronized (this) {
                                    if (this.socket.isClosed()) {
                                        log.info("UDP socket was closed (in broadcast call) -- reopening");
                                        this.socket = new DatagramSocket();
                                        this.socket.setSendBufferSize(MAX_UDP_PACKET_SIZE);
                                        if (this.isRealBroadcast) {
                                            this.socket.setBroadcast(true);
                                        }
                                        this.socket.setTrafficClass(16);
                                    }
                                }
                            }
                            long currentTimeMillis3 = System.currentTimeMillis();
                            try {
                                this.socket.send(datagramPacket);
                                long currentTimeMillis4 = System.currentTimeMillis();
                                if (currentTimeMillis4 > currentTimeMillis3 + 10) {
                                    log.warn("Sending a broadcast to {} on the UDP socket took {} for packet of length {}", new Object[]{inetAddress2, TimerUtils.formatTimeDifference(currentTimeMillis4 - currentTimeMillis3), Integer.valueOf(datagramPacket.getLength())});
                                }
                                if (networkSeemsDown) {
                                    log.info("Network is back up");
                                }
                                networkSeemsDown = false;
                            } catch (Throwable th2) {
                                long currentTimeMillis5 = System.currentTimeMillis();
                                if (currentTimeMillis5 > currentTimeMillis3 + 10) {
                                    log.warn("Sending a broadcast to {} on the UDP socket took {} for packet of length {}", new Object[]{inetAddress2, TimerUtils.formatTimeDifference(currentTimeMillis5 - currentTimeMillis3), Integer.valueOf(datagramPacket.getLength())});
                                }
                                throw th2;
                            }
                        }
                        long currentTimeMillis6 = System.currentTimeMillis();
                        if (currentTimeMillis6 > currentTimeMillis2 + 10) {
                            log.warn("Sending broadcast to {} on transport took {}", inetAddress2, TimerUtils.formatTimeDifference(currentTimeMillis6 - currentTimeMillis2));
                        }
                    } catch (Throwable th3) {
                        long currentTimeMillis7 = System.currentTimeMillis();
                        if (currentTimeMillis7 > currentTimeMillis2 + 10) {
                            log.warn("Sending broadcast to {} on transport took {}", inetAddress2, TimerUtils.formatTimeDifference(currentTimeMillis7 - currentTimeMillis2));
                        }
                        throw th3;
                    }
                } catch (Throwable th4) {
                    z = false;
                    if (th4.getMessage() == null || !(th4.getMessage().equals("Network is unreachable") || th4.getMessage().equals("Network is down"))) {
                        log.warn("Could not broadcast to " + inetAddress2 + " (attempt took " + TimerUtils.formatTimeSince(currentTimeMillis2) + ") -- still trying other addressed", th4);
                    } else {
                        if (!networkSeemsDown) {
                            log.warn("Network seems to have disconnected! UDPTransport is not sending messages");
                        }
                        networkSeemsDown = true;
                    }
                    long currentTimeMillis8 = System.currentTimeMillis();
                    if (currentTimeMillis8 > currentTimeMillis2 + 10) {
                        log.warn("Sending broadcast to {} on transport took {}", inetAddress2, TimerUtils.formatTimeDifference(currentTimeMillis8 - currentTimeMillis2));
                    }
                }
            }
            return z;
        } finally {
        }
        this.lastMessageSent = System.currentTimeMillis();
        log.trace("Sending UDP broadcast took {}", TimerUtils.formatTimeSince(currentTimeMillis));
    }

    private void safeWrite(UDPBroadcastProtos.UDPPacket uDPPacket, Socket socket, String str) {
        synchronized (this.sendQueue) {
            if (this.sendQueue.size() < 10000) {
                this.sendQueue.offer(() -> {
                    synchronized (socket) {
                        try {
                            uDPPacket.writeDelimitedTo(socket.getOutputStream());
                        } catch (IOException e) {
                            log.info("IO or Socket exception writing to TCP socket ({}): {}", str, e.getMessage());
                            try {
                                socket.close();
                            } catch (Throwable th) {
                            }
                        } catch (Throwable th2) {
                            log.warn("Unknown exception writing to TCP socket (" + str + "): ", th2);
                        }
                    }
                });
                this.sendQueue.notify();
            }
        }
    }

    private Set<String> liveServers(long j) {
        HashSet hashSet = new HashSet();
        for (Map.Entry<byte[], Long> entry : this.pingsReceived.entrySet()) {
            if (Math.abs(entry.getValue().longValue() - System.currentTimeMillis()) < j) {
                ArrayList arrayList = new ArrayList();
                for (byte b : entry.getKey()) {
                    arrayList.add(Integer.toString((b + 256) % 256));
                }
                hashSet.add(StringUtils.join(arrayList, "."));
            }
        }
        return hashSet;
    }

    public String toString() {
        return "UDP@" + SystemUtils.HOST + " (" + this.serverAddress + ")  <last message sent " + TimerUtils.formatTimeSince(this.lastMessageSent) + " ago; received " + TimerUtils.formatTimeSince(this.lastMessageReceived) + " ago>  <broadcasting to " + Arrays.asList(this.broadcastAddrs) + ">  <pings from " + StringUtils.join(liveServers(60000L), ", ") + ">";
    }

    public static void main(String[] strArr) throws IOException {
        new UDPTransport();
    }
}
