package ai.grakn.redisq;

import ai.grakn.redisq.Document;
import ai.grakn.redisq.consumer.Mapper;
import ai.grakn.redisq.consumer.QueueConsumer;
import ai.grakn.redisq.consumer.RedisqConsumer;
import ai.grakn.redisq.consumer.TimedWrap;
import ai.grakn.redisq.exceptions.DeserializationException;
import ai.grakn.redisq.exceptions.RedisqException;
import ai.grakn.redisq.exceptions.SerializationException;
import ai.grakn.redisq.exceptions.StateFutureInitializationException;
import ai.grakn.redisq.exceptions.WaitException;
import ai.grakn.redisq.util.Names;
import com.codahale.metrics.CachedGauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.util.Pool;

/* loaded from: input_file:ai/grakn/redisq/Redisq.class */
public class Redisq<T extends Document> implements Queue<T> {
    /** Mapper used to serialize and deserialize {@link StateInfo} values stored in and published to Redis. */
    static final Mapper<StateInfo> stateMapper = new Mapper<>(StateInfo.class);
    private static final Logger LOG = LoggerFactory.getLogger(Redisq.class);
    /**
     * Retry policy used by {@link #close()}: retries every 100 ms while the polled value is
     * {@code null} or non-zero, and gives up after 10 seconds.
     */
    private static final Retryer<Integer> CLOSE_RETRIER = RetryerBuilder.newBuilder().retryIfResult(num -> {
        return num == null || num.intValue() != 0;
    }).withWaitStrategy(WaitStrategies.fixedWait(100, TimeUnit.MILLISECONDS)).withStopStrategy(StopStrategies.stopAfterDelay(10, TimeUnit.SECONDS)).build();
    /** Default wait timeout in seconds; matches the 30 second timeout of the two-argument getFutureForDocumentStateWait. */
    private static final int DEFAULT_SUBSCRIPTION_WAIT_TIMEOUT_SECONDS = 30;
    /** Margin in milliseconds; equals the minimum gap between state TTL and lock time enforced by the constructor. */
    private static final int MARGIN_MS = 60000;
    /** Redis list key onto which pushed document ids are placed and from which the consumer loop pops. */
    private final String queueName;
    /** Redis list key that receives ids popped from {@link #queueName} while they are being handled. */
    private final String inFlightQueueName;
    /** Queue name given to the constructor; used to derive the Redis list keys and in log messages. */
    private final String name;
    /** Blocking timeout of the consumer loop's BRPOPLPUSH (only the whole-seconds part is used). */
    private final Duration timeout;
    /** Mapper for the timestamped wrapper stored under a document's content key. */
    private final Mapper<TimedWrap<T>> mapper;
    /** Builds the Redis key and channel names used by this queue. */
    private final Names names;
    /** Expiry, in seconds, of the per-document lock key. */
    private final int lockTime;
    /** Pool from which every Redis connection used by this queue is borrowed. */
    private final Pool<Jedis> jedisPool;
    /** Expiry, in seconds, of the content and state keys. */
    private int ttlStateInfo;
    /** Executor on which the subscription processes documents; shut down by {@link #close()}. */
    private final ExecutorService threadPool;
    /** Set to true by startConsumer() and to false by close(); polled by both background loops. */
    private final AtomicBoolean working = new AtomicBoolean(false);
    /** Number of tasks currently inside the subscription's process call. */
    private final AtomicInteger runningThreads = new AtomicInteger(0);
    /** Documents whose age (relative to the iteration start) is not below this duration are not executed. */
    private Duration discardTime;
    /** Registry holding this queue's timers, meter and gauge; also handed to state futures. */
    private final MetricRegistry metricRegistry;
    /** Consumer wrapper that processes each dequeued document. */
    private QueueConsumer<T> subscription;
    /** Future of the dequeue loop started by startConsumer(); null until then. */
    private Future<?> mainLoop;
    /** Future of the in-flight recovery loop started by startConsumer(); null until then. */
    private Future<?> inFlightLoop;
    /** Times the transaction that moves an unlocked in-flight id back to the queue. */
    private final Timer restoreBlockedTimer;
    /** Times the blocking BRPOPLPUSH call of the consumer loop. */
    private final Timer idleTimer;
    /** Times the Redis transaction performed by push(). */
    private final Timer pushTimer;
    /** Times the hand-off of a document to the thread pool. */
    private final Timer executeWaitTimer;
    /** Counts serialization failures encountered by push(). */
    private final Meter serializationErrors;

    /**
     * Creates a Redis-backed queue and registers its metrics (timers, a serialization-error meter
     * and a cached "queue size" gauge) in the given registry.
     *
     * @param str             name of the queue; the Redis list keys are derived from it
     * @param duration        blocking timeout used by the consumer loop when popping from Redis
     * @param duration2       time-to-live of the content and state keys (whole seconds are used)
     * @param duration3       time a document stays locked (whole seconds are used)
     * @param duration4       age at or beyond which a dequeued document is not executed
     * @param consumer        callback wrapped into the queue's {@link RedisqConsumer}
     * @param cls             document class, used to build the (de)serialization type
     * @param pool            pool providing the Redis connections
     * @param executorService executor on which documents are processed
     * @param metricRegistry  registry receiving this queue's metrics
     * @throws IllegalStateException if {@code duration2 - duration3} is not greater than 60000 ms
     */
    public Redisq(String str, Duration duration, Duration duration2, Duration duration3, Duration duration4, Consumer<T> consumer, Class<T> cls, final Pool<Jedis> pool, ExecutorService executorService, MetricRegistry metricRegistry) {
        Preconditions.checkState(duration2.minus(duration3).toMillis() > MARGIN_MS, "The ttl for a state has to be higher than the time a document is locked for by 60000ms");
        this.name = str;
        this.timeout = duration;
        this.ttlStateInfo = (int) duration2.getSeconds();
        this.lockTime = (int) duration3.getSeconds();
        this.discardTime = duration4;
        this.metricRegistry = metricRegistry;
        this.subscription = new RedisqConsumer<>(consumer, pool, this);
        this.names = new Names();
        this.queueName = this.names.queueNameFor(str);
        this.inFlightQueueName = this.names.inFlightQueueNameFor(str);
        this.jedisPool = pool;
        this.threadPool = executorService;
        this.mapper = new Mapper<>(new ObjectMapper().getTypeFactory().constructParametricType(TimedWrap.class, cls));
        this.pushTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "push"));
        this.idleTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "idle"));
        // The gauge caches the list length for 15 seconds so metric reads do not hit Redis every time.
        metricRegistry.register(MetricRegistry.name(getClass(), "queue", "size"), new CachedGauge<Long>(15L, TimeUnit.SECONDS) {
            @Override
            protected Long loadValue() {
                try (Jedis jedis = pool.getResource()) {
                    return jedis.llen(Redisq.this.queueName);
                }
            }
        });
        this.restoreBlockedTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "restore_blocked"));
        this.executeWaitTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "execute_wait"));
        this.serializationErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "serialization_errors"));
    }

    /**
     * Enqueues a document.
     *
     * <p>In a single Redis transaction this sets the document's lock key, pushes its id onto the
     * queue list, stores the serialized content and a {@code NEW} state (both with the state TTL)
     * and publishes the state on the document's state channel.
     *
     * @param t the document to enqueue
     * @throws RedisqException if the document or its state cannot be serialized
     */
    @Override // ai.grakn.redisq.Queue
    public void push(T t) {
        long timestampMs = System.currentTimeMillis();
        try {
            String serializedDocument = this.mapper.serialize(new TimedWrap<>(t, timestampMs));
            String serializedState = stateMapper.serialize(new StateInfo(State.NEW, timestampMs, ""));
            LOG.debug("Jedis active: {}, idle: {}", this.jedisPool.getNumActive(), this.jedisPool.getNumIdle());
            // Resources are closed in reverse order: the timer stops first, then the connection is returned.
            try (Jedis jedis = this.jedisPool.getResource();
                    Timer.Context ignored = this.pushTimer.time()) {
                Transaction multi = jedis.multi();
                String id = t.getIdAsString();
                String lockKey = this.names.lockKeyFromId(id);
                multi.setex(lockKey, this.lockTime, "locked");
                multi.lpush(this.queueName, id);
                multi.setex(this.names.contentKeyFromId(id), this.ttlStateInfo, serializedDocument);
                multi.setex(this.names.stateKeyFromId(id), this.ttlStateInfo, serializedState);
                multi.publish(this.names.stateChannelKeyFromId(id), serializedState);
                multi.exec();
                LOG.debug("Pushed {} with lockTime {}s lock id: {}", id, this.lockTime, lockKey);
            }
        } catch (SerializationException e) {
            this.serializationErrors.mark();
            throw new RedisqException("Could not serialize element " + t.getIdAsString(), e);
        }
    }

    /**
     * Starts the two background loops of the consumer: the main loop, which repeatedly runs
     * {@code iteration()}, and the in-flight loop, which runs {@code inflightIteration()} and then
     * sleeps for five seconds. Both loops run until {@code working} is set to false.
     *
     * <p>Each loop gets its own single-thread executor. The executor is shut down right after the
     * loop is submitted: an orderly shutdown still runs the already-submitted task, and lets the
     * worker thread terminate once the loop ends instead of lingering forever.
     */
    @Override // ai.grakn.redisq.Queue
    public void startConsumer() {
        LOG.debug("Starting consumer {}", this.name);
        this.working.set(true);

        ExecutorService mainLoopExecutor = Executors.newSingleThreadExecutor();
        this.mainLoop = mainLoopExecutor.submit(() -> {
            while (this.working.get()) {
                iteration();
            }
        });
        mainLoopExecutor.shutdown();

        ExecutorService inFlightLoopExecutor = Executors.newSingleThreadExecutor();
        this.inFlightLoop = inFlightLoopExecutor.submit(() -> {
            while (this.working.get()) {
                inflightIteration();
                try {
                    TimeUnit.MILLISECONDS.sleep(5000L);
                } catch (InterruptedException e) {
                    LOG.warn("Inflight sleep interrupted", e);
                    // Restore the flag and stop: continuing would make every following sleep fail immediately.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        inFlightLoopExecutor.shutdown();
    }

    /**
     * Creates a future tied to the state of the given document, using this queue's connection
     * pool and the default 30 second timeout.
     *
     * @param set states passed to the created {@link StateFuture}
     * @param str id of the document
     * @return the newly created state future
     * @throws StateFutureInitializationException if the future cannot be initialized
     */
    @Override // ai.grakn.redisq.Queue
    public Future<Void> getFutureForDocumentStateWait(Set<State> set, String str) throws StateFutureInitializationException {
        long timeoutSeconds = DEFAULT_SUBSCRIPTION_WAIT_TIMEOUT_SECONDS;
        Future<Void> stateFuture = new StateFuture(set, str, jedisPool, timeoutSeconds, TimeUnit.SECONDS, metricRegistry);
        return stateFuture;
    }

    /**
     * Creates a future tied to the state of the given document, using a caller-supplied
     * connection pool and timeout.
     *
     * @param set      states passed to the created {@link StateFuture}
     * @param str      id of the document
     * @param j        timeout value
     * @param timeUnit unit of {@code j}
     * @param pool     pool the future takes its Redis connection from
     * @return the newly created state future
     * @throws StateFutureInitializationException if the future cannot be initialized
     */
    @Override // ai.grakn.redisq.Queue
    public Future<Void> getFutureForDocumentStateWait(Set<State> set, String str, long j, TimeUnit timeUnit, Pool<Jedis> pool) throws StateFutureInitializationException {
        Future<Void> stateFuture = new StateFuture(set, str, pool, j, timeUnit, metricRegistry);
        return stateFuture;
    }

    /**
     * Runs one pass over the in-flight list: reads every id currently in flight and, for each one
     * whose lock key has expired or no longer exists, either moves it back to the queue or removes
     * it from the in-flight list.
     *
     * <p>The connection used for the initial read is returned to the pool before the ids are
     * processed; each id then borrows its own connection.
     */
    private void inflightIteration() {
        List<String> inFlightIds;
        try (Jedis jedis = this.jedisPool.getResource()) {
            inFlightIds = jedis.lrange(this.inFlightQueueName, 0L, -1L);
        }
        LOG.debug("Found {} documents in flight", inFlightIds.size());
        inFlightIds.forEach(this::recoverInFlightDocument);
    }

    /**
     * Checks one in-flight id. Nothing happens unless the TTL of its lock key is 0 or -2
     * (Redis reports -2 for a key that does not exist). Otherwise a document still in state
     * {@code PROCESSING} is moved back to the queue atomically, a document in any other known
     * state is removed from the in-flight list, and a missing state is only logged.
     */
    private void recoverInFlightDocument(String id) {
        try (Jedis jedis = this.jedisPool.getResource()) {
            String lockKey = this.names.lockKeyFromId(id);
            Long ttl = jedis.ttl(lockKey);
            LOG.debug("Id {} has {} ttl", id, ttl);
            if (ttl.longValue() != 0 && ttl.longValue() != -2) {
                return;
            }
            Optional<StateInfo> state = getState(id);
            if (!state.isPresent()) {
                LOG.warn("Found expired document in inflight but no state info found for {}", id);
            } else if (state.get().getState().equals(State.PROCESSING)) {
                LOG.trace("Found unlocked element {}, lockId({}), ttl={}", id, lockKey, ttl);
                try (Timer.Context ignored = this.restoreBlockedTimer.time()) {
                    Transaction multi = jedis.multi();
                    multi.lrem(this.inFlightQueueName, 1L, id);
                    multi.lpush(this.queueName, id);
                    multi.exec();
                }
            } else {
                jedis.lrem(this.inFlightQueueName, 1L, id);
            }
        }
    }

    /**
     * Runs one pass of the consumer loop.
     *
     * <p>Blocks on Redis (up to the configured timeout) until an id can be moved from the queue to
     * the in-flight list, locks the document and reads its serialized content. The connection is
     * returned to the pool before the document is deserialized and handed to the thread pool.
     * A document whose age, measured from the start of this pass, is not below the discard time is
     * not executed. A rejected execution is passed to {@code processRejected}; a deserialization
     * failure is logged and the element skipped.
     */
    private void iteration() {
        long startedAtMs = System.currentTimeMillis();
        String contentKey;
        String serializedDocument;
        try (Jedis jedis = this.jedisPool.getResource()) {
            String id;
            // The idle timer covers only the blocking pop and is stopped exactly once.
            try (Timer.Context ignored = this.idleTimer.time()) {
                id = jedis.brpoplpush(this.queueName, this.inFlightQueueName, (int) this.timeout.getSeconds());
            }
            if (id == null) {
                LOG.debug("Empty queue");
                return;
            }
            contentKey = this.names.contentKeyFromId(id);
            serializedDocument = lockAndGetDocument(startedAtMs, contentKey, jedis, id);
        }
        if (serializedDocument == null || contentKey == null) {
            return;
        }
        try {
            TimedWrap<T> wrapped = this.mapper.deserialize(serializedDocument);
            try {
                Duration age = Duration.ofMillis(startedAtMs - wrapped.getTimestampMs());
                if (age.compareTo(this.discardTime) < 0) {
                    try (Timer.Context ignored = this.executeWaitTimer.time()) {
                        execute(wrapped);
                    }
                }
            } catch (RejectedExecutionException e) {
                processRejected(serializedDocument, contentKey, wrapped, e);
            }
        } catch (DeserializationException e) {
            LOG.error("Failed deserialization, skipping element: {}", serializedDocument, e);
        }
    }

    /**
     * Locks a freshly dequeued document, marks it as {@code PROCESSING} and reads its content.
     *
     * @param j     timestamp (ms) recorded in the new state
     * @param str   Redis key holding the serialized document
     * @param jedis connection used for the lock, the state update and the read
     * @param str2  id of the document
     * @return the value stored under {@code str}, as returned by Redis GET
     */
    private String lockAndGetDocument(long j, String str, Jedis jedis, String str2) {
        LOG.debug("Found id {}", str2);
        String lockKey = names.lockKeyFromId(str2);
        jedis.setex(lockKey, lockTime, "locked");

        // Only a document still in state NEW is expected here; anything else is reported.
        Optional<StateInfo> previous = getState(str2);
        if (previous.isPresent()) {
            State previousState = previous.get().getState();
            if (!previousState.equals(State.NEW)) {
                LOG.warn("State already present for {}: {}", str2, previousState);
            }
        }

        setState(jedis, j, str2, State.PROCESSING, "");
        String document = jedis.get(str);
        return document;
    }

    /**
     * Handles a document the thread pool refused to execute by pushing {@code str} onto the Redis
     * list {@code str2}. Failures are logged and never propagated.
     *
     * @param str                        value to push (the caller passes the serialized document)
     * @param str2                       Redis key to push onto (the caller passes the content key)
     * @param timedWrap                  the deserialized document, used for its id in log messages
     * @param rejectedExecutionException the rejection that triggered this call
     */
    private void processRejected(String str, String str2, TimedWrap<T> timedWrap, RejectedExecutionException rejectedExecutionException) {
        // NOTE(review): the caller passes the content key, which push() writes with SETEX;
        // an LPUSH onto a string key is expected to fail. Confirm the intended target list.
        try (Jedis jedis = this.jedisPool.getResource()) {
            jedis.lpush(str2, str);
            LOG.error("Rejected execution, re-enqueued {}", timedWrap.getElement().getIdAsString(), rejectedExecutionException);
        } catch (Exception e) {
            // Log the actual re-enqueue failure; keep the original rejection attached for context.
            e.addSuppressed(rejectedExecutionException);
            LOG.error("Could not re-enqueue {}", timedWrap.getElement().getIdAsString(), e);
        }
    }

    /**
     * Submits the wrapped document to the thread pool. The running-task counter is incremented
     * when the task starts and decremented when it ends, whether or not processing throws.
     *
     * @param timedWrap wrapper holding the document to process
     */
    private void execute(TimedWrap<T> timedWrap) {
        Runnable task = () -> {
            runningThreads.incrementAndGet();
            try {
                subscription.process(timedWrap.getElement());
            } finally {
                runningThreads.decrementAndGet();
            }
        };
        threadPool.execute(task);
    }

    /**
     * Stores and publishes a new state for a document, stamped with the current time, using a
     * connection borrowed from the pool for the duration of the call.
     *
     * @param str   id of the document
     * @param state new state
     * @param str2  additional info stored in the state
     * @throws RedisqException if the state cannot be serialized
     */
    @Override // ai.grakn.redisq.Queue
    public void setState(String str, State state, String str2) {
        long timestampMs = System.currentTimeMillis();
        try (Jedis jedis = this.jedisPool.getResource()) {
            setState(jedis, timestampMs, str, state, str2);
        }
    }

    /**
     * Stores a new state for a document under its state key (with the state TTL) and publishes
     * the same serialized state on the document's state channel.
     *
     * @param jedis connection to use; it is not closed by this method
     * @param j     timestamp (ms) recorded in the state
     * @param str   id of the document
     * @param state new state
     * @param str2  additional info stored in the state
     * @throws RedisqException if the state cannot be serialized; the serialization failure is
     *                         attached as the cause
     */
    public void setState(Jedis jedis, long j, String str, State state, String str2) {
        StateInfo stateInfo = new StateInfo(state, j, str2);
        try {
            String serializedState = stateMapper.serialize(stateInfo);
            jedis.setex(this.names.stateKeyFromId(str), this.ttlStateInfo, serializedState);
            jedis.publish(this.names.stateChannelKeyFromId(str), serializedState);
        } catch (SerializationException e) {
            throw new RedisqException("Could not serialize state " + stateInfo, e);
        }
    }

    /**
     * Looks up the stored state of a document.
     *
     * @param str id of the document
     * @return the state read from the document's state key, or empty if the key has no value
     */
    @Override // ai.grakn.redisq.Queue
    public Optional<StateInfo> getState(String str) {
        String stateKey = names.stateKeyFromId(str);
        return getStateInfoFromRedisKey(stateKey);
    }

    /**
     * Returns the states of all documents that currently have a state key.
     *
     * <p>The matching keys are fetched eagerly and the connection is returned to the pool before
     * the stream is built; each state is then read lazily, one key at a time, as the stream is
     * consumed. An element is empty when its key no longer holds a value at that point.
     *
     * @return a lazy stream pairing each Redis state key with the state stored under it
     */
    @Override // ai.grakn.redisq.Queue
    public Stream<Optional<ExtendedStateInfo>> getStates() {
        Set<String> stateKeys;
        try (Jedis jedis = this.jedisPool.getResource()) {
            // NOTE(review): KEYS walks the whole keyspace; confirm this is acceptable for the deployment size.
            stateKeys = jedis.keys(this.names.stateKeyFromId("*"));
        }
        return stateKeys.stream()
                .map(key -> getStateInfoFromRedisKey(key).map(stateInfo -> new ExtendedStateInfo(key, stateInfo)));
    }

    /**
     * Stops the consumer and shuts the thread pool down.
     *
     * <p>While holding this object's monitor, the {@code working} flag is cleared and, if the
     * consumer was started, both loops and the running tasks are awaited. Afterwards the thread
     * pool is shut down and given up to one minute to terminate.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // ai.grakn.redisq.Queue
    public void close() throws InterruptedException {
        LOG.debug("Closing {}", name);
        synchronized (this) {
            working.set(false);
            boolean consumerStarted = mainLoop != null;
            if (consumerStarted) {
                awaitLoopsAndRunningTasks();
            }
        }
        LOG.debug("Shutting down queue {}", name);
        threadPool.shutdown();
        threadPool.awaitTermination(1L, TimeUnit.MINUTES);
        LOG.info("Closed {}", name);
    }

    /**
     * Waits for the main loop, then the in-flight loop, then polls the running-task counter with
     * the close retrier until it reads zero. Giving up on the counter only produces a warning;
     * an {@link ExecutionException} from any of the three steps is logged as an error.
     */
    private void awaitLoopsAndRunningTasks() throws InterruptedException {
        try {
            mainLoop.get();
            inFlightLoop.get();
            try {
                CLOSE_RETRIER.call(runningThreads::get);
            } catch (RetryException stillRunning) {
                LOG.warn("Closing while some threads are still running");
            }
        } catch (ExecutionException loopFailure) {
            LOG.error("Error during close", loopFailure);
        }
    }

    /**
     * @return the queue name this instance was constructed with
     */
    @Override // ai.grakn.redisq.Queue
    public String getName() {
        return name;
    }

    /**
     * @return the consumer that processes documents taken from this queue
     */
    @Override // ai.grakn.redisq.Queue
    public QueueConsumer<T> getConsumer() {
        return subscription;
    }

    /**
     * Enqueues a document and blocks until its state future completes or the timeout elapses.
     * The future is created for the states {@code DONE} and {@code FAILED}, and it is created
     * before the push so that no state change can be missed.
     *
     * @param t        the document to enqueue
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @throws WaitException if the wait is interrupted, fails or times out; when interrupted, the
     *                       thread's interrupt flag is restored before the exception is thrown
     */
    @Override // ai.grakn.redisq.Queue
    public void pushAndWait(T t, long j, TimeUnit timeUnit) throws WaitException {
        Future<Void> completion = getFutureForDocumentStateWait(ImmutableSet.of(State.DONE, State.FAILED), t.getIdAsString());
        push(t);
        try {
            completion.get(j, timeUnit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new WaitException("Could not wait for " + t.getIdAsString() + " to be done", e);
        }
    }

    /**
     * @return the helper this queue uses to build its Redis key and channel names
     */
    public Names getNames() {
        return names;
    }

    /**
     * Reads and deserializes the state stored under a Redis key. The connection is returned to
     * the pool as soon as the value has been read, including when the read itself fails.
     *
     * @param str the Redis state key to read
     * @return the deserialized state, or empty if the key holds no value
     * @throws RedisqException if the stored value cannot be deserialized
     */
    private Optional<StateInfo> getStateInfoFromRedisKey(String str) {
        try {
            String serializedState;
            try (Jedis jedis = this.jedisPool.getResource()) {
                serializedState = jedis.get(str);
            }
            return serializedState == null ? Optional.empty() : Optional.of(stateMapper.deserialize(serializedState));
        } catch (DeserializationException e) {
            throw new RedisqException("Could not deserialize state info for " + str, e);
        }
    }
}
