package io.activej.redis;

import io.activej.async.callback.Callback;
import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufPool;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.Utils;
import io.activej.common.exception.CloseException;
import io.activej.common.exception.MalformedDataException;
import io.activej.eventloop.Eventloop;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/redis/RedisConnection.class */
public final class RedisConnection extends AbstractAsyncCloseable {
    private static final Logger logger = LoggerFactory.getLogger(RedisConnection.class);
    public static final boolean CHECK = Checks.isEnabled(RedisConnection.class);
    static final int INITIAL_BUFFER_SIZE = ApplicationSettings.getInt(RedisConnection.class, "initialWriteBufferSize", 16384);
    private final Eventloop eventloop;
    private final RedisClient client;
    private final AsyncTcpSocket socket;
    private int estimatedSize;
    private int requiredRemainingSize;
    private boolean readDone;
    private boolean writeDone;
    private ArrayList transactionQueue;
    private boolean flushPosted;
    private final int autoFlushIntervalMillis;
    private ByteBuf readBuf = ByteBuf.empty();

    @Nullable
    private ByteBuf writeBuf = null;
    private final ArrayDeque receiveQueue = new ArrayDeque();

    /* JADX INFO: Access modifiers changed from: package-private */
    public RedisConnection(Eventloop eventloop, RedisClient redisClient, AsyncTcpSocket asyncTcpSocket, @NotNull Duration duration) {
        this.eventloop = eventloop;
        this.client = redisClient;
        this.socket = asyncTcpSocket;
        this.autoFlushIntervalMillis = (int) duration.toMillis();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        read();
    }

    public <T> Promise<T> cmd(RedisRequest redisRequest, RedisResponse<T> redisResponse) {
        int tail;
        int write;
        if (isClosed()) {
            return Promise.ofException(new CloseException());
        }
        while (true) {
            if (this.writeBuf == null || this.writeBuf.writeRemaining() < this.requiredRemainingSize) {
                ensureBuffer();
            }
            tail = this.writeBuf.tail();
            try {
                write = redisRequest.write(this.writeBuf.array(), tail);
                this.writeBuf.tail(write);
                break;
            } catch (NeedMoreDataException | ArrayIndexOutOfBoundsException e) {
                enlargeBuffer();
            }
        }
        int i = write - tail;
        if (i > this.estimatedSize) {
            reestimate(i);
        }
        return receive(redisResponse);
    }

    public Promise<Void> multi() {
        if (CHECK) {
            Checks.checkState(!inTransaction(), "Nested MULTI call");
        }
        logger.trace("Transaction has been started");
        Promise<Void> cmd = cmd(RedisRequest.of("MULTI"), RedisResponse.OK);
        this.transactionQueue = new ArrayList();
        return cmd;
    }

    public Promise<Void> discard() {
        if (CHECK) {
            Checks.checkState(inTransaction(), "DISCARD without MULTI");
        }
        logger.trace("Transaction is being discarded");
        ArrayList arrayList = this.transactionQueue;
        this.transactionQueue = null;
        int size = arrayList.size() / 2;
        if (size != 0) {
            TransactionDiscardedException transactionDiscardedException = new TransactionDiscardedException();
            for (int i = 0; i < size; i++) {
                ((SettablePromise) arrayList.get((i * 2) + 1)).trySetException(transactionDiscardedException);
            }
        }
        return cmd(RedisRequest.of("DISCARD"), RedisResponse.OK);
    }

    public Promise<Object[]> exec() {
        if (CHECK) {
            Checks.checkState(inTransaction(), "EXEC without MULTI");
        }
        logger.trace("Executing transaction");
        final ArrayList arrayList = this.transactionQueue;
        this.transactionQueue = null;
        final int size = arrayList.size() / 2;
        return cmd(RedisRequest.of("EXEC"), new RedisResponse<Object[]>() { // from class: io.activej.redis.RedisConnection.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // io.activej.redis.RedisResponse
            public Object[] parse(RESPv2 rESPv2) throws MalformedDataException {
                Object[] parseResponses = parseResponses(rESPv2);
                if (parseResponses == null) {
                    return null;
                }
                for (int i = 0; i < size; i++) {
                    SettablePromise settablePromise = (SettablePromise) arrayList.get((2 * i) + 1);
                    Object obj = parseResponses[i];
                    if (obj instanceof ServerError) {
                        settablePromise.trySetException((ServerError) obj);
                    } else {
                        settablePromise.set(obj);
                    }
                }
                return parseResponses;
            }

            @Nullable
            private Object[] parseResponses(RESPv2 rESPv2) throws MalformedDataException {
                long readArraySize = rESPv2.readArraySize();
                if (readArraySize == -1) {
                    return null;
                }
                if (readArraySize != size) {
                    throw new MalformedDataException("Sent " + size + " requests in a transaction, got responses for " + readArraySize);
                }
                Object[] objArr = new Object[size];
                byte[] array = rESPv2.array();
                for (int i = 0; i < size; i++) {
                    if (!rESPv2.canRead()) {
                        throw NeedMoreDataException.NEED_MORE_DATA;
                    }
                    RedisResponse redisResponse = (RedisResponse) arrayList.get(2 * i);
                    if (array[rESPv2.head()] != 45) {
                        objArr[i] = redisResponse.parse(rESPv2);
                    } else {
                        objArr[i] = rESPv2.readObject();
                    }
                }
                return objArr;
            }
        }).then(objArr -> {
            return objArr == null ? Promise.ofException(new TransactionFailedException()) : Promise.of(objArr);
        }).whenException(th -> {
            abortTransaction(arrayList, th);
        });
    }

    public boolean inTransaction() {
        return this.transactionQueue != null;
    }

    public Promise<Void> quit() {
        if (this.transactionQueue != null) {
            QuitCalledException quitCalledException = new QuitCalledException();
            ArrayList<?> arrayList = this.transactionQueue;
            this.transactionQueue = null;
            abortTransaction(arrayList, quitCalledException);
        }
        return cmd(RedisRequest.of("QUIT"), RedisResponse.OK).then(this::sendEndOfStream).whenComplete(this::close);
    }

    private <T> Promise<T> receive(RedisResponse<T> redisResponse) {
        SettablePromise settablePromise = new SettablePromise();
        if (this.transactionQueue == null) {
            this.receiveQueue.add(redisResponse);
            this.receiveQueue.add(settablePromise);
        } else {
            this.receiveQueue.add(RedisResponse.QUEUED);
            this.receiveQueue.add((r4, th) -> {
                if (th != null) {
                    settablePromise.setException(th);
                }
            });
            this.transactionQueue.add(redisResponse);
            this.transactionQueue.add(settablePromise);
        }
        return settablePromise;
    }

    private void ensureBuffer() {
        flush();
        this.writeBuf = ByteBufPool.allocate(Math.max(INITIAL_BUFFER_SIZE, this.requiredRemainingSize));
        if (this.flushPosted) {
            return;
        }
        postFlush();
    }

    private void enlargeBuffer() {
        int writeRemaining = this.writeBuf.writeRemaining();
        flush();
        this.writeBuf = ByteBufPool.allocate(Math.max(INITIAL_BUFFER_SIZE, writeRemaining + (writeRemaining >>> 1) + 1));
    }

    private void reestimate(int i) {
        this.estimatedSize = i;
        this.requiredRemainingSize = i + (i >>> 2);
    }

    private void postFlush() {
        this.flushPosted = true;
        if (this.autoFlushIntervalMillis <= 0) {
            this.eventloop.postLast(() -> {
                this.flushPosted = false;
                flush();
            });
        } else {
            this.eventloop.delayBackground(this.autoFlushIntervalMillis, () -> {
                this.flushPosted = false;
                flush();
            });
        }
    }

    private void flush() {
        if (this.writeBuf == null) {
            return;
        }
        if (this.writeBuf.canRead()) {
            this.socket.write(this.writeBuf).whenException(th -> {
                closeEx(new RedisException("Failed to write data", th));
            });
        } else {
            this.writeBuf.recycle();
        }
        this.writeBuf = null;
    }

    private Promise<Void> sendEndOfStream() {
        return this.socket.write((ByteBuf) null).whenResult(() -> {
            this.writeDone = true;
            closeIfDone();
        }).whenException(th -> {
            closeEx(new RedisException("Failed to send end of stream", th));
        });
    }

    private void read() {
        this.socket.read().whenResult(byteBuf -> {
            if (byteBuf == null) {
                this.readDone = true;
                closeIfDone();
                return;
            }
            this.readBuf = ByteBufPool.append(this.readBuf, byteBuf);
            RESPv2 rESPv2 = new RESPv2(this.readBuf.array(), this.readBuf.head(), this.readBuf.tail());
            int head = rESPv2.head();
            while (!this.receiveQueue.isEmpty() && rESPv2.canRead()) {
                RedisResponse redisResponse = (RedisResponse) this.receiveQueue.peek();
                try {
                    if (rESPv2.peek() != 45) {
                        Object parse = redisResponse.parse(rESPv2);
                        head = rESPv2.head();
                        this.receiveQueue.poll();
                        ((Callback) this.receiveQueue.poll()).accept(parse, (Throwable) null);
                    } else {
                        ServerError serverError = (ServerError) rESPv2.readObject();
                        head = rESPv2.head();
                        this.receiveQueue.poll();
                        ((Callback) this.receiveQueue.poll()).accept((Object) null, serverError);
                    }
                } catch (MalformedDataException e) {
                    closeEx(new RedisException((Throwable) e));
                    return;
                } catch (NeedMoreDataException e2) {
                }
            }
            if (this.readBuf != null) {
                this.readBuf.head(head);
                if (!this.readBuf.canRead()) {
                    this.readBuf.recycle();
                    this.readBuf = ByteBuf.empty();
                }
            }
            read();
        }).whenException(th -> {
            closeEx(new RedisException("Failed to read data", th));
        });
    }

    private void closeIfDone() {
        if (this.readDone && this.writeDone) {
            close();
        }
    }

    protected void onClosed(@NotNull Throwable th) {
        this.socket.closeEx(th);
        this.writeBuf = (ByteBuf) Utils.nullify(this.writeBuf, (v0) -> {
            v0.recycle();
        });
        this.readBuf = (ByteBuf) Utils.nullify(this.readBuf, (v0) -> {
            v0.recycle();
        });
        while (!this.receiveQueue.isEmpty()) {
            this.receiveQueue.poll();
            ((Callback) this.receiveQueue.poll()).accept((Object) null, th);
        }
        this.transactionQueue = (ArrayList) Utils.nullify(this.transactionQueue, arrayList -> {
            abortTransaction(arrayList, th);
        });
    }

    private void abortTransaction(ArrayList<?> arrayList, Throwable th) {
        for (int i = 0; i < arrayList.size() / 2; i++) {
            ((SettablePromise) arrayList.get((2 * i) + 1)).trySetException(th);
        }
    }

    public String toString() {
        return "RedisConnection{client=" + this.client + ", receiveQueue=" + (this.receiveQueue.size() / 2) + (this.transactionQueue != null ? ", transactionQueue=" + (this.transactionQueue.size() / 2) : "") + ", closed=" + isClosed() + '}';
    }
}
