package ai.grakn.redisq.consumer;

import ai.grakn.redisq.Document;
import ai.grakn.redisq.Redisq;
import ai.grakn.redisq.State;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.Pool;

/* loaded from: input_file:ai/grakn/redisq/consumer/RedisqConsumer.class */
public class RedisqConsumer<T extends Document> implements QueueConsumer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(RedisqConsumer.class);
    private Consumer<T> consumer;
    private Pool<Jedis> jedisPool;
    private Redisq<T> tRedisq;

    public RedisqConsumer(Consumer<T> consumer, Pool<Jedis> pool, Redisq<T> redisq) {
        this.consumer = consumer;
        this.jedisPool = pool;
        this.tRedisq = redisq;
    }

    @Override // ai.grakn.redisq.consumer.QueueConsumer
    public void process(T t) {
        try {
            this.consumer.andThen(document -> {
                updateState(document, State.DONE, "");
            }).accept(t);
        } catch (Exception e) {
            updateState(t, State.FAILED, e.getMessage());
        }
    }

    private void updateState(T t, State state, String str) {
        try {
            Jedis jedis = (Jedis) this.jedisPool.getResource();
            Throwable th = null;
            try {
                try {
                    String idAsString = t.getIdAsString();
                    this.tRedisq.setState(jedis, System.currentTimeMillis(), idAsString, state, str);
                    jedis.del(this.tRedisq.getNames().lockKeyFromId(idAsString));
                    if (jedis != null) {
                        if (0 != 0) {
                            try {
                                jedis.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            jedis.close();
                        }
                    }
                    LOG.debug("Status {} set as {}", t.getIdAsString(), state);
                } finally {
                }
            } finally {
            }
        } catch (JedisConnectionException e) {
            LOG.error("Pool is full  or terminated. Active: {}, idle: {}", Integer.valueOf(this.jedisPool.getNumActive()), Integer.valueOf(this.jedisPool.getNumIdle()));
            throw e;
        } catch (Exception e2) {
            LOG.error("Unexpected exception while updating state for {}", t, e2);
            throw e2;
        }
    }
}
