package csip;

import csip.annotations.Author;
import csip.annotations.Description;
import csip.annotations.Documentation;
import csip.annotations.License;
import csip.annotations.Name;
import csip.annotations.State;
import csip.annotations.VersionInfo;
import csip.utils.Services;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.codehaus.jettison.json.JSONObject;

@Path("p/{delegate:.*}")
@Name("Publish/Subscribe generic service.")
@VersionInfo("$Id: QueueingModelDataService.java f0585cefabc8 2019-01-22 od $")
@Author(name = "od", email = "<odavid@colostate.edu>", org = "CSU")
@Documentation("https://alm.engr.colostate.edu/csip")
@Description("Publish")
@License(License.GPL3)
@State(State.STABLE)
/* loaded from: input_file:csip/QueueingModelDataService.class */
public class QueueingModelDataService extends ModelDataService {

    @PathParam("delegate")
    String delegate;
    static String delegateUrl = Config.getString("ps.delegate.url");
    static Logger l = Config.LOG;
    private static QueueManagement mgmt = new QueueManagement();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement.class */
    public static class QueueManagement {
        final String STRING_SER = "org.apache.kafka.common.serialization.StringSerializer";
        final String STRING_DESER = "org.apache.kafka.common.serialization.StringDeserializer";
        String bootstrap_servers;
        long consumerPoll;
        String consumerGroupId;
        String queueTopic;
        String resultTopic;
        int capacity;
        long retryWait;
        long submitWait;
        Producer<String, String> qp;
        Consumer<String, String> qc;
        Consumer<String, String> rc;
        Consumer<String, String> receiveCo;
        Consumer<String, String> submitCo;
        ExecutorService e;
        FutureTask<String> submitTask;
        FutureTask<String> receiveTask;
        private final AtomicBoolean threadsRunning;
        private final AtomicLong submitOffset;

        /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement$ReceiveJobStatusThread.class */
        class ReceiveJobStatusThread implements Callable<String> {
            ReceiveJobStatusThread() {
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                Duration ofMillis = Duration.ofMillis(QueueManagement.this.consumerPoll);
                Client client = new Client(QueueingModelDataService.l);
                while (QueueManagement.this.threadsRunning.get()) {
                    try {
                        try {
                            ConsumerRecords poll = QueueManagement.this.receiveCo.poll(ofMillis);
                            for (TopicPartition topicPartition : poll.partitions()) {
                                List<ConsumerRecord> records = poll.records(topicPartition);
                                for (ConsumerRecord consumerRecord : records) {
                                    QueueingModelDataService.log(Level.INFO, consumerRecord.offset() + " RECEIVED: " + ((String) consumerRecord.key()) + " " + ((String) consumerRecord.value()));
                                    QueueManagement.this.queryResults((String) consumerRecord.key(), (String) consumerRecord.value(), client);
                                }
                                QueueManagement.this.receiveCo.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(((ConsumerRecord) records.get(records.size() - 1)).offset() + 1)));
                            }
                            QueueingModelDataService.log(Level.INFO, "Receive Alive.");
                        } catch (WakeupException e) {
                            if (QueueManagement.this.threadsRunning.get()) {
                                throw e;
                            }
                            QueueManagement.this.receiveCo.close();
                            client.close();
                            QueueingModelDataService.log(Level.INFO, "Receiver closed.");
                            return "Done Receive.";
                        }
                    } catch (Throwable th) {
                        QueueManagement.this.receiveCo.close();
                        client.close();
                        QueueingModelDataService.log(Level.INFO, "Receiver closed.");
                        throw th;
                    }
                }
                QueueManagement.this.receiveCo.close();
                client.close();
                QueueingModelDataService.log(Level.INFO, "Receiver closed.");
                return "Done Receive.";
            }
        }

        /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement$SubmitJobThread.class */
        class SubmitJobThread implements Callable<String> {
            SubmitJobThread() {
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                Duration ofMillis = Duration.ofMillis(QueueManagement.this.consumerPoll);
                Client client = new Client(QueueingModelDataService.l);
                while (QueueManagement.this.threadsRunning.get()) {
                    try {
                        try {
                            ConsumerRecords poll = QueueManagement.this.submitCo.poll(ofMillis);
                            for (TopicPartition topicPartition : poll.partitions()) {
                                List<ConsumerRecord> records = poll.records(topicPartition);
                                for (ConsumerRecord consumerRecord : records) {
                                    String str = (String) consumerRecord.key();
                                    String str2 = (String) consumerRecord.value();
                                    QueueingModelDataService.log(Level.INFO, "CONSUMER OFFSET: " + consumerRecord.offset());
                                    QueueingModelDataService.log(Level.INFO, consumerRecord.offset() + ": " + str + "  " + str2);
                                    try {
                                        Thread.sleep(QueueManagement.this.submitWait);
                                    } catch (InterruptedException e) {
                                        QueueingModelDataService.log(Level.INFO, "Interrupted");
                                    }
                                    String[] uRIParts = Services.getURIParts(str);
                                    String str3 = uRIParts[0] + uRIParts[1] + uRIParts[2] + uRIParts[3] + "/q/running";
                                    while (true) {
                                        QueueingModelDataService.log(Level.INFO, "Checking " + str3);
                                        String doGET = client.doGET(str3);
                                        QueueingModelDataService.log(Level.INFO, "Load: " + doGET + "/" + QueueManagement.this.capacity);
                                        if (doGET != null) {
                                            if (Integer.parseInt(doGET) > QueueManagement.this.capacity) {
                                                try {
                                                    QueueingModelDataService.log(Level.INFO, "Submit waiting because load is at capacity");
                                                    Thread.sleep(QueueManagement.this.retryWait);
                                                } catch (InterruptedException e2) {
                                                    QueueingModelDataService.log(Level.INFO, "Interrupted");
                                                }
                                            }
                                        }
                                    }
                                    QueueManagement.this.submitOffset.getAndSet(consumerRecord.offset());
                                    QueueManagement.this.executeAsync(str, str2, client);
                                }
                                QueueManagement.this.submitCo.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(((ConsumerRecord) records.get(records.size() - 1)).offset() + 1)));
                            }
                            QueueingModelDataService.log(Level.INFO, "Submit Alive.");
                        } catch (WakeupException e3) {
                            if (QueueManagement.this.threadsRunning.get()) {
                                throw e3;
                            }
                            QueueManagement.this.submitCo.close();
                            client.close();
                            QueueingModelDataService.log(Level.INFO, "Submitter closed.");
                            return "Done Submit.";
                        }
                    } catch (Throwable th) {
                        QueueManagement.this.submitCo.close();
                        client.close();
                        QueueingModelDataService.log(Level.INFO, "Submitter closed.");
                        throw th;
                    }
                }
                QueueManagement.this.submitCo.close();
                client.close();
                QueueingModelDataService.log(Level.INFO, "Submitter closed.");
                return "Done Submit.";
            }
        }

        private QueueManagement() {
            this.STRING_SER = "org.apache.kafka.common.serialization.StringSerializer";
            this.STRING_DESER = "org.apache.kafka.common.serialization.StringDeserializer";
            this.bootstrap_servers = Config.getString("ps.kafka.bootstrap_servers");
            this.consumerPoll = Config.getLong("ps.kafka.consumer.poll.ms", 10000L);
            this.consumerGroupId = Config.getString("ps.kafka.consumer.group.id", "test-consumer-group");
            this.queueTopic = Config.getString("ps.queue.topic", "queue_8080");
            this.resultTopic = Config.getString("ps.result.topic", "8086");
            this.capacity = Config.getInt("ps.submit.capacity", 4);
            this.retryWait = Config.getLong("ps.submit.retry.after.ms", 2000L);
            this.submitWait = Config.getLong("ps.submit.after.ms", 250L);
            this.receiveCo = getResultConsumer();
            this.submitCo = getQueueConsumer();
            this.e = Executors.newCachedThreadPool();
            this.submitTask = new FutureTask<>(new SubmitJobThread());
            this.receiveTask = new FutureTask<>(new ReceiveJobStatusThread());
            this.threadsRunning = new AtomicBoolean(true);
            this.submitOffset = new AtomicLong();
        }

        private Producer<String, String> getQueueProducer() {
            if (this.qp == null) {
                Properties properties = new Properties();
                properties.put("bootstrap.servers", this.bootstrap_servers);
                properties.put("acks", "1");
                properties.put("retries", 2);
                properties.put("max.block.ms", 1000);
                properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                this.qp = new KafkaProducer(properties);
            }
            return this.qp;
        }

        Properties getConsumerProperties() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.bootstrap_servers);
            properties.put("enable.auto.commit", false);
            properties.put("group.id", this.consumerGroupId);
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return properties;
        }

        Consumer<String, String> getQueueConsumer() {
            if (this.qc == null) {
                this.qc = new KafkaConsumer(getConsumerProperties());
                this.qc.subscribe(Arrays.asList(this.queueTopic));
            }
            return this.qc;
        }

        Consumer<String, String> getResultConsumer() {
            if (this.rc == null) {
                this.rc = new KafkaConsumer(getConsumerProperties());
                this.rc.subscribe(Arrays.asList(this.resultTopic));
            }
            return this.rc;
        }

        synchronized RecordMetadata queue(String str, String str2) throws Exception {
            return (RecordMetadata) getQueueProducer().send(new ProducerRecord(this.queueTopic, str, str2)).get();
        }

        void executeAsync(String str, String str2, Client client) {
            try {
                JSONObject jSONObject = new JSONObject(str2);
                HashMap hashMap = new HashMap();
                hashMap.put(ModelDataService.KEY_SUUID, jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).getString(ModelDataService.KEY_SUUID));
                QueueingModelDataService.log(Level.INFO, "ASYNC POST " + str + "  " + jSONObject.toString(2));
                QueueingModelDataService.log(Level.INFO, "POST RESULT" + client.doPOST(str, jSONObject, hashMap).toString(2));
            } catch (Exception e) {
                QueueingModelDataService.log(Level.SEVERE, null, e);
            }
        }

        void queryResults(String str, String str2, Client client) {
            if (str.equalsIgnoreCase(ModelDataService.FINISHED)) {
                String[] split = str2.split("\\s+");
                try {
                    String[] uRIParts = Services.getURIParts(split[1]);
                    String doGET = client.doGET(uRIParts[0] + uRIParts[1] + uRIParts[2] + uRIParts[3] + "/q/" + split[0]);
                    QueueingModelDataService.log(Level.INFO, "RESULT: " + doGET);
                    JSONObject jSONObject = new JSONObject(doGET);
                    if (jSONObject.has(ModelDataService.KEY_METAINFO)) {
                        String optString = jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).optString("webhook");
                        if (!optString.isEmpty()) {
                            QueueingModelDataService.log(Level.INFO, "Webhook Post to " + optString);
                            String doPOST = client.doPOST(optString, doGET);
                            if (doPOST != null) {
                                QueueingModelDataService.log(Level.INFO, "Delivered and Acknowledged: " + doPOST);
                            }
                        }
                    }
                } catch (Exception e) {
                    QueueingModelDataService.log(Level.SEVERE, null, e);
                }
            }
        }

        void shutdown() {
            this.threadsRunning.set(false);
            this.receiveCo.wakeup();
            this.submitCo.wakeup();
            try {
                QueueingModelDataService.log(Level.INFO, this.submitTask.get());
                QueueingModelDataService.log(Level.INFO, this.receiveTask.get());
            } catch (InterruptedException | ExecutionException e) {
                QueueingModelDataService.log(Level.SEVERE, null, e);
            }
            if (this.qp != null) {
                this.qp.close();
            }
            this.e.shutdown();
        }

        void startup() {
            this.e.submit(this.submitTask);
            this.e.submit(this.receiveTask);
        }
    }

    @Override // csip.ModelDataService
    boolean isQueued() {
        return true;
    }

    @Override // csip.ModelDataService
    protected void doProcess() throws Exception {
        log(Level.INFO, getRequestContext());
        log(Level.INFO, getRequestURL());
        log(Level.INFO, getRequestHost());
        log(Level.INFO, this.delegate);
        if (delegateUrl == null) {
            delegateUrl = getRequestURL().substring(0, getRequestURL().indexOf(":"));
        }
        String str = delegateUrl + ":" + this.delegate;
        JSONObject jSONObject = new JSONObject(getRequest().toString());
        JSONObject jSONObject2 = jSONObject.getJSONObject(ModelDataService.KEY_METAINFO);
        jSONObject2.put(ModelDataService.KEY_MODE, ModelDataService.ASYNC);
        jSONObject2.remove(ModelDataService.KEY_CLOUD_NODE);
        jSONObject2.remove(ModelDataService.KEY_STATUS);
        jSONObject2.remove(ModelDataService.KEY_TSTAMP);
        jSONObject2.remove(ModelDataService.KEY_REQ_IP);
        jSONObject2.remove("queued_service");
        try {
            RecordMetadata queue = mgmt.queue(str, jSONObject.toString());
            long offset = queue.offset() - mgmt.submitOffset.get();
            log(Level.INFO, "PRODUCER OFFSET: " + queue.offset());
            getMetainfo().put("queue_len", offset);
            log(Level.INFO, "QUEUE LENGTH, " + offset);
        } catch (Exception e) {
            throw new ServiceException("Error queueing the service", e);
        }
    }

    static void log(Level level, String str, Throwable th) {
        l.log(level, str, th);
    }

    static void log(Level level, String str) {
        l.log(level, str);
    }

    public static void onContextInit() {
        try {
            mgmt.startup();
            log(Level.INFO, "Started Service Threads");
        } catch (Exception e) {
            log(Level.SEVERE, null, e);
        }
    }

    public static void onContextDestroy() {
        try {
            mgmt.shutdown();
            log(Level.INFO, "Stopped all Service Threads.");
        } catch (Exception e) {
            log(Level.SEVERE, null, e);
        }
    }
}
