package io.r2dbc.mssql.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.mssql.client.ssl.TdsSslHandler;
import io.r2dbc.mssql.message.ClientMessage;
import io.r2dbc.mssql.message.Message;
import io.r2dbc.mssql.message.TransactionDescriptor;
import io.r2dbc.mssql.message.header.PacketIdProvider;
import io.r2dbc.mssql.message.tds.ProtocolException;
import io.r2dbc.mssql.message.token.AbstractDoneToken;
import io.r2dbc.mssql.message.token.AbstractInfoToken;
import io.r2dbc.mssql.message.token.EnvChangeToken;
import io.r2dbc.mssql.message.token.FeatureExtAckToken;
import io.r2dbc.mssql.message.type.Collation;
import io.r2dbc.mssql.util.Assert;
import io.r2dbc.mssql.util.PredicateUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;

/* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient.class */
public final class ReactorNettyClient implements Client {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final AtomicReference<ByteBufAllocator> byteBufAllocator = new AtomicReference<>();
    private final AtomicReference<Connection> connection = new AtomicReference<>();
    private final AtomicReference<TransactionDescriptor> transactionDescriptor = new AtomicReference<>(TransactionDescriptor.empty());
    private final AtomicReference<TransactionStatus> transactionStatus = new AtomicReference<>(TransactionStatus.AUTO_COMMIT);
    private final AtomicReference<Optional<Collation>> databaseCollation = new AtomicReference<>(Optional.empty());
    private final AtomicBoolean encryptionSupported = new AtomicBoolean();
    private final List<EnvironmentChangeListener> envChangeListeners = new ArrayList();
    private final Consumer<Message> handleInfoToken = handleMessage(AbstractInfoToken.class, abstractInfoToken -> {
        if (abstractInfoToken.getClassification() == AbstractInfoToken.Classification.INFORMATIONAL) {
            this.logger.debug("Info: Code [{}] Severity [{}]: {}", new Object[]{Long.valueOf(abstractInfoToken.getNumber()), abstractInfoToken.getClassification(), abstractInfoToken.getMessage()});
        } else {
            this.logger.debug("Warning: Code [{}] Severity [{}]: {}", new Object[]{Long.valueOf(abstractInfoToken.getNumber()), abstractInfoToken.getClassification(), abstractInfoToken.getMessage()});
        }
    });
    private final Consumer<Message> handleEnvChange = handleMessage(EnvChangeToken.class, envChangeToken -> {
        EnvironmentChangeEvent environmentChangeEvent = new EnvironmentChangeEvent(envChangeToken);
        for (EnvironmentChangeListener environmentChangeListener : this.envChangeListeners) {
            try {
                environmentChangeListener.onEnvironmentChange(environmentChangeEvent);
            } catch (Exception e) {
                this.logger.warn("Failed onEnvironmentChange() in {}", environmentChangeListener, e);
            }
        }
    });
    private final Consumer<Message> featureAckChange = handleMessage(FeatureExtAckToken.class, featureExtAckToken -> {
        Iterator<FeatureExtAckToken.FeatureToken> it = featureExtAckToken.getFeatureTokens().iterator();
        while (it.hasNext()) {
            if (it.next() instanceof FeatureExtAckToken.ColumnEncryption) {
                this.encryptionSupported.set(true);
            }
        }
    });
    private final BiConsumer<Message, SynchronousSink<Message>> handleStateChange = handleMessage(Message.class, (message, synchronousSink) -> {
        ConnectionState connectionState = this.state.get();
        if (connectionState.canAdvance(message)) {
            ConnectionState next = connectionState.next(message, this.connection.get());
            if (this.state.compareAndSet(connectionState, next)) {
                this.decodeFunction.set(next.decoder(this));
            } else {
                synchronousSink.error(new ProtocolException(String.format("Cannot advance state from [%s]", connectionState)));
            }
        }
        synchronousSink.next(message);
    });
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final EmitterProcessor<ClientMessage> requestProcessor = EmitterProcessor.create(false);
    private final FluxSink<ClientMessage> requests = this.requestProcessor.sink();
    private final EmitterProcessor<Flux<Message>> responseProcessor = EmitterProcessor.create(false);
    private final AtomicReference<ConnectionState> state = new AtomicReference<>(ConnectionState.PRELOGIN);
    private final AtomicReference<MessageDecoder> decodeFunction = new AtomicReference<>(ConnectionState.PRELOGIN.decoder(this));

    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$CollationListener.class */
    class CollationListener implements EnvironmentChangeListener {
        CollationListener() {
        }

        @Override // io.r2dbc.mssql.client.EnvironmentChangeListener
        public void onEnvironmentChange(EnvironmentChangeEvent environmentChangeEvent) {
            if (environmentChangeEvent.getToken().getChangeType() == EnvChangeToken.EnvChangeType.SQLCollation) {
                ReactorNettyClient.this.databaseCollation.set(Optional.of(Collation.decode(Unpooled.wrappedBuffer(environmentChangeEvent.getToken().getNewValue()))));
            }
        }
    }

    /* loaded from: input_file:io/r2dbc/mssql/client/ReactorNettyClient$TransactionListener.class */
    class TransactionListener implements EnvironmentChangeListener {
        TransactionListener() {
        }

        @Override // io.r2dbc.mssql.client.EnvironmentChangeListener
        public void onEnvironmentChange(EnvironmentChangeEvent environmentChangeEvent) {
            EnvChangeToken token = environmentChangeEvent.getToken();
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.BeginTx || token.getChangeType() == EnvChangeToken.EnvChangeType.EnlistDTC) {
                byte[] newValue = token.getNewValue();
                if (newValue.length != 8) {
                    throw ProtocolException.invalidTds("Transaction descriptor length mismatch");
                }
                if (ReactorNettyClient.this.logger.isDebugEnabled()) {
                    ReactorNettyClient.this.logger.debug(String.format("Transaction %s", token.getChangeType() == EnvChangeToken.EnvChangeType.BeginTx ? "started" : "enlisted"));
                }
                ReactorNettyClient.this.transactionStatus.set(TransactionStatus.STARTED);
                ReactorNettyClient.this.transactionDescriptor.set(TransactionDescriptor.from(newValue));
            }
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.CommitTx) {
                if (ReactorNettyClient.this.logger.isDebugEnabled()) {
                    ReactorNettyClient.this.logger.debug("Transaction committed");
                }
                ReactorNettyClient.this.transactionStatus.set(TransactionStatus.EXPLICIT);
                ReactorNettyClient.this.transactionDescriptor.set(TransactionDescriptor.empty());
            }
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.RollbackTx) {
                if (ReactorNettyClient.this.logger.isDebugEnabled()) {
                    ReactorNettyClient.this.logger.debug("Transaction rolled back");
                }
                ReactorNettyClient.this.transactionStatus.set(TransactionStatus.EXPLICIT);
                ReactorNettyClient.this.transactionDescriptor.set(TransactionDescriptor.empty());
            }
        }
    }

    private ReactorNettyClient(Connection connection, List<EnvironmentChangeListener> list) {
        Assert.requireNonNull(connection, "Connection must not be null");
        FluxSink sink = this.responseProcessor.sink();
        StreamDecoder streamDecoder = new StreamDecoder();
        this.byteBufAllocator.set(connection.outbound().alloc());
        this.connection.set(connection);
        this.envChangeListeners.addAll(list);
        this.envChangeListeners.add(new TransactionListener());
        this.envChangeListeners.add(new CollationListener());
        Flux map = connection.inbound().receiveObject().concatMap(obj -> {
            if (!(obj instanceof ByteBuf)) {
                return obj instanceof Message ? Mono.just((Message) obj) : Mono.error(new ProtocolException(String.format("Unexpected protocol message: [%s]", obj)));
            }
            ByteBuf byteBuf = (ByteBuf) obj;
            byteBuf.retain();
            return streamDecoder.decode(byteBuf, this.decodeFunction.get());
        }).doOnNext(message -> {
            this.logger.debug("Response: {}", message);
        }).doOnError(th -> {
            this.logger.warn("Error: {}", th);
        }).handle(this.handleStateChange).doOnNext(this.handleEnvChange).doOnNext(this.featureAckChange).doOnNext(this.handleInfoToken).doOnError(ProtocolException.class, protocolException -> {
            this.isClosed.set(true);
            connection.channel().close();
        }).windowUntil(PredicateUtils.or(AbstractDoneToken::isDone)).map((v0) -> {
            return v0.cache();
        });
        sink.getClass();
        Consumer consumer = (v1) -> {
            r1.next(v1);
        };
        sink.getClass();
        Consumer consumer2 = sink::error;
        sink.getClass();
        map.subscribe(consumer, consumer2, sink::complete);
        this.requestProcessor.doOnError(th2 -> {
            this.logger.warn("Error: {}", th2);
            this.isClosed.set(true);
            connection.channel().close();
        }).doOnNext(clientMessage -> {
            this.logger.debug("Request:  {}", clientMessage);
        }).concatMap(clientMessage2 -> {
            return connection.outbound().sendObject(clientMessage2.encode(connection.outbound().alloc()));
        }).subscribe();
    }

    public static Mono<ReactorNettyClient> connect(String str, int i) {
        Assert.requireNonNull(str, "host must not be null");
        return connect(str, i, Duration.ofSeconds(30L));
    }

    public static Mono<ReactorNettyClient> connect(String str, int i, Duration duration) {
        Assert.requireNonNull(duration, "connect timeout must not be null");
        Assert.requireNonNull(str, "host must not be null");
        return connect(str, i, duration, ConnectionProvider.newConnection());
    }

    private static Mono<ReactorNettyClient> connect(String str, int i, Duration duration, ConnectionProvider connectionProvider) {
        Assert.requireNonNull(connectionProvider, "connectionProvider must not be null");
        Assert.requireNonNull(duration, "connect timeout must not be null");
        Assert.requireNonNull(str, "host must not be null");
        PacketIdProvider atomic = PacketIdProvider.atomic();
        TdsEncoder tdsEncoder = new TdsEncoder(atomic);
        return TcpClient.create(connectionProvider).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(Math.toIntExact(duration.toMillis()))).host(str).port(i).connect().doOnNext(connection -> {
            ChannelPipeline pipeline = connection.channel().pipeline();
            InternalLogger internalLoggerFactory = InternalLoggerFactory.getInstance(ReactorNettyClient.class);
            pipeline.addFirst(tdsEncoder.getClass().getName(), tdsEncoder);
            TdsSslHandler tdsSslHandler = new TdsSslHandler(atomic);
            pipeline.addAfter(tdsEncoder.getClass().getName(), tdsSslHandler.getClass().getName(), tdsSslHandler);
            if (internalLoggerFactory.isDebugEnabled()) {
                pipeline.addFirst(LoggingHandler.class.getSimpleName(), new LoggingHandler(ReactorNettyClient.class, LogLevel.DEBUG));
            }
        }).map(connection2 -> {
            return new ReactorNettyClient(connection2, Collections.singletonList(tdsEncoder));
        });
    }

    @Override // io.r2dbc.mssql.client.Client
    public Mono<Void> close() {
        return Mono.defer(() -> {
            Connection andSet = this.connection.getAndSet(null);
            return andSet == null ? Mono.empty() : Mono.create(monoSink -> {
                this.isClosed.set(true);
                andSet.channel().disconnect().addListener(channelFuture -> {
                    if (channelFuture.isSuccess()) {
                        monoSink.success();
                    } else {
                        monoSink.error(channelFuture.cause());
                    }
                });
            });
        });
    }

    @Override // io.r2dbc.mssql.client.Client
    public Flux<Message> exchange(Publisher<? extends ClientMessage> publisher) {
        Assert.requireNonNull(publisher, "Requests must not be null");
        return Flux.defer(() -> {
            return this.isClosed.get() ? Flux.error(new IllegalStateException("Cannot exchange messages because the connection is closed")) : this.responseProcessor.flatMap(Function.identity()).doOnSubscribe(subscription -> {
                Flux from = Flux.from(publisher);
                FluxSink<ClientMessage> fluxSink = this.requests;
                fluxSink.getClass();
                Consumer consumer = (v1) -> {
                    r1.next(v1);
                };
                FluxSink<ClientMessage> fluxSink2 = this.requests;
                fluxSink2.getClass();
                from.subscribe(consumer, fluxSink2::error);
            });
        });
    }

    @Override // io.r2dbc.mssql.client.Client
    public ByteBufAllocator getByteBufAllocator() {
        return this.byteBufAllocator.get();
    }

    @Override // io.r2dbc.mssql.client.Client
    public TransactionDescriptor getTransactionDescriptor() {
        return this.transactionDescriptor.get();
    }

    @Override // io.r2dbc.mssql.client.Client
    public TransactionStatus getTransactionStatus() {
        return this.transactionStatus.get();
    }

    @Override // io.r2dbc.mssql.client.Client
    public Optional<Collation> getDatabaseCollation() {
        return this.databaseCollation.get();
    }

    @Override // io.r2dbc.mssql.client.Client
    public boolean isColumnEncryptionSupported() {
        return this.encryptionSupported.get();
    }

    private static <T extends Message> BiConsumer<Message, SynchronousSink<Message>> handleMessage(Class<T> cls, BiConsumer<T, SynchronousSink<Message>> biConsumer) {
        return (message, synchronousSink) -> {
            if (cls.isInstance(message)) {
                biConsumer.accept(message, synchronousSink);
            } else {
                synchronousSink.next(message);
            }
        };
    }

    private static <T extends Message> Consumer<Message> handleMessage(Class<T> cls, Consumer<T> consumer) {
        return message -> {
            if (cls.isInstance(message)) {
                consumer.accept(message);
            }
        };
    }
}
