package io.rsocket.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.frame.FrameLengthFlyweight;
import io.rsocket.internal.BaseDuplexConnection;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

/* loaded from: input_file:io/rsocket/transport/netty/TcpDuplexConnection.class */
public final class TcpDuplexConnection extends BaseDuplexConnection {
    private final Connection connection;
    private final boolean encodeLength;

    public TcpDuplexConnection(Connection connection) {
        this(connection, true);
    }

    public TcpDuplexConnection(Connection connection, boolean z) {
        this.encodeLength = z;
        this.connection = (Connection) Objects.requireNonNull(connection, "connection must not be null");
        connection.channel().closeFuture().addListener(future -> {
            if (isDisposed()) {
                return;
            }
            dispose();
        });
    }

    public ByteBufAllocator alloc() {
        return this.connection.channel().alloc();
    }

    protected void doOnClose() {
        if (this.connection.isDisposed()) {
            return;
        }
        this.connection.dispose();
    }

    public Flux<ByteBuf> receive() {
        return this.connection.inbound().receive().map(this::decode);
    }

    public Mono<Void> send(Publisher<ByteBuf> publisher) {
        return publisher instanceof Mono ? this.connection.outbound().sendObject(((Mono) publisher).map(this::encode)).then() : this.connection.outbound().send(Flux.from(publisher).map(this::encode)).then();
    }

    private ByteBuf encode(ByteBuf byteBuf) {
        return this.encodeLength ? FrameLengthFlyweight.encode(alloc(), byteBuf.readableBytes(), byteBuf) : byteBuf;
    }

    private ByteBuf decode(ByteBuf byteBuf) {
        return this.encodeLength ? FrameLengthFlyweight.frame(byteBuf).retain() : byteBuf;
    }
}
