package reactor.net.zmq;

import com.gs.collections.api.list.MutableList;
import com.gs.collections.impl.block.predicate.checked.CheckedPredicate;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.list.mutable.SynchronizedMutableList;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.event.dispatch.Dispatcher;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.AbstractNetChannel;
import reactor.net.NetChannel;
import reactor.rx.Promise;

/* loaded from: input_file:reactor/net/zmq/ZeroMQNetChannel.class */
/**
 * Channel that accumulates outgoing data as frames of a ZeroMQ {@link ZMsg} and sends the
 * accumulated message over the configured {@link ZMQ.Socket} when flushed.
 *
 * <p>The pending message is held in a volatile field that is only swapped through
 * {@link #MSG_UPD}. The swap itself is atomic, but frames are appended to the message outside of
 * any lock, so concurrent writers racing with a flush are not fully serialized by this class.
 *
 * @param <IN>  type of decoded inbound data
 * @param <OUT> type of outbound data accepted by the encoder
 */
public class ZeroMQNetChannel<IN, OUT> extends AbstractNetChannel<IN, OUT> {
    /** Atomic accessor for {@link #currentMsg}; a raw type is required because the class is generic. */
    private static final AtomicReferenceFieldUpdater<ZeroMQNetChannel, ZMsg> MSG_UPD = AtomicReferenceFieldUpdater.newUpdater(ZeroMQNetChannel.class, ZMsg.class, "currentMsg");

    private final Logger log;
    private final ZeroMQConsumerSpec eventSpec;
    /** Callbacks registered through {@code on().close(...)}; run and removed by {@link #close(Consumer)}. */
    private final MutableList<Runnable> closeHandlers;
    /** Identity frame content that is prepended to every new message on ROUTER sockets. */
    private volatile String connectionId;
    private volatile ZMQ.Socket socket;
    /** Message currently being assembled, or {@code null} when nothing is pending. */
    private volatile ZMsg currentMsg;

    /**
     * Event registration facade returned by {@link ZeroMQNetChannel#on()}. Only close handlers are
     * recorded; idle callbacks are accepted but ignored.
     */
    private class ZeroMQConsumerSpec implements NetChannel.ConsumerSpec {
        private ZeroMQConsumerSpec() {
        }

        /** Registers {@code runnable} to be run when the channel is closed. */
        @Override
        public NetChannel.ConsumerSpec close(Runnable runnable) {
            ZeroMQNetChannel.this.closeHandlers.add(runnable);
            return this;
        }

        /** No-op: the arguments are discarded and nothing is registered. */
        @Override
        public NetChannel.ConsumerSpec readIdle(long j, Runnable runnable) {
            return this;
        }

        /** No-op: the arguments are discarded and nothing is registered. */
        @Override
        public NetChannel.ConsumerSpec writeIdle(long j, Runnable runnable) {
            return this;
        }
    }

    /**
     * Creates a channel without a socket or connection id; both are supplied afterwards through
     * {@link #setSocket(ZMQ.Socket)} and {@link #setConnectionId(String)}.
     */
    public ZeroMQNetChannel(@Nonnull Environment environment, @Nonnull Reactor reactor2, @Nonnull Dispatcher dispatcher, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(environment, codec, dispatcher, reactor2);
        this.log = LoggerFactory.getLogger(getClass());
        this.eventSpec = new ZeroMQConsumerSpec();
        this.closeHandlers = SynchronizedMutableList.of(FastList.<Runnable>newList());
    }

    /**
     * Sets the id written as the first frame of each new message on ROUTER sockets.
     *
     * @return this channel, for chaining
     */
    public ZeroMQNetChannel<IN, OUT> setConnectionId(String str) {
        this.connectionId = str;
        return this;
    }

    /**
     * Sets the socket that messages are sent on.
     *
     * @return this channel, for chaining
     */
    public ZeroMQNetChannel<IN, OUT> setSocket(ZMQ.Socket socket) {
        this.socket = socket;
        return this;
    }

    /** Always returns {@code null}; this channel does not expose a remote address. */
    @Override
    public InetSocketAddress remoteAddress() {
        return null;
    }

    /**
     * Copies the remaining bytes of {@code byteBuffer} into a new frame appended to the pending
     * message, starting a new message if none is pending.
     *
     * @param byteBuffer data to append; its remaining bytes are consumed
     * @param promise    completed by the flush when {@code z} is {@code true}; may be {@code null}
     * @param z          whether to send the pending message immediately after appending
     */
    @Override
    protected void write(ByteBuffer byteBuffer, Promise<Void> promise, boolean z) {
        byte[] payload = new byte[byteBuffer.remaining()];
        byteBuffer.get(payload);

        // Obtain the pending message, installing a fresh one if needed. The instance returned by a
        // successful CAS is used directly so that a flush happening right after the CAS cannot
        // leave us holding null.
        ZMsg msg = MSG_UPD.get(this);
        boolean startedNewMsg = false;
        while (null == msg) {
            ZMsg candidate = new ZMsg();
            if (MSG_UPD.compareAndSet(this, null, candidate)) {
                msg = candidate;
                startedNewMsg = true;
            } else {
                msg = MSG_UPD.get(this);
            }
        }

        // Only the first frame of a new message on a ROUTER socket carries the connection id.
        if (startedNewMsg && ZMQ.ROUTER == this.socket.getType()) {
            msg.add(new ZFrame(this.connectionId));
        }
        msg.add(new ZFrame(payload));

        if (z) {
            doFlush(promise);
        }
    }

    /**
     * Encodes {@code obj} with the channel's encoder and writes the resulting bytes.
     *
     * @param obj     outbound object; expected to be of type {@code OUT}
     * @param promise completed by the flush when {@code z} is {@code true}; may be {@code null}
     * @param z       whether to send the pending message immediately after appending
     */
    @Override
    @SuppressWarnings("unchecked") // callers hand in OUT values typed as Object
    protected void write(Object obj, Promise<Void> promise, boolean z) {
        Buffer encoded = getEncoder().apply((OUT) obj);
        write(encoded.byteBuffer(), promise, z);
    }

    /** Sends the pending message, if any, without reporting the outcome to a promise. */
    @Override
    protected synchronized void flush() {
        doFlush(null);
    }

    /**
     * Detaches the pending message and sends it on the socket. Does nothing when no message is
     * pending (in that case {@code promise} is left untouched).
     *
     * @param promise receives {@code onNext(null)} on success or {@code onError} on failure;
     *                when {@code null}, a failed send is logged instead
     */
    private void doFlush(Promise<Void> promise) {
        // getAndSet guarantees that exactly one caller takes ownership of a given message, so it
        // can never be sent twice by racing flushers.
        ZMsg pending = MSG_UPD.getAndSet(this, null);
        if (null == pending) {
            return;
        }

        boolean sent = pending.send(this.socket);
        if (null == promise) {
            if (!sent) {
                this.log.warn("ZeroMQ Message could not be sent");
            }
            return;
        }
        if (sent) {
            promise.onNext((Void) null);
        } else {
            promise.onError(new RuntimeException("ZeroMQ Message could not be sent"));
        }
    }

    /**
     * Schedules, on the events reactor, a task that runs and removes every registered close
     * handler and then notifies {@code consumer} with {@code true}.
     *
     * @param consumer notified after the handlers have run; may be {@code null}
     */
    @Override
    public void close(final Consumer<Boolean> consumer) {
        getEventsReactor().schedule(new Consumer<Void>() {
            @Override
            public void accept(Void ignored) {
                // removeIf with an always-true predicate runs each handler exactly once and
                // drops it from the list in the same pass.
                ZeroMQNetChannel.this.closeHandlers.removeIf(new CheckedPredicate<Runnable>() {
                    @Override
                    public boolean safeAccept(Runnable runnable) throws Exception {
                        runnable.run();
                        return true;
                    }
                });
                if (null != consumer) {
                    consumer.accept(true);
                }
            }
        }, (Void) null);
    }

    /** Returns the event registration facade for this channel. */
    @Override
    public NetChannel.ConsumerSpec on() {
        return this.eventSpec;
    }

    @Override
    public String toString() {
        return "ZeroMQNetChannel{closeHandlers=" + this.closeHandlers + ", connectionId='" + this.connectionId + "', socket=" + this.socket + '}';
    }
}
