package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.support.RabbitUtils;

/* loaded from: input_file:org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.class */
public class BlockingQueueConsumer {
    private static Log logger = LogFactory.getLog(BlockingQueueConsumer.class);
    private volatile ShutdownSignalException shutdown;
    private final String[] queues;
    private final int prefetchCount;
    private final boolean transactional;
    private final Channel channel;
    private final InternalConsumer consumer;
    private final AcknowledgeMode acknowledgeMode;
    private final BlockingQueue<Delivery> queue = new LinkedBlockingQueue();
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/amqp/rabbit/listener/BlockingQueueConsumer$Delivery.class */
    public static class Delivery {
        private final Envelope envelope;
        private final AMQP.BasicProperties properties;
        private final byte[] body;

        public Delivery(Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
            this.envelope = envelope;
            this.properties = basicProperties;
            this.body = bArr;
        }

        public Envelope getEnvelope() {
            return this.envelope;
        }

        public AMQP.BasicProperties getProperties() {
            return this.properties;
        }

        public byte[] getBody() {
            return this.body;
        }
    }

    /* loaded from: input_file:org/springframework/amqp/rabbit/listener/BlockingQueueConsumer$InternalConsumer.class */
    private class InternalConsumer extends DefaultConsumer {
        public InternalConsumer(Channel channel) {
            super(channel);
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            if (BlockingQueueConsumer.logger.isDebugEnabled()) {
                BlockingQueueConsumer.logger.debug("Received shutdown signal for consumer tag=" + str, shutdownSignalException);
            }
            BlockingQueueConsumer.this.shutdown = shutdownSignalException;
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            if (BlockingQueueConsumer.this.cancelled.get() && BlockingQueueConsumer.this.acknowledgeMode.isTransactionAllowed()) {
                return;
            }
            BlockingQueueConsumer.logger.debug("Storing delivery for " + BlockingQueueConsumer.this);
            try {
                BlockingQueueConsumer.this.queue.put(new Delivery(envelope, basicProperties, bArr));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public BlockingQueueConsumer(Channel channel, AcknowledgeMode acknowledgeMode, boolean z, int i, String... strArr) {
        this.channel = channel;
        this.acknowledgeMode = acknowledgeMode;
        this.transactional = z;
        this.prefetchCount = i;
        this.queues = strArr;
        this.consumer = new InternalConsumer(channel);
    }

    public Channel getChannel() {
        return this.channel;
    }

    public String getConsumerTag() {
        return this.consumer.getConsumerTag();
    }

    private void checkShutdown() {
        if (this.shutdown != null) {
            throw Utility.fixStackTrace(this.shutdown);
        }
    }

    private Message handle(Delivery delivery) throws InterruptedException {
        if (delivery == null && this.shutdown != null) {
            throw this.shutdown;
        }
        if (delivery == null) {
            return null;
        }
        byte[] body = delivery.getBody();
        MessageProperties createMessageProperties = RabbitUtils.createMessageProperties(delivery.getProperties(), delivery.getEnvelope(), "UTF-8");
        createMessageProperties.setMessageCount(0);
        Message message = new Message(body, createMessageProperties);
        if (logger.isDebugEnabled()) {
            logger.debug("Received message: " + message);
        }
        return message;
    }

    public Message nextMessage() throws InterruptedException, ShutdownSignalException {
        logger.debug("Retrieving delivery for " + this);
        return handle(this.queue.take());
    }

    public Message nextMessage(long j) throws InterruptedException, ShutdownSignalException {
        if (logger.isDebugEnabled()) {
            logger.debug("Retrieving delivery for " + this);
        }
        checkShutdown();
        return handle(this.queue.poll(j, TimeUnit.MILLISECONDS));
    }

    public void start() throws AmqpException {
        try {
            this.channel.basicQos(this.prefetchCount);
            for (int i = 0; i < this.queues.length; i++) {
                this.channel.queueDeclarePassive(this.queues[i]);
                this.channel.basicConsume(this.queues[i], this.acknowledgeMode.isAutoAck(), this.consumer);
                if (logger.isDebugEnabled()) {
                    logger.debug("Started " + this);
                }
            }
        } catch (IOException e) {
            throw RabbitUtils.convertRabbitAccessException(e);
        }
    }

    public void stop() {
        this.cancelled.set(true);
        logger.debug("Closing Rabbit Channel: " + this.channel);
        RabbitUtils.closeMessageConsumer(this.consumer.getChannel(), this.consumer.getConsumerTag(), this.transactional);
        RabbitUtils.closeChannel(this.channel);
    }

    public String toString() {
        return "Consumer: tag=[" + this.consumer.getConsumerTag() + "], channel=" + this.channel + ", acknowledgeMode=" + this.acknowledgeMode + " local queue size=" + this.queue.size();
    }
}
