/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.simba.redis;

import com.google.common.base.Strings;
import com.google.common.io.Resources;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.reactive.RedisScriptingReactiveCommands;
import io.lettuce.core.pubsub.api.reactive.ChannelMessage;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.io.IOException;
import java.net.URL;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import me.ahoo.simba.SimbaException;
import me.ahoo.simba.core.AbstractMutexContendService;
import me.ahoo.simba.core.MutexContender;
import me.ahoo.simba.redis.Message;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RedisMutexContendService
extends AbstractMutexContendService {
    private static final Logger log = LoggerFactory.getLogger(RedisMutexContendService.class);
    private static final byte[] SCRIPT_ACQUIRE = RedisMutexContendService.getScript("mutex_acquire.lua");
    private static final byte[] SCRIPT_RELEASE = RedisMutexContendService.getScript("mutex_release.lua");
    private static final byte[] SCRIPT_GUARD = RedisMutexContendService.getScript("mutex_guard.lua");
    private final String[] keys;
    private final String mutexChannel;
    private final String ownerChannel;
    private final Duration ttl;
    private final Duration timerTtl;
    private final RedisScriptingReactiveCommands<String, String> redisCommands;
    private final RedisPubSubReactiveCommands<String, String> redisPubSubCommands;
    private ChannelSubscriber channelSubscriber;
    private Disposable acquiredTimer;
    private final AtomicInteger acquireTimes = new AtomicInteger(0);

    protected RedisMutexContendService(MutexContender contender, Executor handleExecutor, Duration ttl, RedisScriptingReactiveCommands<String, String> redisCommands, RedisPubSubReactiveCommands<String, String> redisPubSubCommands) {
        super(contender, handleExecutor);
        this.keys = new String[]{"{" + contender.getMutex() + "}"};
        this.mutexChannel = Strings.lenientFormat((String)"%s:{%s}", (Object[])new Object[]{"simba", contender.getMutex()});
        this.ownerChannel = Strings.lenientFormat((String)"%s:%s", (Object[])new Object[]{this.mutexChannel, contender.getContenderId()});
        this.ttl = ttl;
        this.redisCommands = redisCommands;
        this.redisPubSubCommands = redisPubSubCommands;
        this.timerTtl = Duration.ofMillis(this.ttl.toMillis() / 2L);
    }

    public boolean isOwner() {
        long currentTime;
        if (!super.currentOwnerIsMe()) {
            return false;
        }
        long acquiredExpiredTime = this.getAcquiredExpiredTime();
        return acquiredExpiredTime > (currentTime = System.currentTimeMillis());
    }

    private long getAcquiredExpiredTime() {
        long acquiredExpiredTime = this.getCurrentOwner().getAcquiredAt() + this.ttl.toMillis();
        return acquiredExpiredTime;
    }

    public static byte[] getScript(String scriptName) {
        try {
            URL url = Resources.getResource((String)scriptName);
            return Resources.toByteArray((URL)url);
        }
        catch (IOException ioException) {
            if (log.isErrorEnabled()) {
                log.error(ioException.getMessage(), (Throwable)ioException);
            }
            throw new SimbaException(ioException.getMessage(), (Throwable)ioException);
        }
    }

    private void startAcquiredTimer() {
        if (this.acquiredTimer != null && !this.acquiredTimer.isDisposed()) {
            return;
        }
        this.acquiredTimer = Flux.interval((Duration)this.timerTtl).flatMap(tick -> {
            if (this.currentOwnerIsMe()) {
                return this.intervalGuard();
            }
            return this.intervalAcquire();
        }).subscribe();
    }

    private void disposeAcquiredTimer() {
        if (this.acquiredTimer == null || this.acquiredTimer.isDisposed()) {
            return;
        }
        this.acquiredTimer.dispose();
    }

    private Mono<Boolean> intervalGuard() {
        Object[] values = new String[]{this.getContenderId(), String.valueOf(this.ttl.toMillis())};
        return this.redisCommands.eval(SCRIPT_GUARD, ScriptOutputType.BOOLEAN, (Object[])this.keys, values).next().cast(Boolean.class).map(succeed -> {
            if (log.isDebugEnabled()) {
                log.debug("intervalGuard - mutex:[{}] - contenderId:[{}] - [{}]- [{}]", new Object[]{this.getMutex(), this.getContenderId(), this.acquireTimes.incrementAndGet(), succeed});
            }
            if (succeed.booleanValue()) {
                this.handleAcquired(this.getContenderId());
            }
            return succeed;
        });
    }

    private Mono<Boolean> intervalAcquire() {
        return this.tryAcquire().map(succeed -> {
            if (log.isDebugEnabled()) {
                log.debug("intervalAcquire - mutex:[{}] - contenderId:[{}] - [{}] - [{}]", new Object[]{this.getMutex(), this.getContenderId(), this.acquireTimes.incrementAndGet(), succeed});
            }
            return succeed;
        });
    }

    protected void startContend() {
        this.channelSubscriber = new ChannelSubscriber();
        this.redisPubSubCommands.observeChannels().subscribe((CoreSubscriber)this.channelSubscriber);
        this.startAcquiredTimer();
        this.redisPubSubCommands.subscribe((Object[])new String[]{this.mutexChannel, this.ownerChannel}).then(this.tryAcquire()).subscribe();
    }

    private Mono<Boolean> tryAcquire() {
        Object[] values = new String[]{this.getContenderId(), String.valueOf(this.ttl.toMillis())};
        return this.redisCommands.eval(SCRIPT_ACQUIRE, ScriptOutputType.BOOLEAN, (Object[])this.keys, values).next().ofType(Boolean.class).map(succeed -> {
            if (succeed.booleanValue()) {
                this.handleAcquired(this.getContenderId());
            }
            return succeed;
        });
    }

    protected void stopContend() {
        if (this.channelSubscriber != null) {
            this.channelSubscriber.cancel();
        }
        this.disposeAcquiredTimer();
        Object[] values = new String[]{this.getContenderId()};
        this.redisPubSubCommands.unsubscribe((Object[])new String[]{this.mutexChannel, this.ownerChannel}).thenMany((Publisher)this.redisCommands.eval(SCRIPT_RELEASE, ScriptOutputType.BOOLEAN, (Object[])this.keys, values)).next().ofType(Boolean.class).doOnNext(succeed -> {
            block2: {
                try {
                    this.handleReleased(this.getContenderId());
                }
                catch (Throwable throwable) {
                    if (!log.isWarnEnabled()) break block2;
                    log.warn("stopContend - mutex:[{}] - contenderId:[{}] - message:[{}]", new Object[]{this.getMutex(), this.getContenderId(), throwable.getMessage()});
                }
            }
        }).subscribe();
    }

    public class ChannelSubscriber
    extends BaseSubscriber<ChannelMessage<String, String>> {
        protected void hookOnNext(ChannelMessage<String, String> value) {
            block12: {
                boolean subscribed = ((String)value.getChannel()).startsWith(RedisMutexContendService.this.mutexChannel);
                if (log.isDebugEnabled()) {
                    log.debug("hookOnNext - mutex:[{}] - ownerId:[{}] - channel:[{}] - message:[{}] - subscribed:[{}]", new Object[]{RedisMutexContendService.this.getMutex(), RedisMutexContendService.this.getContenderId(), value.getChannel(), value.getMessage(), subscribed});
                }
                if (!subscribed) {
                    return;
                }
                try {
                    Message message = Message.of((String)value.getMessage());
                    switch (message.getEvent()) {
                        case "released": {
                            RedisMutexContendService.this.handleReleased(message.getOwnerId());
                            RedisMutexContendService.this.tryAcquire().subscribe();
                            break;
                        }
                        case "acquired": {
                            RedisMutexContendService.this.handleAcquired(message.getOwnerId());
                            break;
                        }
                        default: {
                            throw new IllegalStateException("Unexpected value: " + message.getEvent());
                        }
                    }
                }
                catch (Throwable throwable) {
                    if (!log.isErrorEnabled()) break block12;
                    log.error(throwable.getMessage(), throwable);
                }
            }
        }
    }
}

