package me.ahoo.simba.redis;

import com.google.common.base.Strings;
import com.google.common.io.Resources;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.reactive.RedisScriptingReactiveCommands;
import io.lettuce.core.pubsub.api.reactive.ChannelMessage;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import me.ahoo.simba.SimbaException;
import me.ahoo.simba.core.AbstractMutexContendService;
import me.ahoo.simba.core.MutexContender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:me/ahoo/simba/redis/RedisMutexContendService.class */
public class RedisMutexContendService extends AbstractMutexContendService {
    private static final Logger log = LoggerFactory.getLogger(RedisMutexContendService.class);
    private static final byte[] SCRIPT_ACQUIRE = getScript("mutex_acquire.lua");
    private static final byte[] SCRIPT_RELEASE = getScript("mutex_release.lua");
    private static final byte[] SCRIPT_GUARD = getScript("mutex_guard.lua");
    private final String[] keys;
    private final String mutexChannel;
    private final String ownerChannel;
    private final Duration ttl;
    private final Duration timerTtl;
    private final RedisScriptingReactiveCommands<String, String> redisCommands;
    private final RedisPubSubReactiveCommands<String, String> redisPubSubCommands;
    private ChannelSubscriber channelSubscriber;
    private Disposable acquiredTimer;
    private final AtomicInteger acquireTimes;

    /* loaded from: input_file:me/ahoo/simba/redis/RedisMutexContendService$ChannelSubscriber.class */
    public class ChannelSubscriber extends BaseSubscriber<ChannelMessage<String, String>> {
        public ChannelSubscriber() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(ChannelMessage<String, String> channelMessage) {
            boolean startsWith = ((String) channelMessage.getChannel()).startsWith(RedisMutexContendService.this.mutexChannel);
            if (RedisMutexContendService.log.isDebugEnabled()) {
                RedisMutexContendService.log.debug("hookOnNext - mutex:[{}] - ownerId:[{}] - channel:[{}] - message:[{}] - subscribed:[{}]", new Object[]{RedisMutexContendService.this.getMutex(), RedisMutexContendService.this.getContenderId(), channelMessage.getChannel(), channelMessage.getMessage(), Boolean.valueOf(startsWith)});
            }
            if (startsWith) {
                try {
                    Message of = Message.of((String) channelMessage.getMessage());
                    String event = of.getEvent();
                    boolean z = -1;
                    switch (event.hashCode()) {
                        case -1731151282:
                            if (event.equals(Message.EVENT_ACQUIRED)) {
                                z = true;
                                break;
                            }
                            break;
                        case -551298755:
                            if (event.equals(Message.EVENT_RELEASED)) {
                                z = false;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            RedisMutexContendService.this.handleReleased(of.getOwnerId());
                            RedisMutexContendService.this.tryAcquire().subscribe();
                            break;
                        case true:
                            RedisMutexContendService.this.handleAcquired(of.getOwnerId());
                            break;
                        default:
                            throw new IllegalStateException("Unexpected value: " + of.getEvent());
                    }
                } catch (Throwable th) {
                    if (RedisMutexContendService.log.isErrorEnabled()) {
                        RedisMutexContendService.log.error(th.getMessage(), th);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisMutexContendService(MutexContender mutexContender, Executor executor, Duration duration, RedisScriptingReactiveCommands<String, String> redisScriptingReactiveCommands, RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands) {
        super(mutexContender, executor);
        this.acquireTimes = new AtomicInteger(0);
        this.keys = new String[]{"{" + mutexContender.getMutex() + "}"};
        this.mutexChannel = Strings.lenientFormat("%s:{%s}", new Object[]{"simba", mutexContender.getMutex()});
        this.ownerChannel = Strings.lenientFormat("%s:%s", new Object[]{this.mutexChannel, mutexContender.getContenderId()});
        this.ttl = duration;
        this.redisCommands = redisScriptingReactiveCommands;
        this.redisPubSubCommands = redisPubSubReactiveCommands;
        this.timerTtl = Duration.ofMillis(this.ttl.toMillis() / 2);
    }

    public boolean isOwner() {
        return super.currentOwnerIsMe() && getAcquiredExpiredTime() > System.currentTimeMillis();
    }

    private long getAcquiredExpiredTime() {
        return getCurrentOwner().getAcquiredAt() + this.ttl.toMillis();
    }

    public static byte[] getScript(String str) {
        try {
            return Resources.toByteArray(Resources.getResource(str));
        } catch (IOException e) {
            if (log.isErrorEnabled()) {
                log.error(e.getMessage(), e);
            }
            throw new SimbaException(e.getMessage(), e);
        }
    }

    private void startAcquiredTimer() {
        if (this.acquiredTimer == null || this.acquiredTimer.isDisposed()) {
            this.acquiredTimer = Flux.interval(this.timerTtl).flatMap(l -> {
                return currentOwnerIsMe() ? intervalGuard() : intervalAcquire();
            }).subscribe();
        }
    }

    private void disposeAcquiredTimer() {
        if (this.acquiredTimer == null || this.acquiredTimer.isDisposed()) {
            return;
        }
        this.acquiredTimer.dispose();
    }

    private Mono<Boolean> intervalGuard() {
        return this.redisCommands.eval(SCRIPT_GUARD, ScriptOutputType.BOOLEAN, this.keys, new String[]{getContenderId(), String.valueOf(this.ttl.toMillis())}).next().cast(Boolean.class).map(bool -> {
            if (log.isDebugEnabled()) {
                log.debug("intervalGuard - mutex:[{}] - contenderId:[{}] - [{}]- [{}]", new Object[]{getMutex(), getContenderId(), Integer.valueOf(this.acquireTimes.incrementAndGet()), bool});
            }
            if (bool.booleanValue()) {
                handleAcquired(getContenderId());
            }
            return bool;
        });
    }

    private Mono<Boolean> intervalAcquire() {
        return tryAcquire().map(bool -> {
            if (log.isDebugEnabled()) {
                log.debug("intervalAcquire - mutex:[{}] - contenderId:[{}] - [{}] - [{}]", new Object[]{getMutex(), getContenderId(), Integer.valueOf(this.acquireTimes.incrementAndGet()), bool});
            }
            return bool;
        });
    }

    protected void startContend() {
        this.channelSubscriber = new ChannelSubscriber();
        this.redisPubSubCommands.observeChannels().subscribe(this.channelSubscriber);
        startAcquiredTimer();
        this.redisPubSubCommands.subscribe(new String[]{this.mutexChannel, this.ownerChannel}).then(tryAcquire()).subscribe();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Mono<Boolean> tryAcquire() {
        return this.redisCommands.eval(SCRIPT_ACQUIRE, ScriptOutputType.BOOLEAN, this.keys, new String[]{getContenderId(), String.valueOf(this.ttl.toMillis())}).next().ofType(Boolean.class).map(bool -> {
            if (bool.booleanValue()) {
                handleAcquired(getContenderId());
            }
            return bool;
        });
    }

    protected void stopContend() {
        if (this.channelSubscriber != null) {
            this.channelSubscriber.cancel();
        }
        disposeAcquiredTimer();
        this.redisPubSubCommands.unsubscribe(new String[]{this.mutexChannel, this.ownerChannel}).thenMany(this.redisCommands.eval(SCRIPT_RELEASE, ScriptOutputType.BOOLEAN, this.keys, new String[]{getContenderId()})).next().ofType(Boolean.class).doOnNext(bool -> {
            try {
                handleReleased(getContenderId());
            } catch (Throwable th) {
                if (log.isWarnEnabled()) {
                    log.warn("stopContend - mutex:[{}] - contenderId:[{}] - message:[{}]", new Object[]{getMutex(), getContenderId(), th.getMessage()});
                }
            }
        }).subscribe();
    }
}
