/*
 * Decompiled with CFR 0.152.
 */
package io.helixservice.feature.worker;

import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import io.helixservice.core.server.Server;
import io.helixservice.feature.worker.BlockingWorker;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.sync.Sync;
import java.util.function.Consumer;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.DeclarePrecedence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Aspect
@DeclarePrecedence(value="io.helixservice.feature.worker.BlockingWorkerAspect,*")
public class BlockingWorkerAspect {
    private static Logger LOG = LoggerFactory.getLogger(BlockingWorkerAspect.class);
    private static final String VERT_X_WORKER_THREAD = "vert.x-worker-thread";
    private Vertx vertx;

    @Before(value="execution(public void io.helixservice.feature.worker.BlockingWorkerFeature.start(io.helixservice.core.server.Server)) && args(server)")
    public void beforeStartupFeature(Server server) {
        this.vertx = server.getVertx().get();
    }

    @Suspendable
    @Around(value="(execution(public * *(..)) && @annotation(blockingWorker)) || (execution(public * *(..)) && within(@io.helixservice.feature.worker.BlockingWorker *) && @annotation(blockingWorker))")
    public Object around(ProceedingJoinPoint pjp, BlockingWorker blockingWorker) throws Throwable, SuspendExecution {
        Object result = BlockingWorkerAspect.onWorkerThread() ? pjp.proceed() : this.invokeOnWorkerThread(pjp);
        return result;
    }

    public static boolean onWorkerThread() {
        return Thread.currentThread().getName().contains(VERT_X_WORKER_THREAD);
    }

    @Suspendable
    private Object invokeOnWorkerThread(final ProceedingJoinPoint pjp) throws Throwable, SuspendExecution {
        AsyncResult ret = (AsyncResult)Sync.awaitEvent((Consumer)new Consumer<Handler<AsyncResult<Object>>>(){

            @Override
            @Suspendable
            public void accept(Handler<AsyncResult<Object>> awaitHandler) {
                BlockingWorkerAspect.this.vertx.executeBlocking((Handler)new Handler<Future<Object>>(){

                    @Suspendable
                    public void handle(Future<Object> future) {
                        try {
                            BlockingWorkerAspect.this.assertRunningOnVertxWorkerThread(pjp);
                            if (LOG.isInfoEnabled()) {
                                LOG.info("Started Blocking Worker on " + Thread.currentThread().getName());
                            }
                            Object result = pjp.proceed();
                            future.complete(result);
                        }
                        catch (Throwable t) {
                            future.fail(t);
                        }
                        finally {
                            if (LOG.isInfoEnabled()) {
                                LOG.info("Completed Blocking Worker on " + Thread.currentThread().getName());
                            }
                        }
                    }
                }, false, awaitHandler);
            }
        });
        if (ret.failed()) {
            throw ret.cause();
        }
        return ret.result();
    }

    private void assertRunningOnVertxWorkerThread(ProceedingJoinPoint pjp) {
        if (!BlockingWorkerAspect.onWorkerThread()) {
            Signature signature = pjp.getSignature();
            throw new IllegalStateException("Expected " + signature.getDeclaringTypeName() + "::" + signature.getName() + " to run on a Vert.x worker thread");
        }
    }
}

