package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitMessageProperties;
import org.springframework.amqp.rabbit.listener.BlockingQueueConsumer;
import org.springframework.amqp.rabbit.support.RabbitUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.class */
/**
 * Message listener container that consumes from one or more queues using a fixed number of
 * {@link BlockingQueueConsumer}s, each bound to its own {@link Channel} created from the shared
 * connection. Every consumer is drained by an {@link AsyncMessageProcessingConsumer} task that
 * is handed to the configured {@link Executor}.
 */
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
    /** Consumers created by {@link #initializeConsumers()}; {@code null} until initialisation succeeds. */
    private volatile Set<BlockingQueueConsumer> consumers;
    /** Value passed to {@code Channel.basicQos} for every consumer channel. */
    private volatile int prefetchCount = 1;
    /** Executor that runs one processing task per consumer. */
    private volatile Executor taskExecutor = new SimpleAsyncTaskExecutor();
    /** Number of channels/consumers created by {@link #initializeConsumers()}. */
    private volatile int concurrentConsumers = 1;
    /** Capacity of each consumer's local queue; values {@code <= 0} use the consumer's own default queue. */
    private volatile int blockingQueueConsumerCapacity = -1;
    /** Channels created by {@link #initializeConsumers()}; {@code null} until initialisation succeeds. */
    private volatile Set<Channel> channels = null;
    /** Guards the one-time creation of {@link #consumers} and {@link #channels}. */
    private final Object consumersMonitor = new Object();

    /**
     * Task that pulls deliveries from a single {@link BlockingQueueConsumer} and dispatches each
     * one to the owning container's {@code processMessage}.
     */
    private class AsyncMessageProcessingConsumer implements Runnable {
        private final BlockingQueueConsumer consumer;
        /** Number of processed messages between {@code txCommit} calls; {@code 0} disables committing here. */
        private final int txSize;
        /** Maximum run time in milliseconds; {@code 0} means run until interrupted or shut down. */
        private final long timeLimit;
        private final SimpleMessageListenerContainer messageListenerContainer;

        /**
         * @param consumer the consumer to drain
         * @param txSize commit the channel transaction after every {@code txSize} messages; {@code 0} for never
         * @param timeLimitSeconds maximum run time in seconds; {@code 0} for unlimited
         * @param container the container whose {@code processMessage} handles each delivery
         */
        public AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer, int txSize, int timeLimitSeconds, SimpleMessageListenerContainer container) {
            this.consumer = consumer;
            this.txSize = txSize;
            // Multiply as long: an int product would overflow for large second counts
            // before being widened to the long field.
            this.timeLimit = 1000L * timeLimitSeconds;
            this.messageListenerContainer = container;
        }

        /**
         * Processes deliveries until the time limit elapses, a timed poll returns no delivery,
         * the thread is interrupted, or the channel signals shutdown. When the loop ends through
         * the time limit or an empty poll, no commit is issued for messages processed since the
         * last {@code txCommit}.
         *
         * @throws AmqpException if committing the channel transaction fails with an {@link IOException}
         */
        @Override // java.lang.Runnable
        public void run() {
            final long startTime = System.currentTimeMillis();
            long now = startTime;
            int processedCount = 0;
            Channel channel = this.consumer.getChannel();
            try {
                while (this.timeLimit == 0 || now < startTime + this.timeLimit) {
                    BlockingQueueConsumer.Delivery delivery;
                    if (this.timeLimit != 0) {
                        // Wait only for the time remaining in this run.
                        delivery = this.consumer.nextDelivery((startTime + this.timeLimit) - now);
                        if (delivery == null) {
                            break;
                        }
                    } else {
                        delivery = this.consumer.nextDelivery();
                    }
                    byte[] body = delivery.getBody();
                    Envelope envelope = delivery.getEnvelope();
                    SimpleMessageListenerContainer.this.logger.debug("Received message from exchange [" + envelope.getExchange() + "], routing-key [" + envelope.getRoutingKey() + "]");
                    RabbitMessageProperties properties = new RabbitMessageProperties(delivery.getProperties(), envelope.getExchange(), envelope.getRoutingKey(), Boolean.valueOf(envelope.isRedeliver()), envelope.getDeliveryTag(), 0);
                    this.messageListenerContainer.processMessage(new Message(body, properties), channel);
                    // Count the message first so that a commit happens after every txSize-th
                    // message rather than after messages 1, txSize + 1, 2 * txSize + 1, ...
                    processedCount++;
                    if (this.txSize != 0 && processedCount % this.txSize == 0) {
                        channel.txCommit();
                    }
                    now = System.currentTimeMillis();
                }
            } catch (InterruptedException e) {
                SimpleMessageListenerContainer.this.logger.debug("Consumer thread interrupted, processing stopped.");
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
            } catch (ShutdownSignalException e) {
                SimpleMessageListenerContainer.this.logger.debug("Consumer received ShutdownSignal, processing stopped.");
            } catch (IOException e) {
                throw new AmqpException(e);
            }
        }
    }

    /** Creates a container without a connection factory; one must be set before use. */
    public SimpleMessageListenerContainer() {
    }

    /**
     * Creates a container that uses the given connection factory.
     *
     * @param connectionFactory the factory passed to {@code setConnectionFactory}
     */
    public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        setConnectionFactory(connectionFactory);
    }

    /**
     * Sets how many consumers (and channels) are created when the container starts.
     *
     * @param i the number of consumers; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public void setConcurrentConsumers(int i) {
        Assert.isTrue(i > 0, "'concurrentConsumers' value must be at least 1 (one)");
        this.concurrentConsumers = i;
    }

    /**
     * Validates the configuration against a {@link CachingConnectionFactory}, if one is in use:
     * its channel cache must be at least as large as the number of concurrent consumers. When
     * the consumer count is still {@code 1} and the cache holds more than one channel, the
     * consumer count is raised to the cache size.
     *
     * @throws IllegalStateException if the channel cache is smaller than the number of consumers
     */
    @Override // org.springframework.amqp.rabbit.listener.AbstractRabbitListeningContainer, org.springframework.amqp.rabbit.support.RabbitAccessor
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        if (!(getConnectionFactory() instanceof CachingConnectionFactory)) {
            return;
        }
        CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) getConnectionFactory();
        int channelCacheSize = cachingConnectionFactory.getChannelCacheSize();
        if (channelCacheSize < this.concurrentConsumers) {
            throw new IllegalStateException("CachingConnectionFactory's ChannelCacheSize can not be less than the number of SimpleMessageListener's ConcurrentConsumers");
        }
        if (this.concurrentConsumers == 1 && channelCacheSize > 1) {
            this.logger.info("Setting number of concurrent consumers to CachingConnectionFactory's ChannelCacheSize [" + channelCacheSize + "]");
            this.concurrentConsumers = channelCacheSize;
        }
    }

    /**
     * Sets the executor that runs the per-consumer processing tasks.
     *
     * @param executor the executor; must not be {@code null}
     * @throws IllegalArgumentException if {@code executor} is {@code null}
     */
    public void setTaskExecutor(Executor executor) {
        Assert.notNull(executor, "taskExecutor must not be null");
        this.taskExecutor = executor;
    }

    /** @return the prefetch count applied to each consumer channel via {@code basicQos} */
    public int getPrefetchCount() {
        return this.prefetchCount;
    }

    /** @param i the prefetch count to apply to each consumer channel via {@code basicQos} */
    public void setPrefetchCount(int i) {
        this.prefetchCount = i;
    }

    /** @return the configured capacity of each consumer's local queue */
    public int getBlockingQueueConsumerCapacity() {
        return this.blockingQueueConsumerCapacity;
    }

    /**
     * @param i capacity of each consumer's local queue; a value {@code <= 0} makes
     *          {@link #createBlockingQueueConsumer(Channel)} use the consumer's single-argument constructor
     */
    public void setBlockingQueueConsumerCapacity(int i) {
        this.blockingQueueConsumerCapacity = i;
    }

    /** @return always {@code true}: this container works on a shared connection */
    @Override // org.springframework.amqp.rabbit.listener.AbstractRabbitListeningContainer
    protected final boolean sharedConnectionEnabled() {
        return true;
    }

    /** Establishes the shared connection used to create the consumer channels. */
    @Override // org.springframework.amqp.rabbit.listener.AbstractRabbitListeningContainer
    protected void doInitialize() throws Exception {
        establishSharedConnection();
    }

    /**
     * Starts the container: creates the consumers if necessary and submits one
     * {@link AsyncMessageProcessingConsumer} per consumer to the task executor, with no
     * transaction batch size and no time limit.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.amqp.rabbit.listener.AbstractRabbitListeningContainer
    public void doStart() throws Exception {
        super.doStart();
        initializeConsumers();
        for (BlockingQueueConsumer consumer : this.consumers) {
            this.taskExecutor.execute(new AsyncMessageProcessingConsumer(consumer, 0, 0, this));
        }
    }

    /**
     * If the container is running, cancels every consumer and then closes every channel that
     * {@link #initializeConsumers()} created. Does nothing for collections that were never
     * initialised.
     */
    @Override // org.springframework.amqp.rabbit.listener.AbstractRabbitListeningContainer
    protected void doShutdown() {
        if (!isRunning()) {
            return;
        }
        this.logger.debug("Closing Rabbit Consumers");
        // Snapshot the volatile fields; either may still be null if initialisation never completed.
        Set<BlockingQueueConsumer> activeConsumers = this.consumers;
        if (activeConsumers != null) {
            for (BlockingQueueConsumer consumer : activeConsumers) {
                RabbitUtils.closeMessageConsumer(consumer.getChannel(), consumer.getConsumerTag());
            }
        }
        this.logger.debug("Closing Rabbit Channels");
        Set<Channel> openChannels = this.channels;
        if (openChannels != null) {
            for (Channel channel : openChannels) {
                RabbitUtils.closeChannel(channel);
            }
        }
    }

    /**
     * Creates one channel and one {@link BlockingQueueConsumer} per configured concurrent
     * consumer, unless consumers already exist. The new collections are published only after
     * every consumer was created; if creation fails part-way, the channels opened so far are
     * closed and the container is left uninitialised so that a later call can retry.
     *
     * @throws IOException if creating a channel or registering a consumer fails
     */
    protected void initializeConsumers() throws IOException {
        synchronized (this.consumersMonitor) {
            if (this.consumers != null) {
                return;
            }
            int consumerCount = this.concurrentConsumers;
            Set<Channel> newChannels = new HashSet<Channel>(consumerCount);
            Set<BlockingQueueConsumer> newConsumers = new HashSet<BlockingQueueConsumer>(consumerCount);
            boolean initialized = false;
            try {
                Connection sharedConnection = getSharedConnection();
                for (int n = 0; n < consumerCount; n++) {
                    Channel channel = createChannel(sharedConnection);
                    // Track the channel immediately so it is closed if consumer setup fails.
                    newChannels.add(channel);
                    if (isChannelLocallyTransacted(channel)) {
                        channel.txSelect();
                    }
                    newConsumers.add(createBlockingQueueConsumer(channel));
                }
                this.channels = newChannels;
                this.consumers = newConsumers;
                initialized = true;
            } finally {
                if (!initialized) {
                    // NOTE(review): assumes RabbitUtils.closeChannel does not itself throw — confirm.
                    for (Channel channel : newChannels) {
                        RabbitUtils.closeChannel(channel);
                    }
                }
            }
        }
    }

    /**
     * Creates a consumer on the given channel, applies the prefetch count and subscribes it to
     * every queue named in the comma-delimited required queue name. Each queue is checked with a
     * passive declare before subscribing. The consumer tag is overwritten on each subscription,
     * so with several queues only the tag of the last one is retained on the consumer.
     *
     * @param channel the channel the consumer listens on
     * @return the subscribed consumer
     * @throws IOException if a channel operation fails
     */
    protected BlockingQueueConsumer createBlockingQueueConsumer(Channel channel) throws IOException {
        BlockingQueueConsumer consumer;
        if (this.blockingQueueConsumerCapacity <= 0) {
            consumer = new BlockingQueueConsumer(channel);
        } else {
            consumer = new BlockingQueueConsumer(channel, new LinkedBlockingQueue(this.blockingQueueConsumerCapacity));
        }
        channel.basicQos(this.prefetchCount);
        String[] queueNames = StringUtils.commaDelimitedListToStringArray(getRequiredQueueName());
        for (String queueName : queueNames) {
            channel.queueDeclarePassive(queueName);
            consumer.setConsumerTag(channel.basicConsume(queueName, this.autoAck, consumer));
        }
        return consumer;
    }

    /**
     * Invokes the listener for one message. When the listener channel is exposed, the channel is
     * bound as a transaction-synchronization resource under the connection factory for the
     * duration of the call and unbound afterwards, whether or not the listener throws.
     *
     * @param message the message to process
     * @param channel the channel the message was received on
     */
    protected void processMessage(Message message, Channel channel) {
        boolean exposeChannel = isExposeListenerChannel();
        if (exposeChannel) {
            TransactionSynchronizationManager.bindResource(getConnectionFactory(), new LocallyExposedRabbitResourceHolder(channel));
        }
        try {
            executeListener(channel, message);
        } finally {
            if (exposeChannel) {
                TransactionSynchronizationManager.unbindResource(getConnectionFactory());
            }
        }
    }
}
