package csip;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import csip.annotations.Author;
import csip.annotations.Description;
import csip.annotations.Documentation;
import csip.annotations.License;
import csip.annotations.Name;
import csip.annotations.State;
import csip.annotations.VersionInfo;
import csip.utils.Services;
import csip.utils.SimpleCache;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.bson.Document;
import org.codehaus.jettison.json.JSONObject;

@Path("p/pubsub/{delegate:.*}")
@Name("pubsub")
@VersionInfo("$Id: QueueingModelDataService.java cb35de09fc57 2019-06-03 od $")
@Author(name = "od", email = "<odavid@colostate.edu>", org = "CSU")
@Documentation("https://alm.engr.colostate.edu/csip")
@Description("Publish/Subscribe")
@License(License.MIT)
@State(State.STABLE)
/* loaded from: input_file:csip/QueueingModelDataService.class */
public class QueueingModelDataService extends ModelDataService {
    static final String KEY_QUEUE_POS = "queue_pos";
    static String delegateUrl = Config.getString("csip.pubsub.delegate.url");
    static boolean needsWebHook = Config.getBoolean("csip.pubsub.webhook.payload", true);
    static int queueLen = Config.getInt("csip.pubsub.queue.len", Integer.MAX_VALUE);
    static int queueRemainingLen = Config.getInt("csip.pubsub.queue.remaining.len", 25);
    static Logger l = Config.LOG;
    private static final QueueManagement mgmt = new QueueManagement();

    @PathParam("delegate")
    String delegate;
    boolean isQueued = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement.class */
    public static class QueueManagement {
        final String STRING_SER = "org.apache.kafka.common.serialization.StringSerializer";
        final String STRING_DESER = "org.apache.kafka.common.serialization.StringDeserializer";
        String bootstrap_servers;
        long consumerPoll;
        String consumerGroupId;
        String queueTopic;
        String resultTopic;
        long submitDelay;
        int defaultCapacity;
        int pingTimeout;
        long delayAtCapacilty;
        boolean checkTarget;
        long offerMS;
        long pollMS;
        long loadcheck;
        String connect;
        Consumer<String, String> receiveConsumer;
        ExecutorService executor;
        ScheduledExecutorService ses;
        LoadProbe probe;
        FutureTask<String> submitTask;
        FutureTask<String> receiveTask;
        FutureTask<String> deliveryTask;
        final AtomicBoolean threadsRunning;
        SimpleCache<String, Integer> capacities;
        AtomicInteger incoming;
        AtomicInteger queued_sub;
        AtomicInteger queued_rec;
        AtomicInteger queued_back;
        AtomicInteger exec_sub;
        AtomicInteger exec_rec;
        AtomicInteger webhook_sub;
        AtomicInteger webhook_sub_failed;
        AtomicInteger webhook_rec;
        AtomicInteger sn;
        BlockingQueue<Payload> queue;
        BlockingQueue<String> deliveryQueue;
        final AtomicBoolean queueOpen;
        Stats stats;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement$LoadProbe.class */
        public class LoadProbe implements Runnable {
            Client cl = new Client(QueueingModelDataService.l);
            Map<String, Integer> v = new ConcurrentHashMap();
            Map<String, String> sh = new HashMap();

            LoadProbe() {
            }

            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (QueueManagement.this.threadsRunning.get()) {
                        update();
                    }
                } catch (Exception e) {
                    QueueingModelDataService.l.log(Level.SEVERE, (String) null, (Throwable) e);
                }
            }

            void close() {
                this.cl.close();
            }

            private void update() throws Exception {
                QueueingModelDataService.l.info("Backend update.");
                for (Map.Entry<String, Integer> entry : this.v.entrySet()) {
                    Integer query = query(entry.getKey());
                    entry.setValue(query);
                    QueueingModelDataService.l.info("updated: " + entry.getKey() + " -> " + query);
                }
            }

            private Integer query(String str) throws Exception {
                return Integer.valueOf(this.cl.doGET(str + "/q/running"));
            }

            int getCurrentLoad(String str) throws Exception {
                String context = getContext(str);
                Integer num = this.v.get(context);
                if (num == null) {
                    Map<String, Integer> map = this.v;
                    Integer query = query(context);
                    num = query;
                    map.put(context, query);
                }
                return num.intValue();
            }

            String getContext(String str) throws URISyntaxException {
                String str2 = this.sh.get(str);
                if (str2 == null) {
                    String[] uRIParts = Services.getURIParts(str);
                    Map<String, String> map = this.sh;
                    String str3 = uRIParts[0] + uRIParts[1] + uRIParts[2] + "/" + uRIParts[3];
                    str2 = str3;
                    map.put(str, str3);
                }
                return str2;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement$Payload.class */
        public static class Payload {
            String url;
            String request;

            Payload(String str, String str2) {
                this.url = str;
                this.request = str2;
            }
        }

        /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement$PublishJobThread.class */
        class PublishJobThread implements Callable<String> {
            Client cl = new Client(QueueingModelDataService.l);

            PublishJobThread() {
            }

            private void publish(Client client, String str) throws Exception {
                JSONObject jSONObject = new JSONObject(str);
                if (!jSONObject.has(ModelDataService.KEY_METAINFO)) {
                    QueueManagement.this.webhook_sub_failed.incrementAndGet();
                    QueueingModelDataService.l.log(Level.SEVERE, "PublishError for :" + str);
                    return;
                }
                String optString = jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).optString(ModelDataService.KEY_WEBHOOK);
                if (optString.isEmpty()) {
                    return;
                }
                QueueingModelDataService.l.log(Level.INFO, "Webhook Post to " + optString);
                QueueManagement.this.webhook_sub.incrementAndGet();
                String doPOST = client.doPOST(optString, str);
                if (doPOST != null) {
                    QueueingModelDataService.l.log(Level.INFO, "Delivered and Acknowledged: " + doPOST);
                    QueueManagement.this.webhook_rec.incrementAndGet();
                }
                if (QueueingModelDataService.mgmt.stats != null) {
                    QueueingModelDataService.mgmt.stats.inc(QueueingModelDataService.l, jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).optString(ModelDataService.KEY_SERVICE_URL), jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).optLong(ModelDataService.KEY_CPU_TIME), jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).optString("csip-auth"));
                }
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                while (QueueManagement.this.threadsRunning.get()) {
                    try {
                        String poll = QueueManagement.this.deliveryQueue.poll(QueueManagement.this.pollMS, TimeUnit.MILLISECONDS);
                        if (poll != null) {
                            QueueingModelDataService.l.log(Level.INFO, "RECEIVED FOR PUBLISH: {0} ", new Object[]{poll});
                            publish(this.cl, poll);
                        }
                        QueueingModelDataService.l.log(Level.INFO, "Publish Alive.");
                    } finally {
                        this.cl.close();
                        QueueingModelDataService.l.log(Level.INFO, "Publisher closed.");
                    }
                }
                return "Done Publish.";
            }
        }

        /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement$ReceiveJobStatusThread.class */
        class ReceiveJobStatusThread implements Callable<String> {
            Duration d;
            Client cl = new Client(QueueingModelDataService.l);

            ReceiveJobStatusThread() {
                this.d = Duration.ofMillis(QueueManagement.this.consumerPoll);
            }

            private void queryResults(String str, String str2, Client client) {
                if (str.equals(ModelDataService.FINISHED) || str.equals(ModelDataService.FAILED) || str.equals(ModelDataService.CANCELED) || str.equals(ModelDataService.TIMEDOUT)) {
                    String[] split = str2.split("\\s+");
                    QueueManagement.this.exec_rec.incrementAndGet();
                    try {
                        String[] uRIParts = Services.getURIParts(split[1]);
                        String str3 = uRIParts[0] + uRIParts[1] + uRIParts[2] + "/" + uRIParts[3] + "/q/" + split[0];
                        QueueingModelDataService.l.log(Level.INFO, "Query Results " + str3);
                        String doGET = client.doGET(str3);
                        if (QueueingModelDataService.l.isLoggable(Level.FINE)) {
                            QueueingModelDataService.l.log(Level.FINE, "Received RESULT for:  " + str3 + " " + doGET);
                        }
                        QueueManagement.this.deliveryQueue.put(doGET);
                    } catch (Exception e) {
                        QueueingModelDataService.l.log(Level.SEVERE, (String) null, (Throwable) e);
                    }
                }
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                while (QueueManagement.this.threadsRunning.get()) {
                    try {
                        ConsumerRecords poll = QueueManagement.this.receiveConsumer.poll(this.d);
                        if (poll.count() > 0) {
                            poll.forEach(consumerRecord -> {
                                QueueingModelDataService.l.log(Level.INFO, "{0} RECEIVED: {1} {2}", new Object[]{Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
                                queryResults((String) consumerRecord.key(), (String) consumerRecord.value(), this.cl);
                            });
                            QueueManagement.this.receiveConsumer.commitSync();
                        }
                        QueueingModelDataService.l.log(Level.INFO, "Receive Alive.");
                    } catch (WakeupException e) {
                        if (QueueManagement.this.threadsRunning.get()) {
                            throw e;
                        }
                        return "Done Receive.";
                    } finally {
                        QueueManagement.this.receiveConsumer.close();
                        this.cl.close();
                        QueueingModelDataService.l.log(Level.INFO, "Receiver closed.");
                    }
                }
                return "Done Receive.";
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement$Stats.class */
        public static class Stats {
            MongoClient mongo;
            MongoDatabase db;
            UpdateOptions opt = new UpdateOptions().upsert(true);
            static final Document INC = new Document("$inc", new Document("count", 1L));

            Stats(String str) {
                MongoClientURI mongoClientURI = new MongoClientURI(str);
                String database = mongoClientURI.getDatabase();
                database = database == null ? "pubsub" : database;
                this.mongo = new MongoClient(mongoClientURI);
                this.db = this.mongo.getDatabase(database);
            }

            void inc(Logger logger, String str, long j, String str2) {
                if (str2.isEmpty()) {
                    logger.warning("No auth/collection for  " + str);
                    return;
                }
                MongoCollection collection = this.db.getCollection(str2);
                if (collection.updateOne(Filters.eq("service", str), INC).getModifiedCount() == 0) {
                    collection.insertOne(new Document("service", str));
                    collection.updateOne(Filters.eq("service", str), INC);
                }
                collection.updateOne(Filters.eq("service", str), new Document("$inc", new Document("time", Long.valueOf(j))));
            }

            void close() {
                this.mongo.close();
            }
        }

        /* loaded from: input_file:csip/QueueingModelDataService$QueueManagement$SubmitJobThread.class */
        class SubmitJobThread implements Callable<String> {
            Client cl = new Client(QueueingModelDataService.l);
            long delay;

            SubmitJobThread() {
                this.delay = QueueManagement.this.submitDelay;
            }

            private void executeAsync(String str, String str2, int i, Client client) {
                try {
                    JSONObject jSONObject = new JSONObject(str2);
                    HashMap hashMap = new HashMap();
                    hashMap.put(ModelDataService.KEY_SUUID, jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).getString(ModelDataService.KEY_SUUID));
                    jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).put("cap", i);
                    jSONObject.getJSONObject(ModelDataService.KEY_METAINFO).put("sn", QueueManagement.this.sn.get());
                    QueueManagement.this.sn.incrementAndGet();
                    JSONObject doPOST = client.doPOST(str, jSONObject, hashMap);
                    if (QueueingModelDataService.l.isLoggable(Level.FINE)) {
                        QueueingModelDataService.l.log(Level.FINE, "POST Run to " + str + " ... received: " + doPOST.toString());
                    }
                    QueueManagement.this.exec_sub.incrementAndGet();
                } catch (Exception e) {
                    QueueingModelDataService.l.log(Level.SEVERE, (String) null, (Throwable) e);
                }
            }

            private void submit(Client client, String str, String str2) throws Exception {
                try {
                    Thread.sleep(this.delay);
                } catch (InterruptedException e) {
                    QueueingModelDataService.l.log(Level.INFO, "Interrupted");
                }
                if (QueueManagement.this.checkTarget && Client.ping(str, QueueManagement.this.pingTimeout) == -1) {
                    QueueManagement.this.queue(str, str2);
                    this.delay = QueueManagement.this.delayAtCapacilty;
                    QueueingModelDataService.l.log(Level.INFO, "Cannot ping the service, back in line...");
                    return;
                }
                int currentLoad = QueueManagement.this.probe.getCurrentLoad(str);
                int contextCapacity = QueueManagement.this.getContextCapacity(QueueManagement.this.probe.getContext(str));
                QueueingModelDataService.l.log(Level.INFO, "Load for {2}: {0}/{1}", new Object[]{Integer.valueOf(currentLoad), Integer.valueOf(contextCapacity), str});
                if (currentLoad < contextCapacity) {
                    QueueManagement.this.queued_rec.incrementAndGet();
                    executeAsync(str, str2, currentLoad, client);
                    this.delay = QueueManagement.this.submitDelay;
                } else {
                    QueueManagement.this.queue(str, str2);
                    QueueManagement.this.queued_back.incrementAndGet();
                    this.delay = QueueManagement.this.delayAtCapacilty;
                    QueueingModelDataService.l.log(Level.WARNING, "back in line...{0}, {1}/{2}", new Object[]{str, Integer.valueOf(currentLoad), Integer.valueOf(contextCapacity)});
                }
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                while (QueueManagement.this.threadsRunning.get()) {
                    try {
                        Payload poll = QueueManagement.this.queue.poll(QueueManagement.this.pollMS, TimeUnit.MILLISECONDS);
                        if (poll != null) {
                            QueueingModelDataService.l.log(Level.INFO, "RECEIVED: {0} ", new Object[]{poll.url});
                            QueueingModelDataService.l.log(Level.FINE, "  Request: {0}", new Object[]{poll.request});
                            submit(this.cl, poll.url, poll.request);
                        }
                        QueueingModelDataService.l.log(Level.INFO, "Submit Alive.");
                    } finally {
                        this.cl.close();
                        QueueingModelDataService.l.log(Level.INFO, "Submitter closed.");
                    }
                }
                return "Done Submit.";
            }
        }

        private QueueManagement() {
            this.STRING_SER = "org.apache.kafka.common.serialization.StringSerializer";
            this.STRING_DESER = "org.apache.kafka.common.serialization.StringDeserializer";
            this.bootstrap_servers = Config.getString("csip.pubsub.kafka.bootstrap_servers");
            this.consumerPoll = Config.getLong("csip.pubsub.kafka.consumer.poll.ms", 10000L);
            this.consumerGroupId = Config.getString("csip.pubsub.kafka.consumer.group.id", "test-consumer-group");
            this.queueTopic = Config.getString("csip.pubsub.queue.topic", "queue_8080");
            this.resultTopic = Config.getString("csip.pubsub.result.topic", "8086");
            this.submitDelay = Config.getLong("csip.pubsub.submit.delay.ms", 1000L);
            this.defaultCapacity = Config.getInt("csip.pubsub.default.capacity", 8);
            this.pingTimeout = Config.getInt("csip.pubsub.ping.timeout.ms", 1000);
            this.delayAtCapacilty = Config.getLong("csip.pubsub.atcapacity.delay.ms", 2000L);
            this.checkTarget = Config.getBoolean("csip.pubsub.check.target", false);
            this.offerMS = Config.getLong("csip.pubsub.offer.ms", 500L);
            this.pollMS = Config.getLong("csip.pubsub.poll.ms", 2000L);
            this.loadcheck = Config.getLong("csip.pubsub.loadcheck.ms", 2000L);
            this.connect = Config.getString("csip.pubsub.stats", null);
            this.receiveConsumer = getResultConsumer();
            this.executor = Executors.newCachedThreadPool();
            this.ses = Executors.newSingleThreadScheduledExecutor();
            this.probe = new LoadProbe();
            this.submitTask = new FutureTask<>(new SubmitJobThread());
            this.receiveTask = new FutureTask<>(new ReceiveJobStatusThread());
            this.deliveryTask = new FutureTask<>(new PublishJobThread());
            this.threadsRunning = new AtomicBoolean(true);
            this.capacities = new SimpleCache<>();
            this.incoming = new AtomicInteger(0);
            this.queued_sub = new AtomicInteger(0);
            this.queued_rec = new AtomicInteger(0);
            this.queued_back = new AtomicInteger(0);
            this.exec_sub = new AtomicInteger(0);
            this.exec_rec = new AtomicInteger(0);
            this.webhook_sub = new AtomicInteger(0);
            this.webhook_sub_failed = new AtomicInteger(0);
            this.webhook_rec = new AtomicInteger(0);
            this.sn = new AtomicInteger(0);
            this.queue = new LinkedBlockingQueue(QueueingModelDataService.queueLen);
            this.deliveryQueue = new LinkedBlockingQueue();
            this.queueOpen = new AtomicBoolean(true);
        }

        public int getQueueLen() {
            return this.queue.size();
        }

        public int getRemainingCapacity() {
            return this.queue.remainingCapacity();
        }

        Properties getConsumerProperties() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.bootstrap_servers);
            properties.put("enable.auto.commit", false);
            properties.put("group.id", this.consumerGroupId);
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return properties;
        }

        Consumer<String, String> getResultConsumer() {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(getConsumerProperties());
            kafkaConsumer.subscribe(Arrays.asList(this.resultTopic));
            return kafkaConsumer;
        }

        synchronized int queue(String str, String str2) throws Exception {
            if (QueueingModelDataService.l.isLoggable(Level.INFO)) {
                QueueingModelDataService.l.log(Level.INFO, "Queueing  :{0} {1}", new Object[]{str, str2});
            }
            if (!this.queue.offer(new Payload(str, str2), this.offerMS, TimeUnit.MILLISECONDS)) {
                return -1;
            }
            this.queued_sub.getAndIncrement();
            return getQueueLen();
        }

        int getContextCapacity(String str) {
            return this.capacities.get(str, str2 -> {
                return Integer.valueOf(Config.getInt("csip.pubsub." + str2.replace('/', '.').replace(':', '.') + ".capacity", this.defaultCapacity));
            }).intValue();
        }

        void shutdown() {
            this.threadsRunning.set(false);
            this.receiveConsumer.wakeup();
            try {
                QueueingModelDataService.l.log(Level.INFO, this.receiveTask.get());
                QueueingModelDataService.l.log(Level.INFO, this.submitTask.get());
                QueueingModelDataService.l.log(Level.INFO, this.deliveryTask.get());
            } catch (InterruptedException | ExecutionException e) {
                QueueingModelDataService.l.log(Level.SEVERE, (String) null, e);
            }
            this.executor.shutdown();
            if (this.connect != null) {
                this.stats.close();
            }
            this.ses.shutdown();
            this.probe.close();
        }

        void startup() {
            this.executor.submit(this.submitTask);
            this.executor.submit(this.receiveTask);
            this.executor.submit(this.deliveryTask);
            this.ses.scheduleWithFixedDelay(this.probe, 2000L, this.loadcheck, TimeUnit.MILLISECONDS);
            if (this.connect != null) {
                this.stats = new Stats(this.connect);
            }
        }
    }

    @Override // csip.ModelDataService
    boolean isQueued() {
        return this.isQueued;
    }

    @Override // csip.ModelDataService
    protected void doProcess() throws Exception {
        if (this.delegate == null || this.delegate.isEmpty()) {
            throw new ServiceException("No delegate service provided.");
        }
        if (this.delegate.equals("queue")) {
            results().put(KEY_QUEUE_POS, mgmt.getQueueLen());
            results().put("queue_remaining", mgmt.getRemainingCapacity());
            results().put("queue_min_remaining", queueRemainingLen);
            results().put("publish_queue", mgmt.deliveryQueue.size());
            return;
        }
        if (this.delegate.equals(ModelDataService.KEY_STATUS)) {
            results().put(KEY_QUEUE_POS, mgmt.getQueueLen());
            results().put("queue_remaining", mgmt.getRemainingCapacity());
            results().put("incoming", mgmt.incoming.get());
            results().put("queued_sub", mgmt.queued_sub.get());
            results().put("queued_rec", mgmt.queued_rec.get());
            results().put("queued_back", mgmt.queued_back.get());
            results().put("exec_sub", mgmt.exec_sub.get());
            results().put("exec_rec", mgmt.exec_rec.get());
            results().put("webhook_sub", mgmt.webhook_sub.get());
            results().put("webhook_sub_failed", mgmt.webhook_sub_failed.get());
            results().put("webhook_rec", mgmt.webhook_rec.get());
            results().put("openQueue", mgmt.queueOpen.get());
            return;
        }
        if (this.delegate.equals("reset")) {
            mgmt.queue.clear();
            mgmt.sn.set(0);
            results().put(ModelDataService.OK, true);
            return;
        }
        if (this.delegate.equals("queue_stats")) {
            int i = 0;
            for (QueueManagement.Payload payload : mgmt.queue) {
                int i2 = i;
                i++;
                results().put(i2 + ": " + payload.url, payload.request);
            }
            results().put(ModelDataService.OK, true);
            return;
        }
        if (this.delegate.equals("toggle")) {
            mgmt.queueOpen.set(!mgmt.queueOpen.get());
            results().put("queueOpen", mgmt.queueOpen.get());
            return;
        }
        if (l.isLoggable(Level.INFO)) {
            l.log(Level.INFO, this.delegate);
        }
        if (!mgmt.queueOpen.get()) {
            throw new ServiceException("Queue closed for submission, try again later.");
        }
        if (mgmt.getRemainingCapacity() < queueRemainingLen) {
            throw new ServiceException("Queue capacity reached, try again later.");
        }
        if (needsWebHook && !metainfo().hasName(ModelDataService.KEY_WEBHOOK)) {
            throw new ServiceException("'webhook' metainfo missing.");
        }
        if (delegateUrl == null) {
            String url = request().getURL();
            delegateUrl = url.substring(0, url.indexOf(":"));
        }
        String str = delegateUrl + ":" + this.delegate;
        JSONObject jSONObject = new JSONObject(getRequest().toString());
        JSONObject jSONObject2 = jSONObject.getJSONObject(ModelDataService.KEY_METAINFO);
        jSONObject2.put(ModelDataService.KEY_MODE, ModelDataService.ASYNC);
        jSONObject2.remove(ModelDataService.KEY_CLOUD_NODE);
        jSONObject2.remove(ModelDataService.KEY_STATUS);
        jSONObject2.remove(ModelDataService.KEY_TSTAMP);
        jSONObject2.remove(ModelDataService.KEY_REQ_IP);
        jSONObject2.put("csip-auth", request().getAuthToken());
        try {
            if (mgmt.checkTarget && Client.ping(str, mgmt.pingTimeout) == -1) {
                throw new ServiceException("Target service not available: " + str);
            }
            mgmt.incoming.incrementAndGet();
            long queue = mgmt.queue(str, jSONObject.toString());
            if (queue == -1) {
                throw new ServiceException("Error queueing the service, try again later.");
            }
            getMetainfo().put(KEY_QUEUE_POS, queue);
            if (l.isLoggable(Level.INFO)) {
                l.log(Level.INFO, "QUEUE POS, " + queue);
            }
            this.isQueued = true;
        } catch (Exception e) {
            throw new ServiceException("Error queueing the service", e);
        }
    }

    public static void onContextInit() {
        try {
            mgmt.startup();
            l.log(Level.INFO, "Started Pub/Sub Threads.");
        } catch (Exception e) {
            l.log(Level.SEVERE, (String) null, (Throwable) e);
        }
    }

    public static void onContextDestroy() {
        try {
            mgmt.shutdown();
            l.log(Level.INFO, "Stopped Pub/Sub Threads.");
        } catch (Exception e) {
            l.log(Level.SEVERE, (String) null, (Throwable) e);
        }
    }
}
