package dragon.network.comms;

import com.influxdb.client.domain.RunLinks;
import dragon.Config;
import dragon.network.NodeDescriptor;
import dragon.network.messages.node.NodeMessage;
import dragon.network.messages.service.ServiceDoneSMsg;
import dragon.network.messages.service.ServiceMessage;
import dragon.tuple.NetworkTask;
import dragon.utils.CircularBlockingQueue;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dragon/network/comms/TcpComms.class */
public class TcpComms implements IComms {
    private static Logger log = LogManager.getLogger((Class<?>) TcpComms.class);
    private Socket serviceSocketClient;
    private ServerSocket serviceSocketServer;
    private HashMap<String, ObjectOutputStream> serviceOutputStreams;
    private ObjectOutputStream serviceOutputStream;
    private Config conf;
    private SocketManager socketManager;
    private int resetMax;
    private Thread serviceThread;
    private Thread nodeThread;
    private Thread taskThread;
    private NodeDescriptor me;
    private Long id = 0L;
    private int resetCount = 0;
    private LinkedBlockingQueue<ServiceMessage> incomingServiceQueue = new LinkedBlockingQueue<>();
    private LinkedBlockingQueue<NodeMessage> incomingNodeQueue = new LinkedBlockingQueue<>();
    private CircularBlockingQueue<NetworkTask> incomingTaskQueue = new CircularBlockingQueue<>(1024);
    private HashSet<Thread> nodeInputsThreads = new HashSet<>();
    private HashSet<Thread> taskInputsThreads = new HashSet<>();

    public TcpComms(Config config) {
        this.resetMax = 1;
        this.conf = config;
        this.resetMax = config.getDragonCommsResetCount();
    }

    @Override // dragon.network.comms.IComms
    public void open(NodeDescriptor nodeDescriptor) throws IOException {
        log.debug("opening a service socket to [" + nodeDescriptor + "]");
        this.serviceSocketClient = new Socket(nodeDescriptor.getHost(), nodeDescriptor.getServicePort());
        this.serviceOutputStream = new ObjectOutputStream(this.serviceSocketClient.getOutputStream());
        final ObjectInputStream objectInputStream = new ObjectInputStream(this.serviceSocketClient.getInputStream());
        this.serviceThread = new Thread() { // from class: dragon.network.comms.TcpComms.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    ServiceMessage serviceMessage = (ServiceMessage) objectInputStream.readObject();
                    while (serviceMessage.getType() != ServiceMessage.ServiceMessageType.SERVICE_DONE) {
                        TcpComms.this.incomingServiceQueue.put(serviceMessage);
                        serviceMessage = (ServiceMessage) objectInputStream.readObject();
                    }
                    objectInputStream.close();
                    TcpComms.this.serviceOutputStream.close();
                    TcpComms.this.serviceSocketClient.close();
                } catch (IOException | ClassNotFoundException e) {
                    TcpComms.log.debug("class not found or ioexception: " + e.toString());
                } catch (InterruptedException e2) {
                    TcpComms.log.debug("interrupted");
                }
                TcpComms.log.debug("service done");
            }
        };
        this.serviceThread.setName("service");
        this.serviceThread.start();
    }

    private Long nextId() {
        Long l = this.id;
        this.id = Long.valueOf(this.id.longValue() + 1);
        return this.id;
    }

    @Override // dragon.network.comms.IComms
    public void open() throws IOException {
        this.me = this.conf.getLocalHost();
        log.info("this Dragon node is [" + this.me + "]");
        this.serviceSocketServer = new ServerSocket(this.me.getServicePort());
        this.socketManager = new SocketManager(this.me.getDataPort(), this.me);
        this.serviceOutputStreams = new HashMap<>();
        this.serviceThread = new Thread() { // from class: dragon.network.comms.TcpComms.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!isInterrupted()) {
                    try {
                        TcpComms.log.debug("accepting service messages on port [" + TcpComms.this.serviceSocketServer.getLocalPort() + "]");
                        final Socket accept = TcpComms.this.serviceSocketServer.accept();
                        TcpComms.this.id = Long.valueOf(TcpComms.this.id.longValue() + 1);
                        Thread thread = new Thread() { // from class: dragon.network.comms.TcpComms.2.1
                            Long myid;

                            {
                                this.myid = TcpComms.this.nextId();
                            }

                            @Override // java.lang.Thread, java.lang.Runnable
                            public void run() {
                                try {
                                    synchronized (TcpComms.this.serviceOutputStreams) {
                                        TcpComms.this.serviceOutputStreams.put(this.myid.toString(), new ObjectOutputStream(accept.getOutputStream()));
                                    }
                                    ObjectInputStream objectInputStream = new ObjectInputStream(accept.getInputStream());
                                    for (ServiceMessage serviceMessage = (ServiceMessage) objectInputStream.readObject(); serviceMessage.getType() != ServiceMessage.ServiceMessageType.SERVICE_DONE; serviceMessage = (ServiceMessage) objectInputStream.readObject()) {
                                        serviceMessage.setMessageId(this.myid.toString());
                                        TcpComms.this.incomingServiceQueue.put(serviceMessage);
                                    }
                                    ServiceDoneSMsg serviceDoneSMsg = new ServiceDoneSMsg();
                                    serviceDoneSMsg.setMessageId(this.myid.toString());
                                    try {
                                        TcpComms.this.sendServiceMsg(serviceDoneSMsg);
                                    } catch (DragonCommsException e) {
                                        TcpComms.log.error(e.getMessage());
                                    }
                                    synchronized (TcpComms.this.serviceOutputStreams) {
                                        TcpComms.this.serviceOutputStreams.get(this.myid.toString()).close();
                                        TcpComms.this.serviceOutputStreams.remove(this.myid.toString());
                                    }
                                    objectInputStream.close();
                                    accept.close();
                                } catch (IOException e2) {
                                    TcpComms.log.error("exception with service socket: " + e2.toString());
                                } catch (ClassNotFoundException e3) {
                                    TcpComms.log.error("something other than a ServiceMessage was received: " + e3.toString());
                                } catch (InterruptedException e4) {
                                    TcpComms.log.warn("interrupted while putting on incomming service queue: " + e4.toString());
                                }
                            }
                        };
                        thread.setName("servlet");
                        thread.start();
                    } catch (IOException e) {
                        TcpComms.log.error("exception with service socket: " + e.toString());
                    }
                }
            }
        };
        this.serviceThread.setName("service");
        this.serviceThread.start();
        this.nodeThread = new Thread() { // from class: dragon.network.comms.TcpComms.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!isInterrupted()) {
                    try {
                        final NodeDescriptor waitingInputs = TcpComms.this.socketManager.getWaitingInputs("node");
                        Thread thread = new Thread() { // from class: dragon.network.comms.TcpComms.3.1
                            @Override // java.lang.Thread, java.lang.Runnable
                            public void run() {
                                while (!isInterrupted()) {
                                    try {
                                        TcpComms.this.incomingNodeQueue.put((NodeMessage) TcpComms.this.socketManager.getInputStream("node", waitingInputs).readObject());
                                    } catch (IOException e) {
                                        TcpComms.log.error("ioexception on node stream from [" + waitingInputs + "]: " + e.toString());
                                        TcpComms.this.socketManager.delete("node", waitingInputs);
                                        return;
                                    } catch (ClassNotFoundException e2) {
                                        TcpComms.log.error("incorrect class transmitted on node stream from +[" + waitingInputs + "]");
                                        TcpComms.this.socketManager.close("node", waitingInputs);
                                        return;
                                    } catch (InterruptedException e3) {
                                        TcpComms.log.warn("interrupted while reading node stream from [" + waitingInputs + "]");
                                        TcpComms.this.socketManager.close("node", waitingInputs);
                                    }
                                }
                            }
                        };
                        thread.setName("node input " + TcpComms.this.nodeInputsThreads.size());
                        TcpComms.this.nodeInputsThreads.add(thread);
                        thread.start();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        this.nodeThread.setName("node");
        this.nodeThread.start();
        this.taskThread = new Thread() { // from class: dragon.network.comms.TcpComms.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!isInterrupted()) {
                    try {
                        final NodeDescriptor waitingInputs = TcpComms.this.socketManager.getWaitingInputs(RunLinks.SERIALIZED_NAME_TASK);
                        Thread thread = new Thread() { // from class: dragon.network.comms.TcpComms.4.1
                            @Override // java.lang.Thread, java.lang.Runnable
                            public void run() {
                                ObjectInputStream inputStream = TcpComms.this.socketManager.getInputStream(RunLinks.SERIALIZED_NAME_TASK, waitingInputs);
                                while (!isInterrupted()) {
                                    try {
                                        TcpComms.this.incomingTaskQueue.put(NetworkTask.readFromStream(inputStream));
                                    } catch (IOException e) {
                                        TcpComms.log.error("ioexception on task stream from +[" + waitingInputs + "]: " + e.toString());
                                        TcpComms.this.socketManager.delete(RunLinks.SERIALIZED_NAME_TASK, waitingInputs);
                                        return;
                                    } catch (ClassNotFoundException e2) {
                                        TcpComms.log.error("incorrect class transmitted on task stream from +[" + waitingInputs + "]");
                                        TcpComms.this.socketManager.close(RunLinks.SERIALIZED_NAME_TASK, waitingInputs);
                                        return;
                                    } catch (InterruptedException e3) {
                                        TcpComms.log.warn("interrupted while reading node stream from +[" + waitingInputs + "]");
                                        TcpComms.this.socketManager.close("node", waitingInputs);
                                    }
                                }
                            }
                        };
                        thread.setName("task input " + TcpComms.this.taskInputsThreads.size());
                        TcpComms.this.taskInputsThreads.add(thread);
                        thread.start();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        this.taskThread.setName(RunLinks.SERIALIZED_NAME_TASK);
        this.taskThread.start();
    }

    @Override // dragon.network.comms.IComms
    public void close() {
        this.serviceThread.interrupt();
        if (this.serviceSocketServer != null) {
            try {
                this.serviceSocketServer.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override // dragon.network.comms.IComms
    public NodeDescriptor getMyNodeDesc() {
        return this.me;
    }

    @Override // dragon.network.comms.IComms
    public void sendServiceMsg(ServiceMessage serviceMessage) throws DragonCommsException {
        try {
            log.debug("sending service message [" + serviceMessage.getType().name() + "]");
            if (serviceMessage.getMessageId().equals("")) {
                this.serviceOutputStream.writeObject(serviceMessage);
                this.serviceOutputStream.flush();
                return;
            }
            synchronized (this.serviceOutputStreams) {
                this.serviceOutputStreams.get(serviceMessage.getMessageId()).writeObject(serviceMessage);
                this.serviceOutputStreams.get(serviceMessage.getMessageId()).flush();
                this.serviceOutputStreams.get(serviceMessage.getMessageId()).reset();
            }
        } catch (IOException e) {
            log.error("service data was not transmitted");
            throw new DragonCommsException("service data can not be transmitted");
        }
    }

    @Override // dragon.network.comms.IComms
    public void sendServiceMsg(ServiceMessage serviceMessage, ServiceMessage serviceMessage2) throws DragonCommsException {
        serviceMessage.setMessageId(serviceMessage2.getMessageId());
        sendServiceMsg(serviceMessage);
    }

    @Override // dragon.network.comms.IComms
    public ServiceMessage receiveServiceMsg() throws InterruptedException {
        ServiceMessage take = this.incomingServiceQueue.take();
        log.debug("received service message [" + take.getType().name() + "]");
        return take;
    }

    @Override // dragon.network.comms.IComms
    public void sendNodeMsg(NodeDescriptor nodeDescriptor, NodeMessage nodeMessage) throws DragonCommsException {
        nodeMessage.setSender(this.me);
        int i = 0;
        while (i < this.conf.getDragonCommsRetryAttempts()) {
            try {
                log.debug("sending [" + nodeMessage.getType().name() + "] to [" + nodeDescriptor + "]");
                this.socketManager.getOutputStream("node", nodeDescriptor).writeObject(nodeMessage);
                this.socketManager.getOutputStream("node", nodeDescriptor).flush();
                this.socketManager.getOutputStream("node", nodeDescriptor).reset();
                return;
            } catch (IOException e) {
                i++;
                log.warn("could not connect to [" + nodeDescriptor + "]... will retry #[" + i + "] after [" + this.conf.getDragonCommsRetryMs() + "] ms");
                try {
                    Thread.sleep(this.conf.getDragonCommsRetryMs());
                } catch (InterruptedException e2) {
                    log.error("data was not transmitted");
                    return;
                }
            }
        }
        log.fatal("data can not be transmitted");
        throw new DragonCommsException("node data can not be transmitted");
    }

    @Override // dragon.network.comms.IComms
    public NodeMessage receiveNodeMsg() throws InterruptedException {
        return this.incomingNodeQueue.take();
    }

    @Override // dragon.network.comms.IComms
    public void sendNetworkTask(NodeDescriptor nodeDescriptor, NetworkTask networkTask) throws DragonCommsException {
        int i = 0;
        while (i < this.conf.getDragonCommsRetryAttempts()) {
            try {
                synchronized (this.socketManager.getOutputStream(RunLinks.SERIALIZED_NAME_TASK, nodeDescriptor)) {
                    networkTask.sendToStream(this.socketManager.getOutputStream(RunLinks.SERIALIZED_NAME_TASK, nodeDescriptor));
                    this.socketManager.getOutputStream(RunLinks.SERIALIZED_NAME_TASK, nodeDescriptor).flush();
                    this.resetCount++;
                    if (this.resetCount == this.resetMax) {
                        this.socketManager.getOutputStream(RunLinks.SERIALIZED_NAME_TASK, nodeDescriptor).reset();
                        this.resetCount = 0;
                    }
                }
                return;
            } catch (IOException e) {
                i++;
                log.warn("could not connect to [" + nodeDescriptor + "]... will retry #[" + i + "] after [" + this.conf.getDragonCommsRetryMs() + "] ms");
                try {
                    Thread.sleep(this.conf.getDragonCommsRetryMs());
                } catch (InterruptedException e2) {
                    log.error("data was not transmitted");
                    return;
                }
            }
        }
        log.fatal("data can not be transmitted");
        throw new DragonCommsException("task data can not be transmitted");
    }

    @Override // dragon.network.comms.IComms
    public NetworkTask receiveNetworkTask() throws InterruptedException {
        return this.incomingTaskQueue.take();
    }
}
