/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker.client.example;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sampullara.cli.Args;
import com.sampullara.cli.Argument;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.server.worker.client.MetricsClient;
import io.mantisrx.server.worker.client.SseWorkerConnectionFunction;
import io.mantisrx.server.worker.client.WorkerMetricsClient;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;

public class SampleClient {
    private static final Logger logger = LoggerFactory.getLogger(SampleClient.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();
    @Argument(alias="p", description="Specify a configuration file", required=true)
    private static String propFile = "";
    @Argument(alias="j", description="Specify a job Id", required=false)
    private static String jobId;
    @Argument(alias="c", description="Command to run submit", required=true)
    private static String cmd;

    public static void main(String[] args) {
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        try {
            Args.parse(SampleClient.class, (String[])args);
        }
        catch (IllegalArgumentException e) {
            Args.usage(SampleClient.class);
            System.exit(1);
        }
        System.out.println("propfile=" + propFile);
        Properties properties = new Properties();
        try {
            FileInputStream inputStream = new FileInputStream(propFile);
            Object object = null;
            try {
                properties.load(inputStream);
            }
            catch (Throwable throwable) {
                object = throwable;
                throw throwable;
            }
            finally {
                if (inputStream != null) {
                    if (object != null) {
                        try {
                            ((InputStream)inputStream).close();
                        }
                        catch (Throwable throwable) {
                            ((Throwable)object).addSuppressed(throwable);
                        }
                    } else {
                        ((InputStream)inputStream).close();
                    }
                }
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        final WorkerMetricsClient mantisClient = new WorkerMetricsClient(properties);
        try {
            switch (cmd) {
                case "metrics": {
                    if (jobId == null || jobId.isEmpty()) {
                        logger.error("Must provide jobId to connect to its metrics sink");
                        break;
                    }
                    Thread thread = new Thread(){

                        @Override
                        public void run() {
                            SampleClient.getMetricsData(mantisClient, jobId);
                        }
                    };
                    thread.start();
                    break;
                }
                default: {
                    logger.error("Unknown command " + cmd);
                    break;
                }
            }
        }
        catch (Throwable t) {
            t.printStackTrace();
        }
    }

    private static void getMetricsData(WorkerMetricsClient mantisClient, final String localJobId) {
        final CountDownLatch startLatch = new CountDownLatch(1);
        final CountDownLatch finishLatch = new CountDownLatch(1);
        MetricsClient<MantisServerSentEvent> metricsClient = mantisClient.getMetricsClientByJobId(localJobId, new SseWorkerConnectionFunction(true, new Action1<Throwable>(){

            public void call(Throwable throwable) {
                logger.error("Sink connection error: " + throwable.getMessage());
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException ie) {
                    logger.error("Interrupted waiting for retrying connection");
                }
            }
        }), null);
        logger.info("Getting results observable for job {}", (Object)localJobId);
        Observable resultsObservable = Observable.merge(metricsClient.getResults());
        logger.info("Subscribing to it");
        final AtomicReference<Object> ref = new AtomicReference<Object>(null);
        Thread t = new Thread(){

            @Override
            public void run() {
                try {
                    startLatch.await();
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    3.sleep(300000L);
                }
                catch (InterruptedException e) {
                    // empty catch block
                }
                logger.info("Closing client conx");
                try {
                    ((Subscription)ref.get()).unsubscribe();
                    finishLatch.countDown();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        t.setDaemon(true);
        Subscription s = resultsObservable.doOnCompleted(new Action0(){

            public void call() {
                finishLatch.countDown();
            }
        }).subscribe((Action1)new Action1<MantisServerSentEvent>(){

            public void call(MantisServerSentEvent event) {
                if (startLatch.getCount() > 0L) {
                    startLatch.countDown();
                }
                logger.info("{} Got SSE: {}", (Object)localJobId, (Object)event.getEventAsString());
            }
        });
        ref.set(s);
        t.start();
        logger.info("SUBSCRIBED to job metrics changes");
        try {
            finishLatch.await();
            logger.info("Sink observable completed");
        }
        catch (InterruptedException e) {
            logger.error("thread interrupted", (Throwable)e);
        }
    }
}

