package party.iroiro.r2jdbc;

import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.IsolationLevel;
import java.lang.reflect.InvocationTargetException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.function.BiConsumer;
import java.util.function.Function;
import lbmq.LinkedBlockingMultiQueue;
import org.apache.commons.beanutils.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import party.iroiro.r2jdbc.JdbcConnectionFactoryProvider;
import party.iroiro.r2jdbc.JdbcJob;
import party.iroiro.r2jdbc.JdbcResult;
import party.iroiro.r2jdbc.codecs.Codec;
import party.iroiro.r2jdbc.codecs.DefaultCodec;
import party.iroiro.r2jdbc.util.Pair;
import party.iroiro.r2jdbc.util.QueueItem;
import party.iroiro.r2jdbc.util.SingletonMono;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:party/iroiro/r2jdbc/JdbcWorker.class */
public class JdbcWorker implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(JdbcWorker.class);
    private final BlockingQueue<JdbcJob> jobs;
    private final LinkedBlockingMultiQueue<Integer, QueueItem<JdbcPacket>>.SubQueue out;
    private final ConnectionFactoryOptions options;
    private final SingletonMono<JdbcConnectionMetadata> metadata;
    private final List<JdbcJob> closeJobs;
    private State state;
    private Codec codec;
    private final List<Connection> connections = new LinkedList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:party/iroiro/r2jdbc/JdbcWorker$State.class */
    public enum State {
        STARTING,
        CLOSING,
        ENDED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public JdbcWorker(BlockingQueue<JdbcJob> blockingQueue, LinkedBlockingMultiQueue<Integer, QueueItem<JdbcPacket>>.SubQueue subQueue, ConnectionFactoryOptions connectionFactoryOptions) {
        this.jobs = blockingQueue;
        this.out = subQueue;
        this.options = connectionFactoryOptions;
        Thread thread = new Thread(this);
        this.metadata = new SingletonMono<>();
        this.codec = null;
        this.closeJobs = new LinkedList();
        this.state = State.STARTING;
        thread.start();
    }

    private static Connection getConnection(ConnectionFactoryOptions connectionFactoryOptions) throws SQLException {
        JdbcConnectionFactoryProvider.JdbcConnectionDetails jdbcConnectionUrl = JdbcConnectionFactoryProvider.getJdbcConnectionUrl(connectionFactoryOptions);
        return DriverManager.getConnection(jdbcConnectionUrl.getUrl(), jdbcConnectionUrl.getProperties());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Mono<Void> voidSend(JdbcWorker jdbcWorker, @Nullable Connection connection, JdbcJob.Job job, @Nullable Object obj) {
        return Mono.create(monoSink -> {
            monoSink.onRequest(j -> {
                if (offerNow(jdbcWorker, connection, job, obj, (jdbcPacket, th) -> {
                    if (th == null) {
                        monoSink.success();
                    } else {
                        monoSink.error(new JdbcException(th));
                    }
                })) {
                    return;
                }
                monoSink.error(new JdbcException(new IndexOutOfBoundsException("Unable to push to queue")));
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean offerNow(JdbcWorker jdbcWorker, @Nullable Connection connection, JdbcJob.Job job, @Nullable Object obj, BiConsumer<JdbcPacket, Throwable> biConsumer) {
        if (jdbcWorker.notEnded()) {
            return jdbcWorker.getJobQueue().offer(new JdbcJob(connection, job, obj, biConsumer));
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> Mono<T> send(JdbcWorker jdbcWorker, @Nullable Connection connection, JdbcJob.Job job, @Nullable Object obj, Function<JdbcPacket, T> function) {
        return Mono.create(monoSink -> {
            monoSink.onRequest(j -> {
                if (offerNow(jdbcWorker, connection, job, obj, (jdbcPacket, th) -> {
                    if (th == null) {
                        monoSink.success(function.apply(jdbcPacket));
                    } else {
                        monoSink.error(new JdbcException(th));
                    }
                })) {
                    return;
                }
                monoSink.error(new JdbcException(new IndexOutOfBoundsException("Unable to push to queue")));
            });
        });
    }

    public synchronized boolean notEnded() {
        return this.state != State.ENDED;
    }

    private Codec initCodec() throws ClassNotFoundException, ClassCastException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, InstantiationException {
        if (!this.options.hasOption(JdbcConnectionFactoryProvider.CODEC)) {
            return new DefaultCodec();
        }
        Class<?> cls = Class.forName((String) this.options.getValue(JdbcConnectionFactoryProvider.CODEC));
        if (Codec.class.isAssignableFrom(cls)) {
            return (Codec) ConstructorUtils.invokeConstructor(cls, (Object[]) null);
        }
        throw new ClassCastException(cls.getName());
    }

    private void offer(JdbcPacket jdbcPacket, BiConsumer<JdbcPacket, Throwable> biConsumer) {
        this.out.offer(new QueueItem(jdbcPacket, null, biConsumer, true));
    }

    private void offer(BiConsumer<JdbcPacket, Throwable> biConsumer) {
        this.out.offer(new QueueItem(null, null, biConsumer, false));
    }

    private void offer(Exception exc, BiConsumer<JdbcPacket, Throwable> biConsumer) {
        this.out.offer(new QueueItem(null, exc, biConsumer, true));
    }

    private void takeAndProcess() throws InterruptedException {
        JdbcJob take = this.jobs.take();
        try {
            process(take);
        } catch (InterruptedException e) {
            throw e;
        } catch (Throwable th) {
            log.error("Unexpected exception", th);
            offer((Exception) new JdbcException(th), take.consumer);
        }
    }

    private void process(JdbcJob jdbcJob) throws InterruptedException {
        log.trace("Processing: {}", jdbcJob.job);
        Connection connection = jdbcJob.connection;
        switch (jdbcJob.job) {
            case INIT_CONNECTION:
                try {
                    Connection connection2 = getConnection(this.options);
                    this.connections.add(connection2);
                    DatabaseMetaData metaData = connection2.getMetaData();
                    offer(new JdbcPacket(new JdbcConnectionMetadata(connection2, metaData.getDatabaseProductName(), metaData.getDatabaseProductVersion())), jdbcJob.consumer);
                    break;
                } catch (SQLException e) {
                    offer(e, jdbcJob.consumer);
                    break;
                }
            case CLOSE_CONNECTION:
                try {
                    commitAndClose(connection);
                    this.connections.remove(connection);
                    offer(jdbcJob.consumer);
                    break;
                } catch (SQLException e2) {
                    offer(e2, jdbcJob.consumer);
                    break;
                }
            case GET_AUTO_COMMIT:
                try {
                    offer(new JdbcPacket(Boolean.valueOf(connection.getAutoCommit())), jdbcJob.consumer);
                    break;
                } catch (SQLException e3) {
                    offer(e3, jdbcJob.consumer);
                    break;
                }
            case SET_AUTO_COMMIT:
                if (jdbcJob.data instanceof Boolean) {
                    try {
                        connection.setAutoCommit(((Boolean) jdbcJob.data).booleanValue());
                        offer(jdbcJob.consumer);
                        break;
                    } catch (SQLException e4) {
                        offer(e4, jdbcJob.consumer);
                        break;
                    }
                } else {
                    offer(new IllegalArgumentException("Expected Boolean data"), jdbcJob.consumer);
                    break;
                }
            case START_TRANSACTION:
                try {
                    connection.setAutoCommit(false);
                    offer(jdbcJob.consumer);
                    break;
                } catch (SQLException e5) {
                    offer(e5, jdbcJob.consumer);
                    break;
                }
            case END_TRANSACTION:
                try {
                    connection.commit();
                    offer(jdbcJob.consumer);
                    break;
                } catch (SQLException e6) {
                    offer(e6, jdbcJob.consumer);
                    break;
                }
            case ROLLBACK_TRANSACTION:
                try {
                    connection.rollback();
                    offer(jdbcJob.consumer);
                    break;
                } catch (SQLException e7) {
                    offer(e7, jdbcJob.consumer);
                    break;
                }
            case SET_ISOLATION_LEVEL:
                try {
                    if (IsolationLevel.REPEATABLE_READ.equals(jdbcJob.data)) {
                        connection.setTransactionIsolation(4);
                    } else if (IsolationLevel.SERIALIZABLE.equals(jdbcJob.data)) {
                        connection.setTransactionIsolation(8);
                    } else if (IsolationLevel.READ_UNCOMMITTED.equals(jdbcJob.data)) {
                        connection.setTransactionIsolation(1);
                    } else if (!IsolationLevel.READ_COMMITTED.equals(jdbcJob.data)) {
                        offer(new IllegalArgumentException("Unrecognized isolation level"), jdbcJob.consumer);
                        break;
                    } else {
                        connection.setTransactionIsolation(2);
                    }
                    offer(jdbcJob.consumer);
                    break;
                } catch (SQLException e8) {
                    offer(e8, jdbcJob.consumer);
                    break;
                }
            case VALIDATE:
                try {
                    offer(new JdbcPacket(Boolean.valueOf(connection.isValid(0))), jdbcJob.consumer);
                    break;
                } catch (SQLException e9) {
                    offer(e9, jdbcJob.consumer);
                    break;
                }
            case EXECUTE_STATEMENT:
                JdbcStatement jdbcStatement = (JdbcStatement) jdbcJob.data;
                if (jdbcStatement.getSize() == 0) {
                    offer(new IllegalArgumentException("Fetch size is either -1 or positive"), jdbcJob.consumer);
                    break;
                } else {
                    try {
                        offer(new JdbcPacket(execute(connection, jdbcStatement.sql, jdbcStatement.bindings, jdbcStatement.wantsGenerated.get())), jdbcJob.consumer);
                        break;
                    } catch (IllegalArgumentException | SQLException e10) {
                        offer(e10, jdbcJob.consumer);
                        break;
                    }
                }
            case BATCH:
                JdbcBatch jdbcBatch = (JdbcBatch) jdbcJob.data;
                ArrayList arrayList = new ArrayList(jdbcBatch.sql.size());
                Iterator<String> it = jdbcBatch.sql.iterator();
                while (it.hasNext()) {
                    try {
                        arrayList.add(execute(connection, it.next(), null, null));
                    } catch (SQLException e11) {
                        arrayList.add(e11);
                    }
                }
                offer(new JdbcPacket(arrayList), jdbcJob.consumer);
                break;
            case RESULT_METADATA:
                ResultSet resultSet = (ResultSet) jdbcJob.data;
                if (resultSet == null) {
                    offer((Exception) new JdbcException(new IllegalArgumentException("Null ResultSet")), jdbcJob.consumer);
                    break;
                } else {
                    try {
                        ResultSetMetaData metaData2 = resultSet.getMetaData();
                        int columnCount = metaData2.getColumnCount();
                        ArrayList arrayList2 = new ArrayList(columnCount);
                        for (int i = 0; i < columnCount; i++) {
                            arrayList2.add(new JdbcColumnMetadata(metaData2, this.codec, i + 1));
                        }
                        offer(new JdbcPacket(new JdbcRowMetadata(arrayList2)), jdbcJob.consumer);
                        break;
                    } catch (SQLException e12) {
                        offer(e12, jdbcJob.consumer);
                        break;
                    }
                }
            case RESULT_ROWS:
                JdbcResult.JdbcResultRequest jdbcResultRequest = (JdbcResult.JdbcResultRequest) jdbcJob.data;
                if (jdbcResultRequest.result == null) {
                    offer((Exception) new JdbcException(new IllegalArgumentException("Null ResultSet")), jdbcJob.consumer);
                    break;
                } else {
                    ArrayList arrayList3 = new ArrayList(jdbcResultRequest.count + 1);
                    try {
                        ResultSet resultSet2 = jdbcResultRequest.result;
                        resultSet2.setFetchSize(Math.max(jdbcResultRequest.count, 0));
                        int i2 = jdbcResultRequest.count > 0 ? jdbcResultRequest.count : Integer.MAX_VALUE;
                        List<? extends ColumnMetadata> columnMetadatas = jdbcResultRequest.columns.getColumnMetadatas();
                        int size = columnMetadatas.size();
                        int i3 = 0;
                        while (true) {
                            if (i3 < i2) {
                                if (resultSet2.next()) {
                                    ArrayList arrayList4 = new ArrayList(size);
                                    for (int i4 = 0; i4 < size; i4++) {
                                        ColumnMetadata columnMetadata = columnMetadatas.get(i4);
                                        Class cls = (Class) columnMetadata.getNativeTypeMetadata();
                                        if (cls == null) {
                                            arrayList4.add(this.codec.decode(resultSet2.getObject(i4 + 1), columnMetadata.getJavaType()));
                                        } else {
                                            arrayList4.add(this.codec.decode(resultSet2.getObject(i4 + 1, cls), columnMetadata.getJavaType()));
                                        }
                                    }
                                    arrayList3.add(new JdbcRow(arrayList4));
                                    i3++;
                                } else {
                                    arrayList3.add(null);
                                }
                            }
                        }
                        offer(new JdbcPacket(arrayList3), jdbcJob.consumer);
                        break;
                    } catch (SQLException e13) {
                        offer(e13, jdbcJob.consumer);
                        break;
                    }
                }
            case CLOSE_RESULT:
                try {
                    ((ResultSet) jdbcJob.data).close();
                    offer(jdbcJob.consumer);
                    break;
                } catch (SQLException e14) {
                    offer(e14, jdbcJob.consumer);
                    break;
                }
            case CLOSE:
                synchronized (this) {
                    this.state = State.ENDED;
                }
                this.closeJobs.add(jdbcJob);
                throw new InterruptedException("Connection closing");
        }
        log.trace("Process finished: {}", jdbcJob.job);
    }

    private Object execute(Connection connection, String str, @Nullable ArrayList<Map<Integer, Object>> arrayList, @Nullable String[] strArr) throws SQLException {
        PreparedStatement cachedOrPrepare = getCachedOrPrepare(connection, str, strArr);
        ArrayList<Object> arrayList2 = new ArrayList<>(arrayList == null ? 1 : arrayList.size());
        if (arrayList == null) {
            executeSingle(connection, arrayList2, cachedOrPrepare, null);
        } else {
            if (arrayList.size() == 0) {
                cachedOrPrepare.close();
                throw new IllegalArgumentException("No valid statement");
            }
            Iterator<Map<Integer, Object>> it = arrayList.iterator();
            while (it.hasNext()) {
                executeSingle(connection, arrayList2, cachedOrPrepare, it.next());
            }
        }
        return arrayList2;
    }

    private void executeSingle(Connection connection, ArrayList<Object> arrayList, PreparedStatement preparedStatement, @Nullable Map<Integer, Object> map) throws SQLException {
        if (map != null) {
            bindStatement(connection, preparedStatement, map);
        }
        if (!preparedStatement.execute()) {
            Pair pair = new Pair(new int[]{preparedStatement.getUpdateCount()}, preparedStatement.getGeneratedKeys());
            preparedStatement.closeOnCompletion();
            arrayList.add(pair);
            return;
        }
        do {
            arrayList.add(preparedStatement.getResultSet());
        } while (preparedStatement.getMoreResults(2));
        preparedStatement.closeOnCompletion();
    }

    private PreparedStatement getCachedOrPrepare(Connection connection, String str, @Nullable String[] strArr) throws SQLException {
        return strArr == null ? connection.prepareStatement(str) : strArr.length == 0 ? connection.prepareStatement(str, 1) : connection.prepareStatement(str, strArr);
    }

    private void bindStatement(Connection connection, PreparedStatement preparedStatement, Map<Integer, Object> map) throws SQLException {
        preparedStatement.clearParameters();
        for (int i = 0; i < map.size(); i++) {
            preparedStatement.setObject(i + 1, this.codec.encode(connection, map.getOrDefault(Integer.valueOf(i), null)));
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        Thread currentThread = Thread.currentThread();
        currentThread.setName("R2jdbcWorker-" + currentThread.getId());
        try {
            this.codec = initCodec();
            log.debug("Listening");
            while (!Thread.interrupted()) {
                try {
                    takeAndProcess();
                } catch (InterruptedException e) {
                }
            }
            log.debug("Cleaning up");
            while (this.jobs.peek() != null) {
                try {
                    takeAndProcess();
                } catch (InterruptedException e2) {
                }
            }
            closeAllConnections();
            this.closeJobs.forEach(jdbcJob -> {
                offer(jdbcJob.consumer);
            });
            this.closeJobs.clear();
            this.metadata.set(null);
            log.debug("Exiting");
        } catch (ClassCastException | ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e3) {
            synchronized (this) {
                this.state = State.ENDED;
                log.error("Failed to instantiate Codec", e3);
            }
        }
    }

    private void commitAndClose(Connection connection) throws SQLException {
        connection.commit();
        connection.close();
    }

    private void closeAllConnections() {
        this.connections.removeIf(connection -> {
            try {
                commitAndClose(connection);
                return true;
            } catch (SQLException e) {
                log.error("Error closing database", e);
                return false;
            }
        });
    }

    public Mono<Void> close(Connection connection) {
        return voidSend(this, connection, JdbcJob.Job.CLOSE_CONNECTION, null);
    }

    public Mono<Void> closeNow() {
        return Mono.defer(() -> {
            this.state = State.CLOSING;
            return voidSend(this, null, JdbcJob.Job.CLOSE, null);
        });
    }

    public BlockingQueue<JdbcJob> getJobQueue() {
        return this.jobs;
    }

    public synchronized boolean isAlive() {
        return this.state == State.STARTING;
    }

    public Mono<JdbcConnectionMetadata> newConnection() {
        if (this.state != State.STARTING) {
            return Mono.error(new IllegalStateException("Thread ended"));
        }
        Mono send = send(this, null, JdbcJob.Job.INIT_CONNECTION, null, jdbcPacket -> {
            return (JdbcConnectionMetadata) jdbcPacket.data;
        });
        SingletonMono<JdbcConnectionMetadata> singletonMono = this.metadata;
        Objects.requireNonNull(singletonMono);
        return send.doOnNext((v1) -> {
            r1.set(v1);
        });
    }
}
