/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker.client;

import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.network.WorkerEndpoint;
import io.mantisrx.server.master.client.MasterClientWrapper;
import io.mantisrx.server.worker.client.JobWorkerMetricsLocator;
import io.mantisrx.server.worker.client.MetricsClient;
import io.mantisrx.server.worker.client.WorkerConnection;
import io.mantisrx.server.worker.client.WorkerConnectionFunc;
import io.mantisrx.server.worker.client.WorkerConnectionsStatus;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivx.mantis.operators.DropOperator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

/**
 * {@link MetricsClient} implementation that follows the metrics endpoints of a job's workers.
 *
 * <p>Endpoint changes for {@code jobId} are obtained from the {@link JobWorkerMetricsLocator}.
 * Every non-{@code complete} change opens a {@link WorkerConnection} through the supplied
 * {@link WorkerConnectionFunc}; a {@code complete} change closes and forgets the matching
 * connection. Connection and data-receiving counts are tracked in gauges and, when an observer
 * was supplied, published as {@link WorkerConnectionsStatus} snapshots.
 *
 * <p>Once the stream returned by {@link #getResults()} is unsubscribed the client is marked
 * closed, all tracked connections are closed, and later endpoint changes are ignored.
 *
 * @param <T> type of the items produced by each worker connection
 */
class MetricsClientImpl<T> implements MetricsClient<T> {
    private static final Logger logger = LoggerFactory.getLogger(MetricsClientImpl.class);

    /** Gauge counting the worker metrics connections currently reported as connected. */
    private static final String WORKERS_GAUGE_NAME = "MetricsConnections";
    /** Gauge holding the most recently observed expected number of workers. */
    private static final String EXPECTED_WORKERS_GAUGE_NAME = "ExpectedMetricsConnections";
    /** Gauge counting the worker connections currently reported as receiving data. */
    private static final String WORKER_CONN_RECEIVING_DATA_GAUGE_NAME = "metricsRecvngData";

    final String jobId;
    final WorkerConnectionFunc<T> workerConnectionFunc;
    final JobWorkerMetricsLocator jobWorkerMetricsLocator;
    /** Set once the results stream has been unsubscribed; never reset. */
    private final AtomicBoolean nowClosed = new AtomicBoolean(false);
    private final WorkerConnections workerConnections = new WorkerConnections();
    private final Gauge workersGauge;
    private final Gauge expectedWorkersGauge;
    private final Gauge workerConnReceivingDataGauge;
    /** Latest value emitted by the worker-count observable passed to the constructor. */
    private final AtomicInteger numWorkers = new AtomicInteger();
    /** Optional sink for connection status snapshots; may be {@code null}. */
    private final Observer<WorkerConnectionsStatus> workerConnectionsStatusObserver;
    private final long dataRecvTimeoutSecs;

    /**
     * Creates the client, registers its gauges and starts tracking the expected worker count.
     *
     * @param jobId                           id of the job whose worker metrics are consumed
     * @param workerConnectionFunc            factory used to create a connection per worker endpoint
     * @param jobWorkerMetricsLocator         source of endpoint changes for the job
     * @param numWorkersObservable            stream of expected worker counts; consumed until it
     *                                        terminates or this client is closed
     * @param workerConnectionsStatusObserver receives status snapshots; may be {@code null}
     * @param dataRecvTimeoutSecs             timeout value handed to every created connection
     */
    MetricsClientImpl(String jobId, WorkerConnectionFunc<T> workerConnectionFunc, JobWorkerMetricsLocator jobWorkerMetricsLocator, Observable<Integer> numWorkersObservable, Observer<WorkerConnectionsStatus> workerConnectionsStatusObserver, long dataRecvTimeoutSecs) {
        this.jobId = jobId;
        this.workerConnectionFunc = workerConnectionFunc;
        this.jobWorkerMetricsLocator = jobWorkerMetricsLocator;
        this.workerConnectionsStatusObserver = workerConnectionsStatusObserver;
        this.dataRecvTimeoutSecs = dataRecvTimeoutSecs;

        Metrics metrics = new Metrics.Builder()
                .name(MetricsClientImpl.class.getCanonicalName() + "-" + jobId)
                .addGauge(WORKERS_GAUGE_NAME)
                .addGauge(EXPECTED_WORKERS_GAUGE_NAME)
                .addGauge(WORKER_CONN_RECEIVING_DATA_GAUGE_NAME)
                .build();
        metrics = MetricsRegistry.getInstance().registerAndGet(metrics);
        this.workersGauge = metrics.getGauge(WORKERS_GAUGE_NAME);
        this.expectedWorkersGauge = metrics.getGauge(EXPECTED_WORKERS_GAUGE_NAME);
        this.workerConnReceivingDataGauge = metrics.getGauge(WORKER_CONN_RECEIVING_DATA_GAUGE_NAME);

        final String trackedJobId = jobId;
        numWorkersObservable.doOnNext(new Action1<Integer>() {
            @Override
            public void call(Integer count) {
                MetricsClientImpl.this.numWorkers.set(count);
            }
        }).takeWhile(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer count) {
                return !MetricsClientImpl.this.nowClosed.get();
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                // Nothing to do: the last observed count stays in effect.
            }

            @Override
            public void onError(Throwable e) {
                // An explicit handler keeps a failing count stream from surfacing as an
                // OnErrorNotImplementedException on the emitting thread.
                logger.warn("Error on worker count stream for job {}; expected worker count will no longer be updated", trackedJobId, e);
            }

            @Override
            public void onNext(Integer count) {
                // The value was already recorded in doOnNext above.
            }
        });
    }

    /** Builds the key under which the connection to {@code host:port} is tracked. */
    private String toWorkerConnName(String host, int port) {
        return host + "-" + port;
    }

    /** Always {@code false}: this implementation does not record errors. */
    @Override
    public boolean hasError() {
        return false;
    }

    /** Always {@code null}: this implementation does not record errors. */
    @Override
    public String getError() {
        return null;
    }

    /**
     * Returns the stream of per-worker result streams, subscribed on the IO scheduler.
     *
     * @return an observable emitting one inner observable per processed endpoint change
     */
    @Override
    public Observable<Observable<T>> getResults() {
        return Observable.create(new Observable.OnSubscribe<Observable<T>>() {
            @Override
            public void call(Subscriber<? super Observable<T>> subscriber) {
                MetricsClientImpl.this.internalGetResults().subscribe(subscriber);
            }
        }).subscribeOn(Schedulers.io());
    }

    /**
     * Maps every endpoint change of the job to a result stream and arranges for all worker
     * connections to be closed when the upstream subscription is unsubscribed.
     */
    private Observable<Observable<T>> internalGetResults() {
        return this.jobWorkerMetricsLocator.locateWorkerMetricsForJob(this.jobId).map(new Func1<EndpointChange, Observable<T>>() {
            @Override
            public Observable<T> call(EndpointChange endpointChange) {
                if (MetricsClientImpl.this.nowClosed.get()) {
                    return Observable.empty();
                }
                if (endpointChange.getType() == EndpointChange.Type.complete) {
                    return MetricsClientImpl.this.handleEndpointClose(endpointChange);
                }
                return MetricsClientImpl.this.handleEndpointConnect(endpointChange);
            }
        }).lift(new Observable.Operator<Observable<T>, Observable<T>>() {
            @Override
            public Subscriber<? super Observable<T>> call(Subscriber<? super Observable<T>> subscriber) {
                // Tie connection cleanup to the lifetime of the downstream subscription.
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        try {
                            logger.warn("Closing metrics connections to workers of job " + MetricsClientImpl.this.jobId);
                            MetricsClientImpl.this.closeAllConnections();
                        }
                        catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }));
                return subscriber;
            }
        }).share().lift(new DropOperator<Observable<T>>("client_metrics_share"));
    }

    /**
     * Opens and registers a connection to the metrics port of the endpoint in {@code ec}.
     *
     * @return the connection's result stream, or an empty stream when the endpoint is not a
     *         {@link WorkerEndpoint} or the client has been closed
     */
    private Observable<T> handleEndpointConnect(EndpointChange ec) {
        logger.info("Opening connection to metrics sink at " + ec.toString());
        // Check the endpoint type before touching it so an unusable endpoint is logged
        // rather than dereferenced.
        if (!(ec.getEndpoint() instanceof WorkerEndpoint)) {
            logger.error("endpoint received on Endpoint connect is not a WorkerEndpoint {}, no metrics port to connect to", (Object)ec.getEndpoint());
            return Observable.empty();
        }
        WorkerEndpoint workerEndpoint = (WorkerEndpoint)ec.getEndpoint();
        String unwrappedHost = MasterClientWrapper.getUnwrappedHost(workerEndpoint.getHost());
        int metricsPort = workerEndpoint.getMetricPort();
        WorkerConnection<T> workerConnection = this.workerConnectionFunc.call(unwrappedHost, metricsPort, new Action1<Boolean>() {
            @Override
            public void call(Boolean flag) {
                MetricsClientImpl.this.updateWorkerConx(flag);
            }
        }, new Action1<Boolean>() {
            @Override
            public void call(Boolean flag) {
                MetricsClientImpl.this.updateWorkerDataReceivingStatus(flag);
            }
        }, this.dataRecvTimeoutSecs);
        if (this.nowClosed.get()) {
            this.closeAndLog(workerConnection);
            return Observable.empty();
        }
        String connName = this.toWorkerConnName(unwrappedHost, metricsPort);
        this.workerConnections.put(connName, workerConnection);
        if (this.nowClosed.get()) {
            // The client was closed while the connection was being registered. Deregister
            // and drop it unconditionally: even if close() fails we must not hand out a
            // live stream for a closed client or keep a stale map entry.
            this.workerConnections.remove(connName);
            this.closeAndLog(workerConnection);
            return Observable.empty();
        }
        return workerConnection.call();
    }

    /** Closes {@code connection}, logging instead of propagating any failure. */
    private void closeAndLog(WorkerConnection<T> connection) {
        try {
            connection.close();
        }
        catch (Exception e) {
            logger.warn("Error closing worker metrics connection {} - {}", connection.getName(), e.getMessage(), e);
        }
    }

    /**
     * Callback for a worker connection's data-receiving state.
     *
     * @param flag {@code true} increments the receiving-data gauge, {@code false} decrements it
     */
    private void updateWorkerDataReceivingStatus(Boolean flag) {
        if (flag) {
            this.workerConnReceivingDataGauge.increment();
        } else {
            this.workerConnReceivingDataGauge.decrement();
        }
        this.publishConnectionsStatus();
    }

    /**
     * Callback for a worker connection's connected state.
     *
     * @param flag {@code true} increments the connections gauge, {@code false} decrements it
     */
    private void updateWorkerConx(Boolean flag) {
        if (flag) {
            this.workersGauge.increment();
        } else {
            this.workersGauge.decrement();
        }
        this.publishConnectionsStatus();
    }

    /**
     * Refreshes the expected-workers gauge and, if a status observer was supplied, emits a
     * snapshot of the receiving-data count, the connection count and the expected worker count.
     */
    private void publishConnectionsStatus() {
        this.expectedWorkersGauge.set((long)this.numWorkers.get());
        Observer<WorkerConnectionsStatus> observer = this.workerConnectionsStatusObserver;
        if (observer == null) {
            return;
        }
        // Callbacks may arrive from several connections; serialize emissions to the observer.
        synchronized (observer) {
            observer.onNext(new WorkerConnectionsStatus(this.workerConnReceivingDataGauge.value(), this.workersGauge.value(), this.numWorkers.get()));
        }
    }

    /**
     * Closes and forgets the connection tracked for the endpoint in {@code ec}, if any.
     *
     * @return always an empty stream
     */
    private Observable<T> handleEndpointClose(EndpointChange ec) {
        logger.info("Closed connection to metrics sink at " + ec.toString());
        if (!(ec.getEndpoint() instanceof WorkerEndpoint)) {
            logger.warn("endpoint received on Endpoint close is not a WorkerEndpoint {}, worker endpoint required for metrics port", (Object)ec.getEndpoint());
            return Observable.empty();
        }
        WorkerEndpoint workerEndpoint = (WorkerEndpoint)ec.getEndpoint();
        String unwrappedHost = MasterClientWrapper.getUnwrappedHost(workerEndpoint.getHost());
        int metricsPort = workerEndpoint.getMetricPort();
        WorkerConnection<T> removed = this.workerConnections.remove(this.toWorkerConnName(unwrappedHost, metricsPort));
        if (removed != null) {
            try {
                removed.close();
            }
            catch (Exception e) {
                logger.error("Unexpected exception on closing worker metrics connection: " + e.getMessage(), (Throwable)e);
            }
        }
        return Observable.empty();
    }

    /** Marks the client closed and closes every tracked worker connection. */
    private void closeAllConnections() throws Exception {
        this.nowClosed.set(true);
        this.workerConnections.closeOut(new Action1<WorkerConnection<T>>() {
            @Override
            public void call(WorkerConnection<T> connection) {
                MetricsClientImpl.this.closeAndLog(connection);
            }
        });
    }

    /**
     * Registry of open worker connections keyed by connection name. All access to the
     * backing map is guarded by the map's own monitor.
     */
    class WorkerConnections {
        private final Map<String, WorkerConnection<T>> workerConnections = new HashMap<String, WorkerConnection<T>>();
        private boolean isClosed = false;

        WorkerConnections() {
        }

        /** Registers {@code val} under {@code key}; does nothing once the registry is closed. */
        private void put(String key, WorkerConnection<T> val) {
            synchronized (this.workerConnections) {
                if (this.isClosed) {
                    return;
                }
                this.workerConnections.put(key, val);
            }
        }

        /** Removes and returns the connection registered under {@code key}, or {@code null}. */
        private WorkerConnection<T> remove(String key) {
            synchronized (this.workerConnections) {
                return this.workerConnections.remove(key);
            }
        }

        /**
         * Closes the registry to new entries and hands every registered connection to
         * {@code onClose}.
         *
         * <p>The entries are copied and the map cleared while holding the lock; the callback
         * runs outside it. This keeps a concurrent {@link #remove(String)} from modifying the
         * map during iteration and keeps the callback from running under the lock.
         */
        private void closeOut(Action1<WorkerConnection<T>> onClose) {
            Map<String, WorkerConnection<T>> toClose;
            synchronized (this.workerConnections) {
                this.isClosed = true;
                toClose = new HashMap<String, WorkerConnection<T>>(this.workerConnections);
                this.workerConnections.clear();
            }
            for (WorkerConnection<T> workerConnection : toClose.values()) {
                logger.info("Closing " + workerConnection.getName());
                onClose.call(workerConnection);
            }
        }
    }
}

