package reactor.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:reactor/rabbitmq/LazyChannelPool.class */
class LazyChannelPool implements ChannelPool {
    private static final int DEFAULT_CHANNEL_POOL_SIZE = 5;
    private final Mono<? extends Connection> connectionMono;
    private final BlockingQueue<Channel> channelsQueue;
    private final Scheduler subscriptionScheduler;

    /* JADX INFO: Access modifiers changed from: package-private */
    public LazyChannelPool(Mono<? extends Connection> mono, ChannelPoolOptions channelPoolOptions) {
        this.channelsQueue = new LinkedBlockingQueue(channelPoolOptions.getMaxCacheSize() == null ? DEFAULT_CHANNEL_POOL_SIZE : channelPoolOptions.getMaxCacheSize().intValue());
        this.connectionMono = mono;
        this.subscriptionScheduler = channelPoolOptions.getSubscriptionScheduler() == null ? Schedulers.newElastic("sender-channel-pool") : channelPoolOptions.getSubscriptionScheduler();
    }

    @Override // reactor.rabbitmq.ChannelPool
    public Mono<? extends Channel> getChannelMono() {
        return this.connectionMono.map(connection -> {
            Channel poll = this.channelsQueue.poll();
            if (poll == null) {
                poll = createChannel(connection);
            }
            return poll;
        }).subscribeOn(this.subscriptionScheduler);
    }

    @Override // reactor.rabbitmq.ChannelPool
    public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
        return (signalType, channel) -> {
            if (channel.isOpen()) {
                if (signalType == SignalType.ON_COMPLETE && this.channelsQueue.offer(channel)) {
                    return;
                }
                ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE.accept(signalType, channel);
            }
        };
    }

    @Override // reactor.rabbitmq.ChannelPool, java.lang.AutoCloseable
    public void close() {
        ArrayList arrayList = new ArrayList();
        this.channelsQueue.drainTo(arrayList);
        arrayList.forEach(channel -> {
            ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE.accept(SignalType.ON_COMPLETE, channel);
        });
    }

    private Channel createChannel(Connection connection) {
        try {
            return connection.createChannel();
        } catch (IOException e) {
            throw new RabbitFluxException("Error while creating channel", e);
        }
    }
}
