package io.r2dbc.postgresql.client;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.r2dbc.postgresql.message.backend.BackendKeyData;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.BackendMessageDecoder;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.Field;
import io.r2dbc.postgresql.message.backend.NoticeResponse;
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.message.backend.ParameterStatus;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Terminate;
import io.r2dbc.postgresql.util.Assert;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.net.ssl.SSLException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:io/r2dbc/postgresql/client/ReactorNettyClient.class */
/**
 * A {@link Client} implementation backed by a Reactor Netty {@link Connection}.
 *
 * <p>Outgoing {@link FrontendMessage}s are funnelled through a single request processor and written to the
 * connection one at a time. Incoming bytes are framed, decoded into {@link BackendMessage}s and split into
 * windows that end at each {@link ReadyForQuery}; every window is handed to the oldest waiting receiver
 * registered by {@link #exchange(Publisher)}.
 */
public final class ReactorNettyClient implements Client {

    private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class);

    /** Debug state sampled once at class initialization; guards the per-message log statements. */
    private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();

    private final ByteBufAllocator byteBufAllocator;

    private final Connection connection;

    /** Serializes all outgoing messages; {@link #requests} is its sink and must be initialized after it. */
    private final EmitterProcessor<FrontendMessage> requestProcessor = EmitterProcessor.create(false);

    private final FluxSink<FrontendMessage> requests = this.requestProcessor.sink();

    /** FIFO of callers waiting for the next window of backend messages. */
    private final Queue<MonoSink<Flux<BackendMessage>>> responseReceivers = Queues.<MonoSink<Flux<BackendMessage>>>unbounded().get();

    private final DirectProcessor<NotificationResponse> notificationProcessor = DirectProcessor.create();

    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    /** Set from {@link BackendKeyData}; {@code null} until that message has been received. */
    private volatile Integer processId;

    /** Set from {@link BackendKeyData}; {@code null} until that message has been received. */
    private volatile Integer secretKey;

    private volatile TransactionStatus transactionStatus = TransactionStatus.IDLE;

    private volatile Version version = new Version("", 0);

    /**
     * Channel handler that, once the channel is unregistered, completes the request processor and releases
     * every receiver still queued so that no subscriber is left waiting forever.
     */
    private static final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {

        private final EmitterProcessor<FrontendMessage> requestProcessor;

        private final Queue<MonoSink<Flux<BackendMessage>>> responseReceivers;

        private EnsureSubscribersCompleteChannelHandler(EmitterProcessor<FrontendMessage> emitterProcessor, Queue<MonoSink<Flux<BackendMessage>>> queue) {
            this.requestProcessor = emitterProcessor;
            this.responseReceivers = queue;
        }

        /**
         * Propagates the event, then completes the request processor and drains the receiver queue,
         * completing each receiver with an empty {@link Flux}.
         */
        @Override
        public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
            super.channelUnregistered(channelHandlerContext);

            this.requestProcessor.onComplete();

            MonoSink<Flux<BackendMessage>> receiver;
            while ((receiver = this.responseReceivers.poll()) != null) {
                receiver.success(Flux.empty());
            }
        }
    }

    /**
     * Installs the framing and cleanup handlers on {@code connection} and subscribes the receive and
     * request pipelines.
     *
     * @param connection the established connection to wrap
     */
    private ReactorNettyClient(Connection connection) {
        Assert.requireNonNull(connection, "Connection must not be null");

        // Arguments: maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip.
        // The length field sits after a one byte prefix, is four bytes wide, and (per the -4 adjustment)
        // counts itself; nothing is stripped so the decoder sees the complete message.
        connection.addHandler(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0));
        connection.addHandler(new EnsureSubscribersCompleteChannelHandler(this.requestProcessor, this.responseReceivers));

        this.connection = connection;
        this.byteBufAllocator = connection.outbound().alloc();

        Mono<Void> receive = connection.inbound().receive()
            .map(BackendMessageDecoder::decode)
            .<BackendMessage>handle(this::handleResponse)
            // One window per conversation: a window closes when ReadyForQuery arrives.
            .windowWhile(message -> message.getClass() != ReadyForQuery.class)
            .doOnNext(window -> {
                MonoSink<Flux<BackendMessage>> receiver = this.responseReceivers.poll();
                if (receiver != null) {
                    receiver.success(window);
                }
            })
            .doOnComplete(() -> {
                MonoSink<Flux<BackendMessage>> receiver = this.responseReceivers.poll();
                if (receiver != null) {
                    receiver.success(Flux.empty());
                }
            })
            .then();

        Mono<Void> request = this.requestProcessor
            .flatMap(message -> {
                if (DEBUG_ENABLED) {
                    logger.debug("Request:  {}", message);
                }
                return connection.outbound().send(message.encode(this.byteBufAllocator));
            }, 1) // concurrency of 1 keeps writes in submission order
            .then();

        receive
            .onErrorResume(throwable -> {
                MonoSink<Flux<BackendMessage>> receiver = this.responseReceivers.poll();
                if (receiver != null) {
                    receiver.error(throwable);
                }
                this.requestProcessor.onComplete();
                logConnectionError(throwable);
                return close();
            })
            .subscribe();

        request
            .onErrorResume(throwable -> {
                // Previously an SSL failure was logged at debug AND error level here; it now follows
                // the same either/or rule as the receive pipeline.
                logConnectionError(throwable);
                return close();
            })
            .subscribe();
    }

    /**
     * Logs a connection failure: SSL-related failures at debug level, everything else at error level.
     */
    private static void logConnectionError(Throwable throwable) {
        if (isSslException(throwable)) {
            logger.debug("Connection Error", throwable);
        } else {
            logger.error("Connection Error", throwable);
        }
    }

    /**
     * Returns whether {@code th} or its direct cause is an {@link SSLException}.
     */
    private static boolean isSslException(Throwable th) {
        return (th instanceof SSLException) || (th.getCause() instanceof SSLException);
    }

    /**
     * Applies client-side bookkeeping for a decoded message and decides whether it is forwarded downstream.
     *
     * <p>{@link NoticeResponse} and {@link BackendKeyData} are consumed here and never forwarded.
     * {@link NotificationResponse} is routed to the notification processor instead of downstream.
     * All other messages are forwarded, after logging ({@link ErrorResponse}) or state updates
     * ({@link ParameterStatus}, {@link ReadyForQuery}) where applicable.
     *
     * @param backendMessage  the decoded message
     * @param synchronousSink the downstream sink
     */
    private void handleResponse(BackendMessage backendMessage, SynchronousSink<BackendMessage> synchronousSink) {
        if (DEBUG_ENABLED) {
            logger.debug("Response: {}", backendMessage);
        }

        Class<?> type = backendMessage.getClass();

        if (type == NoticeResponse.class) {
            logger.warn("Notice: {}", toString(((NoticeResponse) backendMessage).getFields()));
            return;
        }

        if (type == BackendKeyData.class) {
            BackendKeyData keyData = (BackendKeyData) backendMessage;
            this.processId = Integer.valueOf(keyData.getProcessId());
            this.secretKey = Integer.valueOf(keyData.getSecretKey());
            return;
        }

        if (type == ErrorResponse.class) {
            logger.warn("Error: {}", toString(((ErrorResponse) backendMessage).getFields()));
        } else if (type == ParameterStatus.class) {
            handleParameterStatus((ParameterStatus) backendMessage);
        } else if (type == ReadyForQuery.class) {
            this.transactionStatus = TransactionStatus.valueOf(((ReadyForQuery) backendMessage).getTransactionStatus());
        }

        if (type == NotificationResponse.class) {
            this.notificationProcessor.onNext((NotificationResponse) backendMessage);
        } else {
            synchronousSink.next(backendMessage);
        }
    }

    /**
     * Updates {@link #version} from the {@code server_version_num} and {@code server_version} parameters.
     * The numeric version is parsed from the version string only while no numeric version is known yet.
     */
    private void handleParameterStatus(ParameterStatus parameterStatus) {
        Version current = this.version;
        String versionString = current.getVersion();
        int versionNumber = current.getVersionNumber();

        String name = parameterStatus.getName();

        if (name.equals("server_version_num")) {
            versionNumber = Integer.parseInt(parameterStatus.getValue());
        }

        if (name.equals("server_version")) {
            versionString = parameterStatus.getValue();
            if (versionNumber == 0) {
                versionNumber = Version.parseServerVersionStr(versionString);
            }
        }

        this.version = new Version(versionString, versionNumber);
    }

    /**
     * Connects to {@code str}:{@code i} without a connect timeout and with SSL disabled.
     *
     * @param str the host to connect to
     * @param i   the port to connect to
     * @return a {@link Mono} emitting the connected client
     */
    public static Mono<ReactorNettyClient> connect(String str, int i) {
        Assert.requireNonNull(str, "host must not be null");

        return connect(str, i, null, new SSLConfig(SSLMode.DISABLE, null, null));
    }

    /**
     * Connects using a new, non-pooled connection per call.
     *
     * @param str       the host to connect to
     * @param i         the port to connect to
     * @param duration  the connect timeout, or {@code null} to leave the transport default in place
     * @param sSLConfig the SSL configuration
     * @return a {@link Mono} emitting the connected client
     */
    public static Mono<ReactorNettyClient> connect(String str, int i, @Nullable Duration duration, SSLConfig sSLConfig) {
        return connect(ConnectionProvider.newConnection(), str, i, duration, sSLConfig);
    }

    /**
     * Connects using the given {@link ConnectionProvider}.
     *
     * @param connectionProvider the provider supplying the underlying connection
     * @param str                the host to connect to
     * @param i                  the port to connect to
     * @param duration           the connect timeout, or {@code null} to leave the transport default in place
     * @param sSLConfig          the SSL configuration
     * @return a {@link Mono} emitting the connected client once any required SSL handshake has completed
     * @throws ArithmeticException if the timeout in milliseconds does not fit into an {@code int}
     */
    public static Mono<ReactorNettyClient> connect(ConnectionProvider connectionProvider, String str, int i, @Nullable Duration duration, SSLConfig sSLConfig) {
        Assert.requireNonNull(connectionProvider, "connectionProvider must not be null");
        Assert.requireNonNull(str, "host must not be null");

        TcpClient tcpClient = TcpClient.create(connectionProvider).host(str).port(i);

        if (duration != null) {
            tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(Math.toIntExact(duration.toMillis())));
        }

        return tcpClient.connect()
            .flatMap(connection -> registerSslHandler(sSLConfig, connection).thenReturn(new ReactorNettyClient(connection)));
    }

    /**
     * Installs the SSL session handler as the first handler when the configured mode starts SSL.
     *
     * @return the handshake signal of the installed handler, or an empty {@link Mono} when SSL is not started
     */
    private static Mono<? extends Void> registerSslHandler(SSLConfig sSLConfig, Connection connection) {
        if (!sSLConfig.getSslMode().startSsl()) {
            return Mono.empty();
        }

        SSLSessionHandlerAdapter sslSessionHandlerAdapter = new SSLSessionHandlerAdapter(connection.outbound().alloc(), sSLConfig);
        connection.addHandlerFirst(sslSessionHandlerAdapter);
        return sslSessionHandlerAdapter.getHandshake();
    }

    /**
     * Closes the client. Only the first subscription performs work; later ones complete immediately.
     *
     * <p>When the channel is still open and a process id is known, a {@link Terminate} message is sent
     * before the connection is disposed; otherwise the connection is disposed straight away.
     *
     * @return a {@link Mono} that completes when the connection has been disposed
     */
    @Override
    public Mono<Void> close() {
        return Mono.defer(() -> {
            if (!this.isClosed.compareAndSet(false, true)) {
                return Mono.empty();
            }

            boolean sendTerminate = this.connection.channel().isOpen() && this.processId != null;
            if (!sendTerminate) {
                this.connection.dispose();
                return this.connection.onDispose();
            }

            return Flux.just(Terminate.INSTANCE)
                .doOnNext(message -> logger.debug("Request:  {}", message))
                .concatMap(message -> this.connection.outbound().send(message.encode(this.connection.outbound().alloc())))
                .then()
                .doOnSuccess(ignored -> this.connection.dispose())
                .then(this.connection.onDispose());
        });
    }

    /**
     * Sends the given requests and returns the backend messages of the corresponding response window.
     *
     * <p>The response receiver is registered together with the first request message, under a lock on
     * this client, so that the receiver queue order matches the order in which conversations start.
     *
     * @param publisher the messages to send
     * @return the backend messages answering the requests; errors with {@link IllegalStateException}
     *     when the client has already been closed
     */
    @Override
    public Flux<BackendMessage> exchange(Publisher<FrontendMessage> publisher) {
        Assert.requireNonNull(publisher, "requests must not be null");

        return Mono.<Flux<BackendMessage>>create(sink -> {
            if (this.isClosed.get()) {
                sink.error(new IllegalStateException("Cannot exchange messages because the connection is closed"));
                // Stop here: do not subscribe to the publisher or enqueue an already terminated sink.
                return;
            }

            AtomicInteger once = new AtomicInteger();

            Flux.from(publisher).subscribe(message -> {
                if (once.get() == 0 && once.compareAndSet(0, 1)) {
                    synchronized (this) {
                        this.responseReceivers.add(sink);
                        this.requests.next(message);
                    }
                    return;
                }

                this.requests.next(message);
            }, this.requests::error);
        }).flatMapMany(Function.identity());
    }

    @Override
    public ByteBufAllocator getByteBufAllocator() {
        return this.byteBufAllocator;
    }

    /**
     * @return the backend process id, or empty when no {@link BackendKeyData} has been received yet
     */
    @Override
    public Optional<Integer> getProcessId() {
        return Optional.ofNullable(this.processId);
    }

    /**
     * @return the backend secret key, or empty when no {@link BackendKeyData} has been received yet
     */
    @Override
    public Optional<Integer> getSecretKey() {
        return Optional.ofNullable(this.secretKey);
    }

    /**
     * @return the transaction status reported by the most recent {@link ReadyForQuery}
     */
    @Override
    public TransactionStatus getTransactionStatus() {
        return this.transactionStatus;
    }

    @Override
    public Version getVersion() {
        return this.version;
    }

    /**
     * @return {@code true} when {@link #close()} has not been executed and the channel is open
     */
    @Override
    public boolean isConnected() {
        return !this.isClosed.get() && this.connection.channel().isOpen();
    }

    /**
     * Subscribes {@code consumer} to incoming {@link NotificationResponse} messages.
     *
     * @return a handle that cancels the subscription when disposed
     */
    @Override
    public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
        return this.notificationProcessor.subscribe(consumer);
    }

    /**
     * Renders fields as a comma separated list of {@code TYPE=value} pairs for logging.
     */
    private static String toString(List<Field> list) {
        StringJoiner joiner = new StringJoiner(", ");
        for (Field field : list) {
            joiner.add(field.getType().name() + "=" + field.getValue());
        }
        return joiner.toString();
    }
}
