package dragon.network;

import dragon.ComponentError;
import dragon.Config;
import dragon.DragonInvalidStateException;
import dragon.DragonRequiresClonableException;
import dragon.LocalCluster;
import dragon.metrics.ComponentMetricMap;
import dragon.metrics.Metrics;
import dragon.metrics.Sample;
import dragon.network.comms.DragonCommsException;
import dragon.network.comms.IComms;
import dragon.network.comms.TcpComms;
import dragon.network.messages.node.context.ContextUpdateNMsg;
import dragon.network.messages.node.fault.NodeFaultNMsg;
import dragon.network.messages.node.fault.RipNMsg;
import dragon.network.messages.node.fault.TopoFaultNMsg;
import dragon.network.operations.DragonInvalidContext;
import dragon.network.operations.GroupOp;
import dragon.network.operations.ListToposGroupOp;
import dragon.network.operations.Ops;
import dragon.network.operations.TermTopoGroupOp;
import dragon.process.ProcessManager;
import dragon.topology.DragonTopology;
import dragon.topology.base.Component;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dragon/network/Node.class */
public class Node {
    /** Class-wide log4j logger. */
    private static final Logger log = LogManager.getLogger(Node.class);
    /** Communications layer; created as a {@link TcpComms} and opened in the constructor. */
    private final IComms comms;
    /** Local clusters hosted by this node, keyed by the topology name given to {@link #prepareTopology}. */
    private final HashMap<String, LocalCluster> localClusters;
    /** Service message processor, created in the constructor and interrupted by {@link #terminate()}. */
    private final ServiceMsgProcessor serviceThread;
    /** Node message processor; also the owner of the "alive context" consulted throughout this class. */
    private final NodeMsgProcessor nodeThread;
    /** Operations processor, created in the constructor and interrupted by {@link #terminate()}. */
    private final Ops operationsThread;
    /** Configuration this node was constructed with. */
    private final Config conf;
    /** Metrics collector; {@code null} when metrics are disabled in the configuration. */
    private final Metrics metricsThread;
    /** Router shared with the local clusters map; notified when topologies are submitted or terminated. */
    private final Router router;
    /** Parent descriptor taken from the configuration at construction time; reported by {@link #getStatus()}. */
    NodeDescriptor parentDesc;
    /** Current lifecycle state; accessed through the synchronized getter/setter. */
    private NodeState nodeState;
    /** Child daemon processes started by {@link #allocatePartition}, keyed by the child's descriptor. */
    private HashMap<NodeDescriptor, Process> childProcesses;
    /** Helper used to start child daemon processes. */
    private ProcessManager processManager;
    /** Lock handed out by {@link #getOperationsLock()}; not acquired anywhere inside this class. */
    private final ReentrantLock operationsLock = new ReentrantLock();
    /** Shared timer (thread name "timer"); cancelled by {@link #terminate()}. */
    private final Timer timer;
    /** Class loaders created by {@link #loadJarFile}, keyed by the name passed to it. */
    public HashMap<String, ClassLoader> pluginLoaders;
    /** Names of the classes successfully loaded from each jar, keyed like {@link #pluginLoaders}. */
    public HashMap<String, HashSet<String>> pluginClasses;
    /** Most recently constructed instance, returned by {@link #inst()}. */
    private static Node me;

    /* loaded from: input_file:dragon/network/Node$NodeState.class */
    /**
     * Lifecycle states of a node. Within this class only {@link #OPERATIONAL}
     * (set by the constructor) and {@link #TERMINATING} (set by
     * {@code terminate()}) are ever assigned; the join-related constants are
     * presumably driven by code outside this file -- TODO confirm.
     */
    public enum NodeState {
        JOINING,
        JOIN_REQUESTED,
        ACCEPTING_JOIN,
        OPERATIONAL,
        TERMINATING
    }

    /**
     * Returns the node instance registered by the most recent constructor call.
     *
     * @return the registered node, or {@code null} if no node has been constructed yet
     */
    public static Node inst() {
        return me;
    }

    /**
     * Builds the node and all of its subsystems, then (unless this node is the
     * first configured host) starts announcing its context to the configured hosts.
     *
     * <p>The statement order is significant: the static instance is published
     * first, and the PID file is written before any subsystem is created.
     *
     * @param config the configuration for this node
     * @throws IOException if the PID file cannot be written
     */
    public Node(Config config) throws IOException {
        // Publish the instance before subsystems are built so that inst() is non-null.
        me = this;
        this.conf = config;
        this.parentDesc = config.getDragonNetworkParentDescriptor();
        writePid();
        this.pluginLoaders = new HashMap<>();
        this.pluginClasses = new HashMap<>();
        this.localClusters = new HashMap<>();
        this.childProcesses = new HashMap<>();
        this.timer = new Timer("timer");
        this.processManager = new ProcessManager(config);
        this.operationsThread = new Ops();
        setNodeState(NodeState.OPERATIONAL);
        this.comms = new TcpComms(config);
        this.comms.open();
        this.nodeThread = new NodeMsgProcessor();
        this.router = new Router(config, this.comms, this.localClusters);
        this.serviceThread = new ServiceMsgProcessor();
        // The metrics collector is optional; it stays null when metrics are disabled.
        if (config.getDragonMetricsEnabled()) {
            this.metricsThread = new Metrics(config, this.localClusters, this.comms.getMyNodeDesc());
        } else {
            this.metricsThread = null;
        }
        ArrayList<NodeDescriptor> hosts = config.getHosts();
        // Nothing to announce when there are no hosts or this node is the first host.
        if (hosts.size() <= 0 || hosts.get(0).equals(this.comms.getMyNodeDesc())) {
            return;
        }
        // NOTE(review): sendInitialContextUpdate removes entries from the list it is given;
        // whether config.getHosts() returns a copy is not visible here -- verify.
        sendInitialContextUpdate(hosts);
    }

    /**
     * Sends this node's alive context to the next host in {@code arrayList}
     * that is not already part of the alive context. Hosts are removed from the
     * front of the list as they are considered; once the send operation
     * completes (successfully or not) the method is re-invoked on the remaining
     * hosts.
     *
     * @param arrayList hosts still to be contacted; consumed by this method
     */
    private void sendInitialContextUpdate(ArrayList<NodeDescriptor> arrayList) {
        // Pop hosts until we find one that is not yet known to be alive.
        NodeDescriptor candidate = null;
        while (!arrayList.isEmpty()) {
            NodeDescriptor next = arrayList.remove(0);
            if (!this.nodeThread.getAliveContext().containsKey(next.toString())) {
                candidate = next;
                break;
            }
        }
        if (candidate == null) {
            return;
        }
        final NodeDescriptor target = candidate;
        Ops.inst().newOp(op -> {
            try {
                inst().getComms().sendNodeMsg(target, new ContextUpdateNMsg(this.nodeThread.getAliveContext()));
            } catch (DragonCommsException e) {
                this.nodeThread.setDead(target);
                op.fail("[" + target + "] is not reachable");
            }
        }, op2 -> {
            op2.success();
        }, op3 -> {
            log.info("sent context update to [" + target + "]");
            // Continue with whatever hosts are left.
            sendInitialContextUpdate(arrayList);
        }, (op4, str) -> {
            log.warn(str);
            // A failed host does not stop the walk over the remaining hosts.
            sendInitialContextUpdate(arrayList);
        });
    }

    /**
     * Writes the current process id to
     * {@code <dataDir>/dragon-<localDataPort>.pid}, creating the data directory
     * if necessary, and logs the parent process id when one exists.
     *
     * @throws IOException if the PID file cannot be created or written
     */
    private void writePid() throws IOException {
        final long pid = ProcessHandle.current().pid();
        final String pidPath = this.conf.getDragonDataDir() + "/dragon-" + this.conf.getDragonNetworkLocalDataPort() + ".pid";
        log.debug("pid = " + pid);
        log.debug("writing pid to [" + pidPath + "]");
        new File(this.conf.getDragonDataDir()).mkdirs();
        // try-with-resources guarantees the file handle is released even when a write fails.
        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(pidPath))))) {
            writer.write(Long.toString(pid));
            writer.newLine();
        }
        if (ProcessHandle.current().parent().isPresent()) {
            log.debug("parent pid = " + ProcessHandle.current().parent().get().pid());
        }
    }

    /** Returns the communications layer opened in the constructor. */
    public synchronized IComms getComms() {
        return this.comms;
    }

    /** Returns the process manager used to start child daemons. */
    public synchronized ProcessManager getProcessManager() {
        return this.processManager;
    }

    /**
     * Returns the live (not copied) map of local clusters keyed by topology name.
     */
    public synchronized HashMap<String, LocalCluster> getLocalClusters() {
        return this.localClusters;
    }

    /** Returns the current lifecycle state of this node. */
    public synchronized NodeState getNodeState() {
        return this.nodeState;
    }

    /**
     * Logs and records a new lifecycle state.
     *
     * @param nodeState the state to switch to
     */
    public synchronized void setNodeState(NodeState nodeState) {
        log.info("state is now [" + nodeState + "]");
        this.nodeState = nodeState;
    }

    /** Returns the lock object owned by this node for use by operations. */
    public synchronized ReentrantLock getOperationsLock() {
        return this.operationsLock;
    }

    /** Returns the node message processor. */
    public synchronized NodeMsgProcessor getNodeProcessor() {
        return this.nodeThread;
    }

    /** Returns the router. */
    public synchronized Router getRouter() {
        return this.router;
    }

    /** Returns the configuration this node was constructed with. */
    public synchronized Config getConf() {
        return this.conf;
    }

    /** Returns the operations processor. */
    public synchronized Ops getOpsProcessor() {
        return this.operationsThread;
    }

    /**
     * Stores the bytes of a topology jar under
     * {@code <jarPath>/<myNodeDesc>/<str>}, creating parent directories as needed.
     *
     * @param str  name of the jar, resolved relative to this node's jar directory
     * @param bArr contents of the jar
     * @return {@code true} if the file was written, {@code false} on any I/O failure
     */
    public synchronized boolean storeJarFile(String str, byte[] bArr) {
        Path path = Paths.get(this.conf.getJarPath() + "/" + this.comms.getMyNodeDesc(), str);
        new File(path.getParent().toString()).mkdirs();
        // try-with-resources closes the stream on both the success and the failure path.
        try (FileOutputStream out = new FileOutputStream(path.toString())) {
            out.write(bArr);
            return true;
        } catch (IOException e) {
            // Pass the exception to the logger instead of printing the stack trace to stderr.
            log.fatal("failed to store topology jar file for [" + str + "]", e);
            return false;
        }
    }

    /**
     * Creates a class loader for the stored jar {@code str} and eagerly loads
     * every {@code .class} entry it contains. The loader is recorded in
     * {@link #pluginLoaders} and the names of successfully loaded classes in
     * {@link #pluginClasses}, both under the key {@code str}. Classes that
     * cannot be resolved are logged and skipped.
     *
     * @param str name of the jar, resolved relative to this node's jar directory
     * @return {@code true} if the jar was scanned, {@code false} if it could
     *         not be opened or read
     */
    public synchronized boolean loadJarFile(String str) {
        Path path = Paths.get(this.conf.getJarPath() + "/" + this.comms.getMyNodeDesc(), str);
        try {
            log.info("creating class loader for: " + str);
            HashSet<String> loadedClasses = new HashSet<>();
            this.pluginClasses.put(str, loadedClasses);
            ClassLoader pluginLoader = URLClassLoader.newInstance(new URL[]{path.toUri().toURL()}, Node.class.getClassLoader());
            this.pluginLoaders.put(str, pluginLoader);
            // try-with-resources: the stream is only closed if it was actually opened,
            // so a failure to open the jar no longer ends in a NullPointerException.
            try (JarInputStream jar = new JarInputStream(new FileInputStream(path.toString()))) {
                JarEntry entry;
                while ((entry = jar.getNextJarEntry()) != null) {
                    String entryName = entry.getName();
                    if (!entryName.endsWith(".class")) {
                        continue;
                    }
                    // "a/b/C.class" -> "a.b.C"
                    String dotted = entryName.replace('/', '.');
                    String className = dotted.substring(0, dotted.lastIndexOf('.'));
                    try {
                        pluginLoader.loadClass(className);
                        loadedClasses.add(className);
                    } catch (ClassNotFoundException e) {
                        log.warn("class not found: " + className);
                    } catch (NoClassDefFoundError e) {
                        log.warn("no class def: " + className);
                    }
                }
            }
            return true;
        } catch (IOException e) {
            // MalformedURLException is an IOException, so one handler covers both cases.
            log.fatal("failed to add topology jar file to the classpath [" + str + "]", e);
            return false;
        }
    }

    /**
     * Reads a previously stored jar back into memory.
     *
     * @param str name of the jar, resolved relative to this node's jar directory
     * @return the jar contents, or {@code null} if the file could not be read
     */
    public synchronized byte[] readJarFile(String str) {
        byte[] contents;
        try {
            final Path jar = Paths.get(this.conf.getJarPath() + "/" + this.comms.getMyNodeDesc(), str);
            contents = Files.readAllBytes(jar);
        } catch (IOException e) {
            e.printStackTrace();
            contents = null;
        }
        return contents;
    }

    /**
     * Creates a local cluster for a new topology, submits the topology to it
     * and to the router, and registers the cluster under the topology name.
     * The effective configuration is this node's configuration overlaid with
     * {@code config}.
     *
     * @param str            topology name
     * @param config         topology-specific settings that override the node's configuration
     * @param dragonTopology the topology to prepare
     * @param z              flag forwarded unchanged to {@code LocalCluster.submitTopology}
     * @throws DragonTopologyException        if a topology with this name already exists
     * @throws DragonRequiresClonableException declared by the original signature
     */
    public synchronized void prepareTopology(String str, Config config, DragonTopology dragonTopology, boolean z) throws DragonRequiresClonableException, DragonTopologyException {
        final boolean alreadyKnown = this.localClusters.containsKey(str);
        if (alreadyKnown) {
            throw new DragonTopologyException("topology already exists: " + str);
        }
        final LocalCluster cluster = new LocalCluster(this);
        // Node settings first, then the topology's own settings on top.
        final Config effective = new Config();
        effective.putAll(this.conf);
        effective.putAll(config);
        cluster.submitTopology(str, effective, dragonTopology, z);
        this.router.submitTopology(str, dragonTopology);
        this.localClusters.put(str, cluster);
    }

    /**
     * Opens all components of a prepared topology.
     *
     * @param str topology name
     * @throws DragonTopologyException     if the topology is not hosted on this node
     * @throws DragonInvalidStateException declared by the original signature
     */
    public synchronized void startTopology(String str) throws DragonTopologyException, DragonInvalidStateException {
        if (this.localClusters.containsKey(str)) {
            this.localClusters.get(str).openAll();
            return;
        }
        throw new DragonTopologyException("topology does not exist: " + str);
    }

    /**
     * Asks a local cluster to terminate, attaching the group operation that
     * requested the termination.
     *
     * @param str     topology name
     * @param groupOp the group operation to associate with the cluster
     * @throws DragonTopologyException     if the topology is not hosted on this node
     * @throws DragonInvalidStateException declared by the original signature
     */
    public synchronized void terminateTopology(String str, GroupOp groupOp) throws DragonTopologyException, DragonInvalidStateException {
        if (this.localClusters.containsKey(str)) {
            final LocalCluster cluster = this.localClusters.get(str);
            cluster.setGroupOperation(groupOp);
            cluster.setShouldTerminate();
            return;
        }
        throw new DragonTopologyException("topology does not exist: " + str);
    }

    /**
     * Looks up the collected metrics of a topology.
     *
     * @param str topology name
     * @return whatever the metrics collector returns for {@code str}, or
     *         {@code null} when metrics are disabled
     */
    public synchronized ComponentMetricMap getMetrics(String str) {
        if (this.metricsThread == null) {
            // Metrics were disabled at construction time.
            return null;
        }
        final ComponentMetricMap collected = this.metricsThread.getMetrics(str);
        return collected;
    }

    /**
     * Reports a finished local-cluster termination by sending success on the
     * given group operation.
     *
     * @param termTopoGroupOp the terminate-topology operation to complete
     */
    public synchronized void localClusterTerminated(TermTopoGroupOp termTopoGroupOp) {
        termTopoGroupOp.sendSuccess();
    }

    /**
     * Removes a topology from this node: informs the router, forgets the local
     * cluster, drops the reference to the topology's class loader and clears
     * the record of its loaded classes.
     *
     * @param str topology name
     * @param z   when {@code true} the topology is purged: its components are
     *            closed and interrupted first, and an unknown topology is
     *            silently ignored instead of being reported
     * @throws DragonTopologyException if the topology does not exist and {@code z} is {@code false}
     */
    public synchronized void removeTopo(String str, boolean z) throws DragonTopologyException {
        if (!this.localClusters.containsKey(str)) {
            if (!z) {
                throw new DragonTopologyException("topology does not exist [" + str + "]");
            }
            return;
        }
        final LocalCluster cluster = this.localClusters.get(str);
        if (z) {
            log.warn("purging topology [" + str + "]");
            cluster.closeAll();
            cluster.interruptAll();
        }
        this.router.terminateTopology(str, cluster.getTopology());
        this.localClusters.remove(str);
        // The key is kept with a null loader, exactly as before; only the reference is dropped.
        this.pluginLoaders.put(str, null);
        // A topology may have been prepared without loadJarFile having registered a
        // class set for it; guard against the resulting null instead of throwing.
        final HashSet<String> loadedClasses = this.pluginClasses.get(str);
        if (loadedClasses != null) {
            loadedClasses.clear();
        }
        // Hint to the JVM so classes from the dropped loader can be unloaded.
        System.gc();
    }

    /**
     * Starts a group operation that halts the topology on all nodes. The local
     * halt is performed when the operation starts running; if the operation
     * fails, times out, or cannot be created, the topology is faulted instead.
     *
     * @param str topology name
     */
    public synchronized void signalHaltTopology(String str) {
        try {
            this.operationsThread.newHaltTopoGroupOp(str, op -> {
                log.warn("halting topology, waiting up to [" + (getConf().getDragonServiceTimeoutMs() / 1000) + "] seconds...");
            }, op2 -> {
                log.warn("topology was halted due to too many errors");
            }, (op3, str2) -> {
                faultAfterFailedHalt(str);
            }).onRunning(op4 -> {
                try {
                    haltTopology(str);
                } catch (DragonInvalidStateException | DragonTopologyException e) {
                    op4.fail(e.getMessage());
                }
            }).onTimeout(getTimer(), getConf().getDragonServiceTimeoutMs(), TimeUnit.MILLISECONDS, op5 -> {
                op5.fail("timed out waiting for nodes to respond");
            });
        } catch (DragonInvalidContext e) {
            faultAfterFailedHalt(str);
        }
    }

    /** Faults the topology after a halt could not be carried out, logging any failure to do so. */
    private void faultAfterFailedHalt(String topologyId) {
        try {
            topologyFault(topologyId);
        } catch (DragonTopologyException e) {
            e.printStackTrace();
            log.error(e);
        }
    }

    /**
     * Halts the local part of a topology.
     *
     * @param str topology name
     * @throws DragonTopologyException     if the topology is not hosted on this node
     * @throws DragonInvalidStateException declared by the original signature
     */
    public synchronized void haltTopology(String str) throws DragonTopologyException, DragonInvalidStateException {
        if (this.localClusters.containsKey(str)) {
            this.localClusters.get(str).haltTopology();
            return;
        }
        throw new DragonTopologyException("topology does not exist: " + str);
    }

    /**
     * Fills the given operation with a snapshot of every local topology: its
     * state name, the list of its component instances (as
     * {@code "<componentId>:<taskId>"}), one {@link Sample} per instance, and
     * the recorded component errors keyed the same way.
     *
     * @param listToposGroupOp the operation whose {@code metrics},
     *                         {@code components}, {@code state} and
     *                         {@code errors} fields are overwritten
     */
    public synchronized void listTopologies(ListToposGroupOp listToposGroupOp) {
        final HashMap<String, String> states = new HashMap<>();
        final HashMap<String, List<String>> components = new HashMap<>();
        final HashMap<String, HashMap<String, Sample>> metrics = new HashMap<>();
        final HashMap<String, HashMap<String, ArrayList<ComponentError>>> errors = new HashMap<>();
        for (String topologyId : this.localClusters.keySet()) {
            final LocalCluster cluster = this.localClusters.get(topologyId);
            final List<String> instanceNames = new ArrayList<>();
            final HashMap<String, Sample> samples = new HashMap<>();
            final HashMap<String, ArrayList<ComponentError>> clusterErrors = new HashMap<>();

            states.put(topologyId, cluster.getState().name());
            components.put(topologyId, instanceNames);
            metrics.put(topologyId, samples);

            // Spouts first, then bolts, so the instance list keeps the original ordering.
            for (String spoutId : cluster.getSpouts().keySet()) {
                for (Integer taskId : cluster.getSpouts().get(spoutId).keySet()) {
                    instanceNames.add(spoutId + ":" + taskId);
                    samples.put(spoutId + ":" + taskId, new Sample(cluster.getSpouts().get(spoutId).get(taskId)));
                }
            }
            for (String boltId : cluster.getBolts().keySet()) {
                for (Integer taskId : cluster.getBolts().get(boltId).keySet()) {
                    instanceNames.add(boltId + ":" + taskId);
                    samples.put(boltId + ":" + taskId, new Sample(cluster.getBolts().get(boltId).get(taskId)));
                }
            }

            errors.put(topologyId, clusterErrors);
            for (Component failed : cluster.getComponentErrors().keySet()) {
                clusterErrors.put(failed.getComponentId() + ":" + failed.getTaskId(), cluster.getComponentErrors().get(failed));
            }
        }
        listToposGroupOp.metrics = metrics;
        listToposGroupOp.components = components;
        listToposGroupOp.state = states;
        listToposGroupOp.errors = errors;
    }

    /**
     * Resumes the local part of a topology.
     *
     * @param str topology name
     * @throws DragonTopologyException     if the topology is not hosted on this node
     * @throws DragonInvalidStateException declared by the original signature
     */
    public synchronized void resumeTopology(String str) throws DragonTopologyException, DragonInvalidStateException {
        if (this.localClusters.containsKey(str)) {
            this.localClusters.get(str).resumeTopology();
            return;
        }
        throw new DragonTopologyException("topology does not exist: " + str);
    }

    /**
     * Starts {@code i} child daemon processes for the given partition. Each
     * child gets a copy of this node's configuration with its own partition
     * id, a non-primary flag, service/data ports offset from this node's
     * ports, and this node as parent; the configuration is written to
     * {@code <home>/conf/dragon-<dataPort>.yaml} before the process is started.
     *
     * @param str partition id for the new children
     * @param i   number of children to start
     * @return {@code i} if every child was started, otherwise the number of
     *         children started before an {@link IOException} occurred
     */
    public synchronized int allocatePartition(String str, int i) {
        log.debug("allocating [" + i + "] partitions [" + str + "]");
        int knownChildren = this.childProcesses.keySet().size();
        int started = 0;
        while (started < i) {
            // Two ports (service + data) per child, counted from this node's own ports.
            final int portOffset = (knownChildren + 1) * 2;
            final Config childConf = new Config();
            childConf.putAll(this.conf);
            childConf.put(Config.DRAGON_NETWORK_PARTITION, str);
            childConf.put(Config.DRAGON_NETWORK_PRIMARY, false);
            childConf.put(Config.DRAGON_NETWORK_LOCAL_SERVICE_PORT, Integer.valueOf(this.conf.getDragonNetworkLocalServicePort() + portOffset));
            childConf.put(Config.DRAGON_NETWORK_LOCAL_DATA_PORT, Integer.valueOf(this.conf.getDragonNetworkLocalDataPort() + portOffset));
            childConf.put(Config.DRAGON_NETWORK_PARENT, this.comms.getMyNodeDesc().toMap());
            try {
                writeConf(childConf, childConf.getDragonHomeDir() + "/conf/dragon-" + childConf.getDragonNetworkLocalDataPort() + ".yaml");
                final NodeDescriptor childDesc = childConf.getLocalHost();
                final ProcessBuilder daemon = ProcessManager.createDaemon(childConf);
                this.processManager.startProcess(daemon, false, running -> {
                    this.childProcesses.put(childDesc, running);
                    try {
                        running.getInputStream().close();
                    } catch (IOException e) {
                        log.warn("could not close the input stream to the process");
                    }
                }, failed -> {
                    log.error("process failed to start: " + daemon.toString());
                }, exited -> {
                    log.warn("process has exited: " + exited.exitValue());
                });
                knownChildren++;
            } catch (IOException e) {
                return started;
            }
            started++;
        }
        return i;
    }

    /**
     * Forgets up to {@code i} child processes that belong to the given
     * partition. Only the bookkeeping entry is removed; this method does not
     * itself stop the process.
     *
     * @param str partition id
     * @param i   maximum number of children to remove
     * @return the number of children actually removed
     */
    public synchronized int deallocatePartition(String str, int i) {
        int removed = 0;
        int remaining = i;
        final Iterator<NodeDescriptor> children = this.childProcesses.keySet().iterator();
        while (remaining != 0 && children.hasNext()) {
            final NodeDescriptor child = children.next();
            if (child.getPartition().equals(str)) {
                // Removing through the key-set iterator drops the map entry safely mid-iteration.
                children.remove();
                removed++;
                remaining--;
            }
        }
        return removed;
    }

    /**
     * Writes a configuration to a file as YAML followed by a line separator.
     *
     * @param config the configuration to serialize via {@code toYamlStringNice()}
     * @param str    path of the file to create or overwrite
     * @throws IOException if the file cannot be opened or written
     */
    public static void writeConf(Config config, String str) throws IOException {
        // try-with-resources releases the file handle even when writing fails.
        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(str))))) {
            writer.write(config.toYamlStringNice());
            writer.newLine();
        }
    }

    /**
     * Builds a status snapshot of this node: descriptor, current time in epoch
     * milliseconds, node state, the state of every local cluster, partition id,
     * primary flag and parent descriptor.
     *
     * @return a freshly populated {@link NodeStatus}
     */
    public synchronized NodeStatus getStatus() {
        final NodeStatus status = new NodeStatus();
        status.desc = this.comms.getMyNodeDesc();
        status.timestamp = Instant.now().toEpochMilli();
        status.state = this.nodeState;
        this.localClusters.forEach((topologyId, cluster) -> {
            status.localClusterStates.put(topologyId, cluster.getState());
        });
        status.partitionId = this.conf.getDragonNetworkPartition();
        status.primary = this.conf.getDragonNetworkPrimary();
        status.parent = this.parentDesc;
        return status;
    }

    /**
     * Shuts the node down: closes communications, switches to
     * {@link NodeState#TERMINATING}, interrupts any topologies that are still
     * allocated, and stops the metrics collector (if any), timer, router,
     * process manager and message/operation processors.
     */
    public synchronized void terminate() {
        log.info("going down...");
        this.comms.close();
        setNodeState(NodeState.TERMINATING);
        if (!this.localClusters.isEmpty()) {
            log.error("topologies are still allocated");
            this.localClusters.forEach((str, localCluster) -> {
                localCluster.interruptAll();
            });
        }
        // The collector is null when metrics are disabled; without this guard the
        // NullPointerException would skip the rest of the shutdown sequence.
        if (this.metricsThread != null) {
            this.metricsThread.interrupt();
        }
        this.timer.cancel();
        this.router.terminate();
        this.processManager.interrupt();
        this.operationsThread.interrupt();
        this.serviceThread.interrupt();
        this.nodeThread.interrupt();
    }

    /** Returns the shared timer created in the constructor. */
    public synchronized Timer getTimer() {
        return this.timer;
    }

    /**
     * Handles an unreachable node: removes it locally, then tells every other
     * node in the alive context about the fault. Nodes that turn out to be
     * unreachable while being notified are processed the same way afterwards.
     *
     * @param nodeDescriptor the node that could not be reached
     */
    public synchronized void nodeFault(NodeDescriptor nodeDescriptor) {
        log.error("[" + nodeDescriptor + "] is not reachable");
        removeNode(nodeDescriptor);
        final HashSet<NodeDescriptor> alsoUnreachable = new HashSet<>();
        for (NodeDescriptor peer : this.nodeThread.getAliveContext().values()) {
            if (peer.equals(this.comms.getMyNodeDesc())) {
                continue; // never notify ourselves
            }
            try {
                this.comms.sendNodeMsg(peer, new NodeFaultNMsg(nodeDescriptor));
            } catch (DragonCommsException e) {
                // Collect and handle after the loop so the context is not changed while iterating.
                alsoUnreachable.add(peer);
            }
        }
        for (NodeDescriptor dead : alsoUnreachable) {
            nodeFault(dead);
        }
    }

    /**
     * Removes a node from the alive context (if present), queues an operation
     * that sends it a RIP message, and faults every local topology whose
     * reverse embedding contains the node.
     *
     * @param nodeDescriptor the node to remove
     */
    public synchronized void removeNode(NodeDescriptor nodeDescriptor) {
        final boolean known = this.nodeThread.getAliveContext().containsKey(nodeDescriptor.toString());
        if (!known) {
            return;
        }
        log.debug("removing [" + nodeDescriptor + "] from the node context");
        this.nodeThread.setDead(nodeDescriptor);
        Ops.inst().newOp(op -> {
            try {
                inst().getComms().sendNodeMsg(nodeDescriptor, new RipNMsg());
            } catch (DragonCommsException e) {
                op.fail("[" + nodeDescriptor + "] is really dead");
            }
        }, op2 -> {
            op2.success();
        }, op3 -> {
            log.info("sent RIP to [" + nodeDescriptor + "]");
        }, (op4, str) -> {
            log.warn(str);
        });
        for (String topologyId : this.localClusters.keySet()) {
            final boolean usesNode = this.localClusters.get(topologyId).getTopology().getReverseEmbedding().containsKey(nodeDescriptor);
            if (!usesNode) {
                continue;
            }
            try {
                topologyFault(topologyId);
            } catch (DragonTopologyException e) {
                e.printStackTrace();
                log.error(e.getMessage());
            }
        }
    }

    /**
     * Marks a local topology as faulted and propagates the fault. For every
     * node in the topology's reverse embedding: if the node is this node or is
     * not in the alive context, the local cluster is set to fault; otherwise an
     * operation is queued that sends the node a topology-fault message. Does
     * nothing if the local cluster is already in the FAULT state.
     *
     * @param str topology name
     * @throws DragonTopologyException if the topology is not hosted on this node
     */
    public synchronized void topologyFault(String str) throws DragonTopologyException {
        if (!this.localClusters.containsKey(str)) {
            throw new DragonTopologyException("topology does not exist [" + str + "]");
        }
        final LocalCluster cluster = this.localClusters.get(str);
        if (cluster.getState() == LocalCluster.State.FAULT) {
            return;
        }
        log.error("topology [" + str + "] has faulted");
        for (NodeDescriptor member : cluster.getTopology().getReverseEmbedding().keySet()) {
            final boolean remoteAndAlive = this.nodeThread.getAliveContext().containsKey(member.toString())
                    && !member.equals(this.comms.getMyNodeDesc());
            if (remoteAndAlive) {
                Ops.inst().newOp(op -> {
                    try {
                        this.comms.sendNodeMsg(member, new TopoFaultNMsg(str));
                    } catch (DragonCommsException e) {
                        nodeFault(member);
                        op.fail("topology [" + str + "] may not be faulted correctly");
                    }
                }, op2 -> {
                    op2.success();
                }, op3 -> {
                    log.info("sent topology fault to [" + member + "]");
                }, (op4, str2) -> {
                    log.warn(str2);
                });
            } else {
                cluster.setFault();
            }
        }
    }

    /**
     * Puts the local cluster of a topology into the fault state without
     * notifying any other node.
     *
     * @param str topology name
     * @throws DragonTopologyException if the topology is not hosted on this node
     */
    public synchronized void setTopologyFault(String str) throws DragonTopologyException {
        if (this.localClusters.containsKey(str)) {
            this.localClusters.get(str).setFault();
            return;
        }
        throw new DragonTopologyException("topology does not exist [" + str + "]");
    }

    /**
     * Propagates a topology fault using an explicitly supplied topology. For
     * every node in the topology's reverse embedding: if the node is this node
     * or is not in the alive context, the local cluster (when one exists) is
     * set to fault; otherwise an operation is queued that sends the node a
     * topology-fault message.
     *
     * @param str            topology name
     * @param dragonTopology topology whose reverse embedding lists the nodes to notify
     */
    public synchronized void topologyFault(String str, DragonTopology dragonTopology) {
        log.error("topology [" + str + "] has faulted");
        for (NodeDescriptor nodeDescriptor : dragonTopology.getReverseEmbedding().keySet()) {
            if (!this.nodeThread.getAliveContext().containsKey(nodeDescriptor.toString()) || nodeDescriptor.equals(this.comms.getMyNodeDesc())) {
                // This overload performs no existence check, so the topology may not be
                // hosted locally; skip the local fault rather than fail with a
                // NullPointerException part-way through notifying the other nodes.
                final LocalCluster cluster = this.localClusters.get(str);
                if (cluster != null) {
                    cluster.setFault();
                } else {
                    log.debug("no local cluster for topology [" + str + "], skipping local fault");
                }
            } else {
                Ops.inst().newOp(op -> {
                    try {
                        this.comms.sendNodeMsg(nodeDescriptor, new TopoFaultNMsg(str));
                    } catch (DragonCommsException e) {
                        nodeFault(nodeDescriptor);
                        op.fail("topology [" + str + "] may not be faulted correctly");
                    }
                }, op2 -> {
                    op2.success();
                }, op3 -> {
                    log.info("sent topology fault to [" + nodeDescriptor + "]");
                }, (op4, str2) -> {
                    log.warn(str2);
                });
            }
        }
    }

    /**
     * Handles a received RIP message: purges every local topology and marks
     * all nodes in the context as dead.
     */
    public void rip() {
        log.warn("received RIP :-(");
        // Iterate over a typed snapshot of the names: removeTopo mutates localClusters,
        // and a raw list would make the element type Object, which cannot be passed
        // to removeTopo(String, boolean).
        for (String topologyId : new ArrayList<String>(this.localClusters.keySet())) {
            try {
                removeTopo(topologyId, true);
            } catch (DragonTopologyException e) {
                log.error("topology was removed elsewhere");
            }
        }
        this.nodeThread.setAllDead();
    }
}
