package csip;

import csip.annotations.Author;
import csip.annotations.Description;
import csip.annotations.Documentation;
import csip.annotations.License;
import csip.annotations.Name;
import csip.annotations.State;
import csip.annotations.VersionInfo;
import csip.utils.Services;
import csip.utils.SimpleCache;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.codehaus.jettison.json.JSONObject;

@Path("p/pubsub/{delegate:.*}")
@Name("pubsub")
@VersionInfo("$Id: QueueingModelDataService.java 1eec6b5c01e3 2019-04-16 od $")
@Author(name = "od", email = "<odavid@colostate.edu>", org = "CSU")
@Documentation("https://alm.engr.colostate.edu/csip")
@Description("Publish/Subscribe")
@License(License.MIT)
@State(State.STABLE)
/* loaded from: input_file:csip/QueueingModelDataService.class */
public class QueueingModelDataService extends ModelDataService {
    static final String KEY_WEBHOOK = "webhook";
    static final String KEY_QUEUE_POS = "queue_pos";
    static String delegateUrl = Config.getString("csip.pubsub.delegate.url");
    static boolean needsWebHook = Config.getBoolean("csip.pubsub.webhook.payload", true);
    static Logger l = Config.LOG;
    private static QueueManagement mgmt = new QueueManagement();

    @PathParam("delegate")
    String delegate;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement.class */
    public static class QueueManagement {
        final String STRING_SER = "org.apache.kafka.common.serialization.StringSerializer";
        final String STRING_DESER = "org.apache.kafka.common.serialization.StringDeserializer";
        String bootstrap_servers;
        long consumerPoll;
        String consumerGroupId;
        String queueTopic;
        String resultTopic;
        long submitDelay;
        int defaultCapacity;
        int pingTimeout;
        long maxInQueue;
        long capacityDelay;
        Producer<String, String> queryProducer;
        Consumer<String, String> receiveConsumer;
        Consumer<String, String> submitConsumer;
        ExecutorService executor;
        FutureTask<String> submitTask;
        FutureTask<String> receiveTask;
        final AtomicBoolean threadsRunning;
        final AtomicLong submitOffset;
        SimpleCache<String, Integer> capacities;
        long queueOffset;

        /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement$ReceiveJobStatusThread.class */
        class ReceiveJobStatusThread implements Callable<String> {
            ReceiveJobStatusThread() {
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                Duration ofMillis = Duration.ofMillis(QueueManagement.this.consumerPoll);
                Client client = new Client(QueueingModelDataService.l);
                while (QueueManagement.this.threadsRunning.get()) {
                    try {
                        try {
                            ConsumerRecords poll = QueueManagement.this.receiveConsumer.poll(ofMillis);
                            for (TopicPartition topicPartition : poll.partitions()) {
                                List<ConsumerRecord> records = poll.records(topicPartition);
                                for (ConsumerRecord consumerRecord : records) {
                                    QueueingModelDataService.l.log(Level.INFO, "{0} RECEIVED: {1} {2}", new Object[]{Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
                                    QueueManagement.this.queryResults((String) consumerRecord.key(), (String) consumerRecord.value(), client);
                                }
                                QueueManagement.this.receiveConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(((ConsumerRecord) records.get(records.size() - 1)).offset() + 1)));
                            }
                            QueueingModelDataService.l.log(Level.INFO, "Receive Alive.");
                        } catch (WakeupException e) {
                            if (QueueManagement.this.threadsRunning.get()) {
                                throw e;
                            }
                            QueueManagement.this.receiveConsumer.close();
                            client.close();
                            QueueingModelDataService.l.log(Level.INFO, "Receiver closed.");
                            return "Done Receive.";
                        }
                    } catch (Throwable th) {
                        QueueManagement.this.receiveConsumer.close();
                        client.close();
                        QueueingModelDataService.l.log(Level.INFO, "Receiver closed.");
                        throw th;
                    }
                }
                QueueManagement.this.receiveConsumer.close();
                client.close();
                QueueingModelDataService.l.log(Level.INFO, "Receiver closed.");
                return "Done Receive.";
            }
        }

        /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement$SubmitJobThread.class */
        class SubmitJobThread implements Callable<String> {
            SubmitJobThread() {
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                Duration ofMillis = Duration.ofMillis(QueueManagement.this.consumerPoll);
                Client client = new Client(QueueingModelDataService.l);
                long j = QueueManagement.this.submitDelay;
                while (QueueManagement.this.threadsRunning.get()) {
                    try {
                        try {
                            ConsumerRecords poll = QueueManagement.this.submitConsumer.poll(ofMillis);
                            for (TopicPartition topicPartition : poll.partitions()) {
                                List<ConsumerRecord> records = poll.records(topicPartition);
                                for (ConsumerRecord consumerRecord : records) {
                                    String str = (String) consumerRecord.key();
                                    String str2 = (String) consumerRecord.value();
                                    QueueingModelDataService.l.log(Level.INFO, "{0}: {1}  {2}", new Object[]{Long.valueOf(consumerRecord.offset()), str, str2});
                                    try {
                                        Thread.sleep(j);
                                    } catch (InterruptedException e) {
                                        QueueingModelDataService.l.log(Level.INFO, "Interrupted");
                                    }
                                    if (Client.ping(str, QueueManagement.this.pingTimeout) == -1) {
                                        QueueManagement.this.queue(str, str2);
                                        j = QueueManagement.this.capacityDelay;
                                        QueueingModelDataService.l.log(Level.INFO, "Cannot ping the service, back in line...");
                                    } else {
                                        String[] uRIParts = Services.getURIParts(str);
                                        String doGET = client.doGET(uRIParts[0] + uRIParts[1] + uRIParts[2] + "/" + uRIParts[3] + "/q/running");
                                        int parseInt = Integer.parseInt(doGET);
                                        int contextCapacity = QueueManagement.this.getContextCapacity(uRIParts[3]);
                                        QueueingModelDataService.l.log(Level.INFO, "Load for {3}: {0}/{1}", new Object[]{doGET, Integer.valueOf(contextCapacity), uRIParts[3]});
                                        if (parseInt >= contextCapacity) {
                                            QueueManagement.this.queue(str, str2);
                                            j = QueueManagement.this.capacityDelay;
                                            QueueingModelDataService.l.log(Level.INFO, "back in line...");
                                        } else {
                                            QueueManagement.this.submitOffset.getAndSet(consumerRecord.offset());
                                            QueueManagement.this.executeAsync(str, str2, doGET, client);
                                            j = QueueManagement.this.submitDelay;
                                        }
                                    }
                                }
                                QueueManagement.this.submitConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(((ConsumerRecord) records.get(records.size() - 1)).offset() + 1)));
                            }
                            QueueingModelDataService.l.log(Level.INFO, "Submit Alive.");
                        } catch (WakeupException e2) {
                            if (QueueManagement.this.threadsRunning.get()) {
                                throw e2;
                            }
                            QueueManagement.this.submitConsumer.close();
                            client.close();
                            QueueingModelDataService.l.log(Level.INFO, "Submitter closed.");
                            return "Done Submit.";
                        }
                    } catch (Throwable th) {
                        QueueManagement.this.submitConsumer.close();
                        client.close();
                        QueueingModelDataService.l.log(Level.INFO, "Submitter closed.");
                        throw th;
                    }
                }
                QueueManagement.this.submitConsumer.close();
                client.close();
                QueueingModelDataService.l.log(Level.INFO, "Submitter closed.");
                return "Done Submit.";
            }
        }

        private QueueManagement() {
            this.STRING_SER = "org.apache.kafka.common.serialization.StringSerializer";
            this.STRING_DESER = "org.apache.kafka.common.serialization.StringDeserializer";
            this.bootstrap_servers = Config.getString("csip.pubsub.kafka.bootstrap_servers");
            this.consumerPoll = Config.getLong("csip.pubsub.kafka.consumer.poll.ms", 10000L);
            this.consumerGroupId = Config.getString("csip.pubsub.kafka.consumer.group.id", "test-consumer-group");
            this.queueTopic = Config.getString("csip.pubsub.queue.topic", "queue_8080");
            this.resultTopic = Config.getString("csip.pubsub.result.topic", "8086");
            this.submitDelay = Config.getLong("csip.pubsub.submit.delay.ms", 1000L);
            this.defaultCapacity = Config.getInt("csip.pubsub.default.capacity", 8);
            this.pingTimeout = Config.getInt("csip.pubsub.ping.timeout.ms", 1000);
            this.maxInQueue = Config.getLong("csip.pubsub.maxqueue.count", -1L);
            this.capacityDelay = Config.getLong("csip.pubsub.atcapacity.delay.ms", 2000L);
            this.receiveConsumer = getResultConsumer();
            this.submitConsumer = getQueueConsumer();
            this.executor = Executors.newCachedThreadPool();
            this.submitTask = new FutureTask<>(new SubmitJobThread());
            this.receiveTask = new FutureTask<>(new ReceiveJobStatusThread());
            this.threadsRunning = new AtomicBoolean(true);
            this.submitOffset = new AtomicLong(-1L);
            this.capacities = new SimpleCache<>();
            this.queueOffset = -1L;
        }

        public long getQueueLen() {
            long j = QueueingModelDataService.mgmt.submitOffset.get();
            if (j != 1) {
                return this.queueOffset - j;
            }
            return -1L;
        }

        Producer<String, String> getQueueProducer() {
            if (this.queryProducer == null) {
                Properties properties = new Properties();
                properties.put("bootstrap.servers", this.bootstrap_servers);
                properties.put("acks", "1");
                properties.put("retries", 2);
                properties.put("max.block.ms", 1000);
                properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                this.queryProducer = new KafkaProducer(properties);
            }
            return this.queryProducer;
        }

        Properties getConsumerProperties() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.bootstrap_servers);
            properties.put("enable.auto.commit", false);
            properties.put("group.id", this.consumerGroupId);
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return properties;
        }

        Consumer<String, String> getQueueConsumer() {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(getConsumerProperties());
            kafkaConsumer.subscribe(Arrays.asList(this.queueTopic));
            return kafkaConsumer;
        }

        Consumer<String, String> getResultConsumer() {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(getConsumerProperties());
            kafkaConsumer.subscribe(Arrays.asList(this.resultTopic));
            return kafkaConsumer;
        }

        synchronized long queue(String str, String str2) throws Exception {
            this.queueOffset = ((RecordMetadata) getQueueProducer().send(new ProducerRecord(this.queueTopic, str, str2)).get()).offset();
            return getQueueLen();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getContextCapacity(String str) {
            return this.capacities.get(str, str2 -> {
                return Integer.valueOf(Config.getInt("csip.pubsub." + str2 + " .capacity", this.defaultCapacity));
            }).intValue();
        }

        void executeAsync(String str, String str2, String str3, Client client) {
            try {
                JSONObject jSONObject = new JSONObject(str2);
                HashMap hashMap = new HashMap();
                hashMap.put(ModelDataService.KEY_SUUID, jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).getString(ModelDataService.KEY_SUUID));
                jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).put("cap", str3);
                JSONObject doPOST = client.doPOST(str, jSONObject, hashMap);
                if (QueueingModelDataService.l.isLoggable(Level.FINE)) {
                    QueueingModelDataService.l.log(Level.FINE, "POST Run to " + str + " ... received: " + doPOST.toString());
                }
            } catch (Exception e) {
                QueueingModelDataService.l.log(Level.SEVERE, (String) null, (Throwable) e);
            }
        }

        void queryResults(String str, String str2, Client client) {
            if (str.equals(ModelDataService.FINISHED) || str.equals(ModelDataService.FAILED) || str.equals(ModelDataService.CANCELED) || str.equals(ModelDataService.TIMEDOUT)) {
                String[] split = str2.split("\\s+");
                try {
                    String[] uRIParts = Services.getURIParts(split[1]);
                    String str3 = uRIParts[0] + uRIParts[1] + uRIParts[2] + "/" + uRIParts[3] + "/q/" + split[0];
                    String doGET = client.doGET(str3);
                    JSONObject jSONObject = new JSONObject(doGET);
                    if (QueueingModelDataService.l.isLoggable(Level.FINE)) {
                        QueueingModelDataService.l.log(Level.FINE, "Received RESULT for:  " + str3 + " " + doGET);
                    }
                    if (jSONObject.has(ModelDataService.KEY_METAINFO)) {
                        String optString = jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).optString(QueueingModelDataService.KEY_WEBHOOK);
                        if (!optString.isEmpty()) {
                            QueueingModelDataService.l.log(Level.INFO, "Webhook Post to " + optString);
                            String doPOST = client.doPOST(optString, doGET);
                            if (doPOST != null) {
                                QueueingModelDataService.l.log(Level.INFO, "Delivered and Acknowledged: " + doPOST);
                            }
                        }
                    }
                } catch (Exception e) {
                    QueueingModelDataService.l.log(Level.SEVERE, (String) null, (Throwable) e);
                }
            }
        }

        void shutdown() {
            this.threadsRunning.set(false);
            this.receiveConsumer.wakeup();
            this.submitConsumer.wakeup();
            try {
                QueueingModelDataService.l.log(Level.INFO, this.submitTask.get());
                QueueingModelDataService.l.log(Level.INFO, this.receiveTask.get());
            } catch (InterruptedException | ExecutionException e) {
                QueueingModelDataService.l.log(Level.SEVERE, (String) null, e);
            }
            if (this.queryProducer != null) {
                this.queryProducer.close();
            }
            this.executor.shutdown();
        }

        void startup() {
            this.executor.submit(this.submitTask);
            this.executor.submit(this.receiveTask);
        }
    }

    @Override // csip.ModelDataService
    boolean isQueued() {
        return true;
    }

    @Override // csip.ModelDataService
    protected void doProcess() throws Exception {
        if (l.isLoggable(Level.INFO)) {
            l.log(Level.INFO, this.delegate);
        }
        long queueLen = mgmt.getQueueLen();
        if (mgmt.maxInQueue > -1 && queueLen > -1 && queueLen >= mgmt.maxInQueue) {
            throw new ServiceException("Queue capacity of " + mgmt.maxInQueue + " reached, try again later.");
        }
        if (needsWebHook && !metainfo().hasName(KEY_WEBHOOK)) {
            throw new ServiceException("'webhook' metainfo missing.");
        }
        if (delegateUrl == null) {
            String url = request().getURL();
            delegateUrl = url.substring(0, url.indexOf(":"));
        }
        String str = delegateUrl + ":" + this.delegate;
        JSONObject jSONObject = new JSONObject(getRequest().toString());
        JSONObject jSONObject2 = jSONObject.getJSONObject(ModelDataService.KEY_METAINFO);
        jSONObject2.put(ModelDataService.KEY_MODE, ModelDataService.ASYNC);
        jSONObject2.remove(ModelDataService.KEY_CLOUD_NODE);
        jSONObject2.remove(ModelDataService.KEY_STATUS);
        jSONObject2.remove(ModelDataService.KEY_TSTAMP);
        jSONObject2.remove(ModelDataService.KEY_REQ_IP);
        try {
            if (Client.ping(str, mgmt.pingTimeout) == -1) {
                throw new ServiceException("Target service not available: " + str);
            }
            long queue = mgmt.queue(str, jSONObject.toString());
            getMetainfo().put(KEY_QUEUE_POS, queue);
            if (l.isLoggable(Level.INFO)) {
                l.log(Level.INFO, "QUEUE POS, " + queue);
            }
        } catch (Exception e) {
            throw new ServiceException("Error queueing the service", e);
        }
    }

    public static void onContextInit() {
        try {
            mgmt.startup();
            l.log(Level.INFO, "Started Pub/Sub Threads.");
        } catch (Exception e) {
            l.log(Level.SEVERE, (String) null, (Throwable) e);
        }
    }

    public static void onContextDestroy() {
        try {
            mgmt.shutdown();
            l.log(Level.INFO, "Stopped Pub/Sub Threads.");
        } catch (Exception e) {
            l.log(Level.SEVERE, (String) null, (Throwable) e);
        }
    }
}
