package org.springframework.integration.zeromq.channel;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.mapping.BytesMessageMapper;
import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper;
import org.springframework.integration.zeromq.ZeroMqProxy;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/springframework/integration/zeromq/channel/ZeroMqChannel.class */
public class ZeroMqChannel extends AbstractMessageChannel implements SubscribableChannel {
    public static final Duration DEFAULT_CONSUME_DELAY = Duration.ofSeconds(1);
    private final Map<MessageHandler, Disposable> subscribers;
    private final Scheduler publisherScheduler;
    private final Scheduler subscriberScheduler;
    private final ZContext context;
    private final boolean pubSub;
    private final Mono<ZMQ.Socket> sendSocket;
    private final Mono<ZMQ.Socket> subscribeSocket;
    private final Flux<? extends Message<?>> subscriberData;
    private Duration consumeDelay;
    private BytesMessageMapper messageMapper;
    private Consumer<ZMQ.Socket> sendSocketConfigurer;
    private Consumer<ZMQ.Socket> subscribeSocketConfigurer;

    @Nullable
    private ZeroMqProxy zeroMqProxy;

    @Nullable
    private volatile String connectSendUrl;

    @Nullable
    private volatile String connectSubscribeUrl;

    @Nullable
    private volatile Disposable subscriberDataDisposable;
    private volatile boolean initialized;

    public ZeroMqChannel(ZContext zContext) {
        this(zContext, false);
    }

    public ZeroMqChannel(ZContext zContext, boolean z) {
        this.subscribers = new HashMap();
        this.publisherScheduler = Schedulers.newSingle("publisherScheduler");
        this.subscriberScheduler = Schedulers.newSingle("subscriberScheduler");
        this.consumeDelay = DEFAULT_CONSUME_DELAY;
        this.messageMapper = new EmbeddedJsonHeadersMessageMapper();
        this.sendSocketConfigurer = socket -> {
        };
        this.subscribeSocketConfigurer = socket2 -> {
        };
        Assert.notNull(zContext, "'context' must not be null");
        this.context = zContext;
        this.pubSub = z;
        Supplier<String> supplier = () -> {
            return "inproc://" + getComponentName() + ".pair";
        };
        Mono<Integer> prepareProxyMono = prepareProxyMono();
        this.sendSocket = prepareSendSocketMono(supplier, prepareProxyMono);
        this.subscribeSocket = prepareSubscribeSocketMono(supplier, prepareProxyMono);
        this.subscriberData = prepareSubscriberDataFlux();
    }

    private Mono<Integer> prepareProxyMono() {
        return Mono.defer(() -> {
            return this.zeroMqProxy != null ? Mono.fromCallable(() -> {
                return Integer.valueOf(this.zeroMqProxy.getBackendPort());
            }).filter(num -> {
                return num.intValue() > 0;
            }).repeatWhenEmpty(100, flux -> {
                return flux.delayElements(Duration.ofMillis(100L));
            }).doOnNext(num2 -> {
                setConnectUrl("tcp://localhost:" + this.zeroMqProxy.getFrontendPort() + ':' + this.zeroMqProxy.getBackendPort());
            }).doOnError(th -> {
                this.logger.error(th, () -> {
                    return "The provided '" + this.zeroMqProxy + "' has not been started";
                });
            }) : Mono.empty();
        }).cache();
    }

    private Mono<ZMQ.Socket> prepareSendSocketMono(Supplier<String> supplier, Mono<?> mono) {
        return mono.publishOn(this.publisherScheduler).then(Mono.fromCallable(() -> {
            return this.context.createSocket(this.connectSendUrl == null ? SocketType.PAIR : this.pubSub ? SocketType.PUB : SocketType.PUSH);
        })).doOnNext(this.sendSocketConfigurer).doOnNext(socket -> {
            socket.connect(this.connectSendUrl != null ? this.connectSendUrl : (String) supplier.get());
        }).cache().publishOn(this.publisherScheduler);
    }

    private Mono<ZMQ.Socket> prepareSubscribeSocketMono(Supplier<String> supplier, Mono<?> mono) {
        return mono.publishOn(this.subscriberScheduler).then(Mono.fromCallable(() -> {
            return this.context.createSocket(this.connectSubscribeUrl == null ? SocketType.PAIR : this.pubSub ? SocketType.SUB : SocketType.PULL);
        })).doOnNext(this.subscribeSocketConfigurer).doOnNext(socket -> {
            if (this.connectSubscribeUrl == null) {
                socket.bind((String) supplier.get());
                return;
            }
            if (this.pubSub) {
                socket.subscribe(ZMQ.SUBSCRIPTION_ALL);
            }
            socket.connect(this.connectSubscribeUrl);
        }).cache().publishOn(this.subscriberScheduler);
    }

    private Flux<? extends Message<?>> prepareSubscriberDataFlux() {
        Mono publishOn = this.subscribeSocket.flatMap(socket -> {
            byte[] recv;
            return (!this.initialized || (recv = socket.recv(1)) == null) ? Mono.empty() : Mono.just(recv);
        }).publishOn(Schedulers.parallel());
        BytesMessageMapper bytesMessageMapper = this.messageMapper;
        Objects.requireNonNull(bytesMessageMapper);
        Flux<? extends Message<?>> repeat = publishOn.map(bytesMessageMapper::toMessage).doOnError(th -> {
            this.logger.error(th, () -> {
                return "Error processing ZeroMQ message in the " + this;
            });
        }).repeatWhenEmpty(flux -> {
            return this.initialized ? flux.delayElements(this.consumeDelay) : flux;
        }).repeat(() -> {
            return this.initialized;
        });
        if (this.pubSub) {
            repeat = repeat.publish().autoConnect(1, disposable -> {
                this.subscriberDataDisposable = disposable;
            });
        }
        return repeat;
    }

    public void setConnectUrl(@Nullable String str) {
        if (str != null) {
            this.connectSendUrl = str.substring(0, str.lastIndexOf(58));
            this.connectSubscribeUrl = this.connectSendUrl.substring(0, this.connectSendUrl.lastIndexOf(58)) + str.substring(str.lastIndexOf(58));
        }
    }

    public void setZeroMqProxy(@Nullable ZeroMqProxy zeroMqProxy) {
        this.zeroMqProxy = zeroMqProxy;
    }

    public void setConsumeDelay(Duration duration) {
        Assert.notNull(duration, "'consumeDelay' must not be null");
        this.consumeDelay = duration;
    }

    public void setMessageMapper(BytesMessageMapper bytesMessageMapper) {
        Assert.notNull(bytesMessageMapper, "'messageMapper' must not be null");
        this.messageMapper = bytesMessageMapper;
    }

    public void setSendSocketConfigurer(Consumer<ZMQ.Socket> consumer) {
        Assert.notNull(consumer, "'sendSocketConfigurer' must not be null");
        this.sendSocketConfigurer = consumer;
    }

    public void setSubscribeSocketConfigurer(Consumer<ZMQ.Socket> consumer) {
        Assert.notNull(consumer, "'subscribeSocketConfigurer' must not be null");
        this.subscribeSocketConfigurer = consumer;
    }

    protected void onInit() {
        Assert.state(this.zeroMqProxy == null || this.connectSendUrl == null, "A 'zeroMqProxy' or 'connectUrl' can be provided (or none), but not both.");
        super.onInit();
        this.sendSocket.subscribe();
        this.initialized = true;
    }

    protected boolean doSend(Message<?> message, long j) {
        Assert.state(this.initialized, "the channel is not initialized yet or already destroyed");
        byte[] bArr = (byte[]) this.messageMapper.fromMessage(message);
        Assert.state(bArr != null, () -> {
            return "The '" + this.messageMapper + "' returned null for '" + message + '\'';
        });
        Mono map = this.sendSocket.map(socket -> {
            return Boolean.valueOf(socket.send(bArr));
        });
        return Boolean.TRUE.equals(j > 0 ? (Boolean) map.block(Duration.ofMillis(j)) : (Boolean) map.block());
    }

    public boolean subscribe(MessageHandler messageHandler) {
        Assert.state(this.initialized, "the channel is not initialized yet or already destroyed");
        this.subscribers.computeIfAbsent(messageHandler, messageHandler2 -> {
            Flux<? extends Message<?>> flux = this.subscriberData;
            Objects.requireNonNull(messageHandler);
            return flux.subscribe(messageHandler::handleMessage);
        });
        return true;
    }

    public boolean unsubscribe(MessageHandler messageHandler) {
        Disposable remove = this.subscribers.remove(messageHandler);
        if (remove == null) {
            return false;
        }
        remove.dispose();
        return true;
    }

    public void destroy() {
        this.initialized = false;
        super.destroy();
        this.sendSocket.doOnNext((v0) -> {
            v0.close();
        }).block();
        this.publisherScheduler.dispose();
        new HashSet(this.subscribers.keySet()).forEach(this::unsubscribe);
        this.subscribeSocket.doOnNext((v0) -> {
            v0.close();
        }).block();
        this.subscriberScheduler.dispose();
        if (this.subscriberDataDisposable != null) {
            this.subscriberDataDisposable.dispose();
        }
    }
}
