package io.r2dbc.mssql.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.mssql.client.ssl.TdsSslHandler;
import io.r2dbc.mssql.message.ClientMessage;
import io.r2dbc.mssql.message.Message;
import io.r2dbc.mssql.message.TransactionDescriptor;
import io.r2dbc.mssql.message.header.PacketIdProvider;
import io.r2dbc.mssql.message.tds.ProtocolException;
import io.r2dbc.mssql.message.token.AbstractInfoToken;
import io.r2dbc.mssql.message.token.EnvChangeToken;
import io.r2dbc.mssql.message.token.ErrorToken;
import io.r2dbc.mssql.message.token.FeatureExtAckToken;
import io.r2dbc.mssql.message.token.InfoToken;
import io.r2dbc.mssql.message.type.Collation;
import io.r2dbc.mssql.util.Assert;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;

/* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient.class */
public final class ReactorNettyClient implements Client {
    private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class);
    private final AtomicReference<ByteBufAllocator> byteBufAllocator = new AtomicReference<>();
    private final AtomicReference<Connection> connection = new AtomicReference<>();
    private final AtomicReference<TransactionDescriptor> transactionDescriptor = new AtomicReference<>(TransactionDescriptor.empty());
    private final AtomicReference<TransactionStatus> transactionStatus = new AtomicReference<>(TransactionStatus.AUTO_COMMIT);
    private final AtomicReference<Optional<Collation>> databaseCollation = new AtomicReference<>(Optional.empty());
    private final AtomicBoolean encryptionSupported = new AtomicBoolean();
    private final List<EnvironmentChangeListener> envChangeListeners = new ArrayList();
    private final Consumer<AbstractInfoToken> infoTokenConsumer = abstractInfoToken -> {
        if (logger.isDebugEnabled()) {
            if (abstractInfoToken.getClassification() == AbstractInfoToken.Classification.INFORMATIONAL) {
                logger.debug("Info: Code [{}] Severity [{}]: {}", new Object[]{Long.valueOf(abstractInfoToken.getNumber()), abstractInfoToken.getClassification(), abstractInfoToken.getMessage()});
            } else {
                logger.debug("Warning: Code [{}] Severity [{}]: {}", new Object[]{Long.valueOf(abstractInfoToken.getNumber()), abstractInfoToken.getClassification(), abstractInfoToken.getMessage()});
            }
        }
    };
    private final Consumer<Message> handleInfoToken;
    private final Consumer<Message> handleErrorToken;
    private final Consumer<Message> handleEnvChange;
    private final Consumer<Message> featureAckChange;
    private final BiConsumer<Message, SynchronousSink<Message>> handleStateChange;
    private final AtomicBoolean isClosed;
    private final EmitterProcessor<ClientMessage> requestProcessor;
    private final FluxSink<ClientMessage> requests;
    private final EmitterProcessor<Message> responseProcessor;
    private final AtomicReference<ConnectionState> state;
    private final AtomicReference<MessageDecoder> decodeFunction;
    private final TdsEncoder tdsEncoder;

    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$CollationListener.class */
    class CollationListener implements EnvironmentChangeListener {
        CollationListener() {
        }

        @Override // io.r2dbc.mssql.client.EnvironmentChangeListener
        public void onEnvironmentChange(EnvironmentChangeEvent environmentChangeEvent) {
            if (environmentChangeEvent.getToken().getChangeType() == EnvChangeToken.EnvChangeType.SQLCollation) {
                ReactorNettyClient.this.databaseCollation.set(Optional.of(Collation.decode(Unpooled.wrappedBuffer(environmentChangeEvent.getToken().getNewValue()))));
            }
        }
    }

    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$TransactionListener.class */
    class TransactionListener implements EnvironmentChangeListener {
        TransactionListener() {
        }

        @Override // io.r2dbc.mssql.client.EnvironmentChangeListener
        public void onEnvironmentChange(EnvironmentChangeEvent environmentChangeEvent) {
            EnvChangeToken token = environmentChangeEvent.getToken();
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.BeginTx || token.getChangeType() == EnvChangeToken.EnvChangeType.EnlistDTC) {
                byte[] newValue = token.getNewValue();
                if (newValue.length != 8) {
                    throw ProtocolException.invalidTds("Transaction descriptor length mismatch");
                }
                if (ReactorNettyClient.logger.isDebugEnabled()) {
                    ReactorNettyClient.logger.debug(String.format("Transaction %s", token.getChangeType() == EnvChangeToken.EnvChangeType.BeginTx ? "started" : "enlisted"));
                }
                updateStatus(TransactionStatus.STARTED, TransactionDescriptor.from(newValue));
            }
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.CommitTx) {
                if (ReactorNettyClient.logger.isDebugEnabled()) {
                    ReactorNettyClient.logger.debug("Transaction committed");
                }
                updateStatus(TransactionStatus.EXPLICIT, TransactionDescriptor.empty());
            }
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.RollbackTx) {
                if (ReactorNettyClient.logger.isDebugEnabled()) {
                    ReactorNettyClient.logger.debug("Transaction rolled back");
                }
                updateStatus(TransactionStatus.EXPLICIT, TransactionDescriptor.empty());
            }
        }

        private void updateStatus(TransactionStatus transactionStatus, TransactionDescriptor transactionDescriptor) {
            ReactorNettyClient.this.transactionStatus.set(transactionStatus);
            ReactorNettyClient.this.transactionDescriptor.set(transactionDescriptor);
        }
    }

    private ReactorNettyClient(Connection connection, TdsEncoder tdsEncoder) {
        Consumer<AbstractInfoToken> consumer = this.infoTokenConsumer;
        consumer.getClass();
        this.handleInfoToken = handleExact(InfoToken.class, (v1) -> {
            r2.accept(v1);
        });
        Consumer<AbstractInfoToken> consumer2 = this.infoTokenConsumer;
        consumer2.getClass();
        this.handleErrorToken = handleExact(ErrorToken.class, (v1) -> {
            r2.accept(v1);
        });
        this.handleEnvChange = handleExact(EnvChangeToken.class, envChangeToken -> {
            EnvironmentChangeEvent environmentChangeEvent = new EnvironmentChangeEvent(envChangeToken);
            for (EnvironmentChangeListener environmentChangeListener : this.envChangeListeners) {
                try {
                    environmentChangeListener.onEnvironmentChange(environmentChangeEvent);
                } catch (Exception e) {
                    logger.warn("Failed onEnvironmentChange() in {}", environmentChangeListener, e);
                }
            }
        });
        this.featureAckChange = handleExact(FeatureExtAckToken.class, featureExtAckToken -> {
            Iterator<FeatureExtAckToken.FeatureToken> it = featureExtAckToken.getFeatureTokens().iterator();
            while (it.hasNext()) {
                if (it.next() instanceof FeatureExtAckToken.ColumnEncryption) {
                    this.encryptionSupported.set(true);
                }
            }
        });
        this.handleStateChange = handleMessage(Message.class, (message, synchronousSink) -> {
            ConnectionState connectionState = this.state.get();
            if (connectionState.canAdvance(message)) {
                ConnectionState next = connectionState.next(message, this.connection.get());
                if (this.state.compareAndSet(connectionState, next)) {
                    this.decodeFunction.set(next.decoder(this));
                } else {
                    synchronousSink.error(ProtocolException.invalidTds(String.format("Cannot advance state from [%s]", connectionState)));
                }
            }
            synchronousSink.next(message);
        });
        this.isClosed = new AtomicBoolean(false);
        this.requestProcessor = EmitterProcessor.create(false);
        this.requests = this.requestProcessor.sink();
        this.responseProcessor = EmitterProcessor.create(false);
        this.state = new AtomicReference<>(ConnectionState.PRELOGIN);
        this.decodeFunction = new AtomicReference<>(ConnectionState.PRELOGIN.decoder(this));
        Assert.requireNonNull(connection, "Connection must not be null");
        FluxSink sink = this.responseProcessor.sink();
        StreamDecoder streamDecoder = new StreamDecoder();
        this.byteBufAllocator.set(connection.outbound().alloc());
        this.connection.set(connection);
        this.envChangeListeners.add(tdsEncoder);
        this.envChangeListeners.add(new TransactionListener());
        this.envChangeListeners.add(new CollationListener());
        this.tdsEncoder = tdsEncoder;
        connection.addHandlerFirst(new ChannelInboundHandlerAdapter() { // from class: io.r2dbc.mssql.client.ReactorNettyClient.1
            public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
                if (ReactorNettyClient.this.isClosed.compareAndSet(false, true)) {
                    ReactorNettyClient.logger.warn("Connection has been closed by peer");
                }
                super.channelInactive(channelHandlerContext);
            }
        });
        Flux doOnError = ((Flux) ((Flux) connection.inbound().receiveObject().concatMap(obj -> {
            return obj instanceof ByteBuf ? streamDecoder.decode((ByteBuf) obj, this.decodeFunction.get()) : obj instanceof Message ? Mono.just((Message) obj) : Mono.error(ProtocolException.unsupported(String.format("Unexpected protocol message: [%s]", obj)));
        }).as(flux -> {
            return logger.isDebugEnabled() ? flux.doOnNext(message2 -> {
                logger.debug("Response: {}", message2);
            }) : flux;
        })).as(flux2 -> {
            return logger.isDebugEnabled() ? flux2.doOnNext(this.handleInfoToken).doOnNext(this.handleErrorToken) : flux2;
        })).doOnError(th -> {
            logger.warn("Error: {}", th.getMessage(), th);
        }).handle(this.handleStateChange).doOnNext(message2 -> {
            this.handleEnvChange.accept(message2);
            this.featureAckChange.accept(message2);
        }).doOnError(ProtocolException.class, protocolException -> {
            logger.warn("Error: {}", protocolException.getMessage(), protocolException);
            this.isClosed.set(true);
            connection.channel().close();
        });
        sink.getClass();
        Consumer consumer3 = (v1) -> {
            r1.next(v1);
        };
        sink.getClass();
        Consumer consumer4 = sink::error;
        sink.getClass();
        doOnError.subscribe(consumer3, consumer4, sink::complete);
        ((Flux) this.requestProcessor.as(flux3 -> {
            return logger.isDebugEnabled() ? flux3.doOnNext(clientMessage -> {
                logger.debug("Request: {}", clientMessage);
            }) : flux3;
        })).concatMap(clientMessage -> {
            return connection.outbound().sendObject(clientMessage.encode(connection.outbound().alloc(), this.tdsEncoder.getPacketSize()));
        }).doOnError(th2 -> {
            logger.warn("Error: {}", th2.getMessage(), th2);
            this.isClosed.set(true);
            connection.channel().close();
        }).subscribe();
    }

    public static Mono<ReactorNettyClient> connect(String str, int i) {
        Assert.requireNonNull(str, "host must not be null");
        return connect(str, i, Duration.ofSeconds(30L));
    }

    public static Mono<ReactorNettyClient> connect(String str, int i, Duration duration) {
        Assert.requireNonNull(duration, "connect timeout must not be null");
        Assert.requireNonNull(str, "host must not be null");
        return connect(str, i, duration, ConnectionProvider.newConnection());
    }

    private static Mono<ReactorNettyClient> connect(String str, int i, Duration duration, ConnectionProvider connectionProvider) {
        Assert.requireNonNull(connectionProvider, "connectionProvider must not be null");
        Assert.requireNonNull(duration, "connect timeout must not be null");
        Assert.requireNonNull(str, "host must not be null");
        logger.debug("connect()");
        PacketIdProvider atomic = PacketIdProvider.atomic();
        TdsEncoder tdsEncoder = new TdsEncoder(atomic);
        return TcpClient.create(connectionProvider).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(Math.toIntExact(duration.toMillis()))).host(str).port(i).connect().doOnNext(connection -> {
            ChannelPipeline pipeline = connection.channel().pipeline();
            pipeline.addFirst(tdsEncoder.getClass().getName(), tdsEncoder);
            TdsSslHandler tdsSslHandler = new TdsSslHandler(atomic);
            pipeline.addAfter(tdsEncoder.getClass().getName(), tdsSslHandler.getClass().getName(), tdsSslHandler);
            if (InternalLoggerFactory.getInstance(ReactorNettyClient.class).isTraceEnabled()) {
                pipeline.addFirst(LoggingHandler.class.getSimpleName(), new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE));
            }
        }).map(connection2 -> {
            return new ReactorNettyClient(connection2, tdsEncoder);
        });
    }

    @Override // io.r2dbc.mssql.client.Client
    public Mono<Void> close() {
        logger.debug("close()");
        return Mono.defer(() -> {
            logger.debug("close(subscribed)");
            Connection andSet = this.connection.getAndSet(null);
            return andSet == null ? Mono.empty() : Mono.create(monoSink -> {
                if (this.isClosed.compareAndSet(false, true)) {
                    andSet.channel().disconnect().addListener(channelFuture -> {
                        if (channelFuture.isSuccess()) {
                            monoSink.success();
                        } else {
                            monoSink.error(channelFuture.cause());
                        }
                    });
                }
            });
        });
    }

    @Override // io.r2dbc.mssql.client.Client
    public Flux<Message> exchange(Publisher<? extends ClientMessage> publisher) {
        Assert.requireNonNull(publisher, "Requests must not be null");
        logger.debug("exchange()");
        return Flux.defer(() -> {
            logger.debug("exchange(subscribed)");
            return this.isClosed.get() ? Flux.error(new IllegalStateException("Cannot exchange messages because the connection is closed")) : this.responseProcessor.doOnSubscribe(subscription -> {
                Flux from = Flux.from(publisher);
                FluxSink<ClientMessage> fluxSink = this.requests;
                fluxSink.getClass();
                Consumer consumer = (v1) -> {
                    r1.next(v1);
                };
                FluxSink<ClientMessage> fluxSink2 = this.requests;
                fluxSink2.getClass();
                from.subscribe(consumer, fluxSink2::error);
            });
        });
    }

    @Override // io.r2dbc.mssql.client.Client
    public ByteBufAllocator getByteBufAllocator() {
        return this.byteBufAllocator.get();
    }

    @Override // io.r2dbc.mssql.client.Client
    public TransactionDescriptor getTransactionDescriptor() {
        return this.transactionDescriptor.get();
    }

    @Override // io.r2dbc.mssql.client.Client
    public TransactionStatus getTransactionStatus() {
        return this.transactionStatus.get();
    }

    @Override // io.r2dbc.mssql.client.Client
    public Optional<Collation> getDatabaseCollation() {
        return this.databaseCollation.get();
    }

    @Override // io.r2dbc.mssql.client.Client
    public boolean isColumnEncryptionSupported() {
        return this.encryptionSupported.get();
    }

    private static <T extends Message> BiConsumer<Message, SynchronousSink<Message>> handleMessage(Class<T> cls, BiConsumer<T, SynchronousSink<Message>> biConsumer) {
        return (message, synchronousSink) -> {
            if (cls.isInstance(message)) {
                biConsumer.accept(message, synchronousSink);
            } else {
                synchronousSink.next(message);
            }
        };
    }

    private static <T extends Message> Consumer<Message> handleExact(Class<T> cls, Consumer<T> consumer) {
        return message -> {
            if (cls == message.getClass()) {
                consumer.accept(message);
            }
        };
    }
}
