package dev.miku.r2dbc.mysql.client;

import dev.miku.r2dbc.mysql.MySqlSslConfiguration;
import dev.miku.r2dbc.mysql.message.client.ClientMessage;
import dev.miku.r2dbc.mysql.message.client.ExchangeableMessage;
import dev.miku.r2dbc.mysql.message.client.ExitMessage;
import dev.miku.r2dbc.mysql.message.client.SendOnlyMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import dev.miku.r2dbc.mysql.message.server.WarningMessage;
import dev.miku.r2dbc.mysql.util.AssertUtils;
import dev.miku.r2dbc.mysql.util.ConnectionContext;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.FutureMono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:dev/miku/r2dbc/mysql/client/ReactorNettyClient.class */
public final class ReactorNettyClient implements Client {
    /** Class logger (SLF4J) used for request/response tracing, warnings and connection errors. */
    private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class);
    /** Per-message hook that only delegates to {@code infoLogging(ServerMessage)}. */
    private static final Consumer<ServerMessage> INFO_LOGGING = ReactorNettyClient::infoLogging;
    /** Per-message hook that logs the response at debug level and then delegates to {@code infoLogging(ServerMessage)}. */
    private static final Consumer<ServerMessage> DEBUG_LOGGING = serverMessage -> {
        logger.debug("Response: {}", serverMessage);
        infoLogging(serverMessage);
    };
    /** Shared handler reference to {@code inboundHandle(Object, SynchronousSink)}, applied to every inbound object. */
    private static final BiConsumer<Object, SynchronousSink<ServerMessage>> INBOUND_HANDLE = ReactorNettyClient::inboundHandle;
    /** Reactor Netty connection whose channel is used for writing, closing and firing pipeline events. */
    private final Connection connection;
    /** Connection context; passed to the pipeline handlers and read for the connection id in {@code toString()}. */
    private final ConnectionContext context;
    /** Processor that receives every decoded server message; created with {@code EmitterProcessor.create(false)} (the argument is Reactor's {@code autoCancel} flag). */
    private final EmitterProcessor<ServerMessage> responseProcessor = EmitterProcessor.create(false);
    /**
     * Queue that request tasks are submitted to. It is also passed to {@code doOnTerminate(...)},
     * presumably to let the next queued task run once an exchange terminates (confirm in RequestQueue).
     */
    private final RequestQueue requestQueue = new RequestQueue();
    /** Flipped to {@code true} by {@code close()}; also handed to the {@code MessageDuplexCodec} created in the constructor. */
    private final AtomicBoolean closing = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/miku/r2dbc/mysql/client/ReactorNettyClient$Identity.class */
    /**
     * Stateless function that hands its argument back untouched.
     *
     * <p>Only one instance exists ({@link #INSTANCE}); the constructor is private so that no
     * further instances can be created.
     */
    public static final class Identity implements Function<Object, Object> {
        /** The single shared instance. */
        private static final Identity INSTANCE = new Identity();

        private Identity() {
            // Not instantiable from outside; use INSTANCE.
        }

        /**
         * Returns the given value as-is.
         *
         * @param value any value, may be {@code null}
         * @return the very same reference that was passed in
         */
        @Override // java.util.function.Function
        public Object apply(final Object value) {
            return value;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a client on top of an established Reactor Netty connection.
     *
     * <p>The constructor installs the envelope slicer and the message codec at the end of the
     * pipeline, optionally puts the SSL bridge handler (and, when trace logging is enabled, a
     * Netty {@link LoggingHandler}) at the front, and then subscribes to the inbound stream so
     * that every decoded {@link ServerMessage} is pushed into the response processor.
     *
     * @param connection            the underlying connection, must not be {@code null}
     * @param mySqlSslConfiguration the SSL settings, must not be {@code null}
     * @param connectionContext     the connection context, must not be {@code null}
     */
    public ReactorNettyClient(Connection connection, MySqlSslConfiguration mySqlSslConfiguration, ConnectionContext connectionContext) {
        AssertUtils.requireNonNull(connection, "connection must not be null");
        AssertUtils.requireNonNull(connectionContext, "context must not be null");
        AssertUtils.requireNonNull(mySqlSslConfiguration, "ssl must not be null");
        this.connection = connection;
        this.context = connectionContext;

        // Protocol handlers go last; the SSL bridge and the wire logger are added first.
        connection.addHandlerLast("R2dbcMySqlEnvelopeSlicer", new EnvelopeSlicer())
            .addHandlerLast("R2dbcMySqlMessageDuplexCodec", new MessageDuplexCodec(connectionContext, this.closing, this.requestQueue));
        if (mySqlSslConfiguration.getSslMode().startSsl()) {
            connection.addHandlerFirst("R2dbcMySqlSslBridgeHandler", new SslBridgeHandler(connectionContext, mySqlSslConfiguration));
        }
        if (InternalLoggerFactory.getInstance(ReactorNettyClient.class).isTraceEnabled()) {
            logger.debug("Connection tracking logging is enabled");
            connection.addHandlerFirst(LoggingHandler.class.getSimpleName(), new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE));
        }

        Flux<ServerMessage> inbound = connection.inbound().receiveObject().handle(INBOUND_HANDLE);
        // Attach at most one logging hook; the debug hook already includes the info-level one.
        if (logger.isDebugEnabled()) {
            inbound = inbound.doOnNext(DEBUG_LOGGING);
        } else if (logger.isInfoEnabled()) {
            inbound = inbound.doOnNext(INFO_LOGGING);
        }

        inbound.subscribe(this.responseProcessor::onNext, th -> {
            try {
                logger.error("Connection Error: {}", th.getMessage(), th);
                this.responseProcessor.onError(th);
            } finally {
                // Dispose the connection even if propagating the error itself throws.
                connection.dispose();
            }
        }, this.responseProcessor::onComplete);
    }

    /**
     * Sends a request and returns the server messages that answer it.
     *
     * <p>The exchange is submitted to the request queue as a task. Responses are forwarded until
     * {@code predicate} accepts a message; that message is still emitted and the stream then
     * completes. If the client is not connected at subscription time, the request is disposed
     * (when it is a {@link Disposable}) and the returned flux fails with
     * {@link IllegalStateException}.
     *
     * @param exchangeableMessage the request to send, must not be {@code null}
     * @param predicate           tests whether a response is the last one of this exchange
     * @return the responses of this exchange, ending with the message accepted by {@code predicate}
     */
    @Override // dev.miku.r2dbc.mysql.client.Client
    public Flux<ServerMessage> exchange(ExchangeableMessage exchangeableMessage, Predicate<ServerMessage> predicate) {
        AssertUtils.requireNonNull(exchangeableMessage, "request must not be null");
        // The explicit type arguments are required: without them the sink type is inferred as Object
        // and the flattening via identity() does not type-check.
        return Mono.<Flux<ServerMessage>>create(sink -> {
            if (!isConnected()) {
                if (exchangeableMessage instanceof Disposable) {
                    ((Disposable) exchangeableMessage).dispose();
                }
                sink.error(new IllegalStateException("Cannot send messages because the connection is closed"));
                return;
            }
            this.requestQueue.submit(RequestTask.wrap(exchangeableMessage, sink, () -> {
                // One-element array so the lambdas below can share a mutable "finished" flag.
                boolean[] completed = {false};
                return send(exchangeableMessage)
                    .thenMany(this.responseProcessor)
                    .<ServerMessage>handle((response, downstream) -> {
                        if (predicate.test(response)) {
                            completed[0] = true;
                            downstream.next(response);
                            downstream.complete();
                        } else {
                            downstream.next(response);
                        }
                    })
                    // The queue itself is the terminate callback (presumably it moves on to the next task).
                    .doOnTerminate(this.requestQueue)
                    .doOnCancel(() -> exchangeCancel(completed));
            }));
        }).flatMapMany(identity());
    }

    /**
     * Sends a message for which no response is awaited.
     *
     * <p>The send is submitted to the request queue as a task. If the client is not connected at
     * subscription time, the message is disposed (when it is a {@link Disposable}) and the
     * returned mono fails with {@link IllegalStateException}.
     *
     * @param sendOnlyMessage the message to send, must not be {@code null}
     * @return a mono that terminates when the write terminates
     */
    @Override // dev.miku.r2dbc.mysql.client.Client
    public Mono<Void> sendOnly(SendOnlyMessage sendOnlyMessage) {
        AssertUtils.requireNonNull(sendOnlyMessage, "message must not be null");
        // Explicit type argument: otherwise the sink type is inferred as Object and
        // flatMap(identity()) does not type-check.
        return Mono.<Mono<Void>>create(sink -> {
            if (!isConnected()) {
                if (sendOnlyMessage instanceof Disposable) {
                    ((Disposable) sendOnlyMessage).dispose();
                }
                sink.error(new IllegalStateException("Cannot send messages because the connection is closed"));
                return;
            }
            this.requestQueue.submit(RequestTask.wrap(sendOnlyMessage, sink,
                () -> send(sendOnlyMessage).doOnTerminate(this.requestQueue)));
        }).flatMap(identity());
    }

    /**
     * Waits for the next server message without sending anything.
     *
     * <p>The wait is submitted to the request queue as a task. If the client is not connected at
     * subscription time, the returned mono fails with {@link IllegalStateException}.
     *
     * @return a mono emitting the next message seen by the response processor
     */
    @Override // dev.miku.r2dbc.mysql.client.Client
    public Mono<ServerMessage> receiveOnly() {
        // Explicit type argument: otherwise the sink type is inferred as Object and
        // flatMap(identity()) does not type-check.
        return Mono.<Mono<ServerMessage>>create(sink -> {
            if (!isConnected()) {
                sink.error(new IllegalStateException("Cannot receive messages because the connection is closed"));
                return;
            }
            this.requestQueue.submit(RequestTask.wrap(sink, () -> {
                // One-element array so the lambdas below can share a mutable "finished" flag.
                boolean[] completed = {false};
                return this.responseProcessor.next()
                    .doOnSuccess(message -> completed[0] = true)
                    .doOnTerminate(this.requestQueue)
                    .doOnCancel(() -> exchangeCancel(completed));
            }));
        }).flatMap(identity());
    }

    /**
     * Closes the client gracefully.
     *
     * <p>Only the first subscription that flips the {@code closing} flag does any work: it queues
     * a task that sends the exit message and then force-closes the channel. A failure while
     * sending the exit message is logged and ignored so the channel is closed anyway. Every
     * later subscription completes immediately.
     *
     * @return a mono that completes when the close task finishes, or immediately if closing was
     *     already requested
     */
    @Override // dev.miku.r2dbc.mysql.client.Client
    public Mono<Void> close() {
        // Explicit type argument: otherwise the sink type is inferred as Object and
        // flatMap(identity()) does not type-check.
        return Mono.<Mono<Void>>create(sink -> {
            if (!this.closing.compareAndSet(false, true)) {
                // Already closing or closed: emit nothing, just complete.
                sink.success();
                return;
            }
            this.requestQueue.submit(RequestTask.wrap(sink, () ->
                send(ExitMessage.getInstance())
                    .onErrorResume(th -> {
                        logger.error("Exit message sending failed, force closing", th);
                        return Mono.empty();
                    })
                    .then(forceClose())));
        }).flatMap(identity());
    }

    /**
     * Closes the underlying channel without sending an exit message first.
     *
     * @return a mono backed by the channel's close future; the close call is made inside the
     *     supplier handed to {@code FutureMono.deferFuture}
     */
    @Override // dev.miku.r2dbc.mysql.client.Client
    public Mono<Void> forceClose() {
        return FutureMono.deferFuture(() -> this.connection.channel().close());
    }

    /**
     * Tells whether this client can still be used.
     *
     * @return {@code true} only when closing has not been requested and the channel is open
     */
    @Override // dev.miku.r2dbc.mysql.client.Client
    public boolean isConnected() {
        if (this.closing.get()) {
            // Closing was requested; the channel state is not consulted.
            return false;
        }
        return this.connection.channel().isOpen();
    }

    /**
     * Fires the {@code SslState.UNSUPPORTED} user event on the channel pipeline.
     */
    @Override // dev.miku.r2dbc.mysql.client.Client
    public void sslUnsupported() {
        this.connection
            .channel()
            .pipeline()
            .fireUserEventTriggered(SslState.UNSUPPORTED);
    }

    /**
     * Fires the {@code Lifecycle.COMMAND} user event on the channel pipeline.
     */
    @Override // dev.miku.r2dbc.mysql.client.Client
    public void loginSuccess() {
        this.connection
            .channel()
            .pipeline()
            .fireUserEventTriggered(Lifecycle.COMMAND);
    }

    /**
     * Describes this client by its closing state and connection id.
     *
     * @return {@code ReactorNettyClient(<state>){connectionId=<id>}}, where the state is
     *     {@code "closing or closed"} once closing was requested and {@code "activating"} otherwise
     */
    @Override
    public String toString() {
        final String state;
        if (this.closing.get()) {
            state = "closing or closed";
        } else {
            state = "activating";
        }
        return String.format("ReactorNettyClient(%s){connectionId=%d}", state, Integer.valueOf(this.context.getConnectionId()));
    }

    /**
     * Logs the outgoing message at debug level, then writes and flushes it on the channel.
     *
     * @param clientMessage the message to write
     * @return a mono built from the future returned by {@code writeAndFlush}
     */
    private Mono<Void> send(ClientMessage clientMessage) {
        logger.debug("Request: {}", clientMessage);
        return FutureMono.from(
            this.connection.channel().writeAndFlush(clientMessage));
    }

    /**
     * Forwards one inbound object to the sink as a {@link ServerMessage}.
     *
     * <p>Reference-counted messages are retained before being emitted (presumably because the
     * pipeline releases them after the read; confirm against the codec). Any object that is not
     * a {@link ServerMessage} fails the stream with {@link IllegalStateException}.
     *
     * @param obj             the object read from the connection
     * @param synchronousSink the sink receiving either the message or the error
     */
    private static void inboundHandle(Object obj, SynchronousSink<ServerMessage> synchronousSink) {
        if (obj instanceof ServerMessage) {
            if (obj instanceof ReferenceCounted) {
                ((ReferenceCounted) obj).retain();
            }
            synchronousSink.next((ServerMessage) obj);
        } else {
            synchronousSink.error(new IllegalStateException("Impossible inbound type: " + obj.getClass()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Cancel hook of an exchange: logs an error when the exchange had not finished yet.
     *
     * @param zArr one-element flag holder; {@code zArr[0]} is {@code true} once the exchange has
     *     received its final message
     */
    public static void exchangeCancel(boolean[] zArr) {
        if (!zArr[0]) {
            logger.error("Exchange cancelled while exchange is active. This is likely a bug leading to unpredictable outcome.");
        }
    }

    /**
     * Logs, at info level, the warning count carried by a {@link WarningMessage}.
     *
     * <p>Messages of any other type, and warning messages with a count of zero, are ignored.
     *
     * @param serverMessage the message received from the server
     */
    private static void infoLogging(ServerMessage serverMessage) {
        if (serverMessage instanceof WarningMessage) {
            int count = ((WarningMessage) serverMessage).getWarnings();
            if (count != 0) {
                logger.info("MySQL reports {} warning(s)", Integer.valueOf(count));
            }
        }
    }

    /**
     * Returns the shared identity function, typed for the caller.
     *
     * <p>{@code Identity} is declared as {@code Function<Object, Object>}, so an explicit cast is
     * needed to view it as {@code Function<T, T>}. The cast is safe because the function only
     * returns the reference it was given.
     *
     * @param <T> the input and output type
     * @return a function that returns its argument unchanged
     */
    @SuppressWarnings("unchecked")
    private static <T> Function<T, T> identity() {
        return (Function<T, T>) Identity.INSTANCE;
    }
}
