package org.springframework.amqp.rabbit.listener;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.tomcat.websocket.Constants;
import org.springframework.amqp.AmqpApplicationContextClosedException;
import org.springframework.amqp.AmqpAuthenticationException;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ChannelProxy;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.support.ContainerUtils;
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.ApplicationEvent;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.backoff.BackOffExecution;

/* loaded from: input_file:BOOT-INF/lib/spring-rabbit-2.4.9.jar:org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.class */
public class DirectMessageListenerContainer extends AbstractMessageListenerContainer {
    private static final int START_WAIT_TIME = 60;
    private static final int DEFAULT_MONITOR_INTERVAL = 10000;
    private static final int DEFAULT_ACK_TIMEOUT = 20000;
    private TaskScheduler taskScheduler;
    private boolean taskSchedulerSet;
    private int messagesPerAck;
    private volatile boolean started;
    private volatile boolean aborted;
    private volatile boolean hasStopped;
    private volatile ScheduledFuture<?> consumerMonitorTask;
    private volatile long lastAlertAt;
    private volatile long lastRestartAttempt;
    protected final List<SimpleConsumer> consumers = new LinkedList();
    private final Set<SimpleConsumer> consumersToRestart = new LinkedHashSet();
    private final Set<String> removedQueues = ConcurrentHashMap.newKeySet();
    private final MultiValueMap<String, SimpleConsumer> consumersByQueue = new LinkedMultiValueMap();
    private final ActiveObjectCounter<SimpleConsumer> cancellationLock = new ActiveObjectCounter<>();
    private long monitorInterval = AbstractComponentTracker.LINGERING_TIMEOUT;
    private long ackTimeout = Constants.DEFAULT_BLOCKING_SEND_TIMEOUT;
    private volatile CountDownLatch startedLatch = new CountDownLatch(1);
    private volatile int consumersPerQueue = 1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/spring-rabbit-2.4.9.jar:org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer$SimpleConsumer.class */
    public final class SimpleConsumer extends DefaultConsumer {
        private final Log logger;
        private final Connection connection;
        private final String queue;
        private final int index;
        private final boolean ackRequired;
        private final ConnectionFactory connectionFactory;
        private final PlatformTransactionManager transactionManager;
        private final TransactionAttribute transactionAttribute;
        private final boolean isRabbitTxManager;
        private final int messagesPerAck;
        private final long ackTimeout;
        private final Channel targetChannel;
        private int pendingAcks;
        private long lastAck;
        private long latestDeferredDeliveryTag;
        private volatile String consumerTag;
        private volatile int epoch;
        private volatile TransactionTemplate transactionTemplate;
        private volatile boolean canceled;
        private volatile boolean ackFailed;

        SimpleConsumer(@Nullable Connection connection, @Nullable Channel channel, String str, int i) {
            super(channel);
            this.logger = DirectMessageListenerContainer.this.logger;
            this.connectionFactory = DirectMessageListenerContainer.this.getConnectionFactory();
            this.transactionManager = DirectMessageListenerContainer.this.getTransactionManager();
            this.transactionAttribute = DirectMessageListenerContainer.this.getTransactionAttribute();
            this.isRabbitTxManager = this.transactionManager instanceof RabbitTransactionManager;
            this.messagesPerAck = DirectMessageListenerContainer.this.messagesPerAck;
            this.ackTimeout = DirectMessageListenerContainer.this.ackTimeout;
            this.lastAck = System.currentTimeMillis();
            this.connection = connection;
            this.queue = str;
            this.index = i;
            this.ackRequired = (DirectMessageListenerContainer.this.getAcknowledgeMode().isAutoAck() || DirectMessageListenerContainer.this.getAcknowledgeMode().isManual()) ? false : true;
            if (channel instanceof ChannelProxy) {
                this.targetChannel = ((ChannelProxy) channel).getTargetChannel();
            } else {
                this.targetChannel = null;
            }
        }

        String getQueue() {
            return this.queue;
        }

        int getIndex() {
            return this.index;
        }

        @Override // com.rabbitmq.client.DefaultConsumer
        public String getConsumerTag() {
            return this.consumerTag;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public int getEpoch() {
            return this.epoch;
        }

        void setCanceled(boolean z) {
            this.canceled = z;
        }

        boolean isAckFailed() {
            return this.ackFailed;
        }

        boolean targetChanged() {
            return (this.targetChannel == null || ((ChannelProxy) getChannel()).getTargetChannel().equals(this.targetChannel)) ? false : true;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public int incrementAndGetEpoch() {
            int i = this.epoch + 1;
            this.epoch = i;
            return i;
        }

        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
            MessageProperties messageProperties = DirectMessageListenerContainer.this.getMessagePropertiesConverter().toMessageProperties(basicProperties, envelope, "UTF-8");
            messageProperties.setConsumerTag(str);
            messageProperties.setConsumerQueue(this.queue);
            Message message = new Message(bArr, messageProperties);
            long deliveryTag = envelope.getDeliveryTag();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(this + " received " + message);
            }
            DirectMessageListenerContainer.this.updateLastReceive();
            Object obj = message;
            List<Message> debatch = DirectMessageListenerContainer.this.debatch(message);
            if (debatch != null) {
                obj = debatch;
            }
            try {
                if (this.transactionManager == null) {
                    try {
                        callExecuteListener(obj, deliveryTag);
                        return;
                    } catch (Exception e) {
                        return;
                    }
                }
                try {
                    executeListenerInTransaction(obj, deliveryTag);
                    if (this.isRabbitTxManager) {
                        ConsumerChannelRegistry.unRegisterConsumerChannel();
                    }
                } catch (AbstractMessageListenerContainer.WrappedTransactionException e2) {
                    if (e2.getCause() instanceof Error) {
                        throw ((Error) e2.getCause());
                    }
                    if (this.isRabbitTxManager) {
                        ConsumerChannelRegistry.unRegisterConsumerChannel();
                    }
                } catch (Exception e3) {
                    if (this.isRabbitTxManager) {
                        ConsumerChannelRegistry.unRegisterConsumerChannel();
                    }
                }
            } catch (Throwable th) {
                if (this.isRabbitTxManager) {
                    ConsumerChannelRegistry.unRegisterConsumerChannel();
                }
                throw th;
            }
        }

        private void executeListenerInTransaction(Object obj, long j) {
            if (this.isRabbitTxManager) {
                ConsumerChannelRegistry.registerConsumerChannel(getChannel(), this.connectionFactory);
            }
            if (this.transactionTemplate == null) {
                this.transactionTemplate = new TransactionTemplate(this.transactionManager, this.transactionAttribute);
            }
            this.transactionTemplate.execute(transactionStatus -> {
                RabbitResourceHolder bindResourceToTransaction = ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(getChannel(), false), this.connectionFactory, true);
                if (bindResourceToTransaction != null) {
                    bindResourceToTransaction.addDeliveryTag(getChannel(), j);
                }
                try {
                    callExecuteListener(obj, j);
                    return null;
                } catch (RuntimeException e) {
                    DirectMessageListenerContainer.this.prepareHolderForRollback(bindResourceToTransaction, e);
                    throw e;
                } catch (Throwable th) {
                    throw new AbstractMessageListenerContainer.WrappedTransactionException(th);
                }
            });
        }

        private void callExecuteListener(Object obj, long j) {
            boolean isChannelLocallyTransacted = DirectMessageListenerContainer.this.isChannelLocallyTransacted();
            try {
                DirectMessageListenerContainer.this.executeListener(getChannel(), obj);
                handleAck(j, isChannelLocallyTransacted);
            } catch (Error e) {
                this.logger.error("Failed to invoke listener", e);
                DirectMessageListenerContainer.this.getJavaLangErrorHandler().handle(e);
                throw e;
            } catch (ImmediateAcknowledgeAmqpException e2) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("User requested ack for failed delivery '" + e2.getMessage() + "': " + j);
                }
                handleAck(j, isChannelLocallyTransacted);
            } catch (Exception e3) {
                if (DirectMessageListenerContainer.this.causeChainHasImmediateAcknowledgeAmqpException(e3)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("User requested ack for failed delivery: " + j);
                    }
                    handleAck(j, isChannelLocallyTransacted);
                    return;
                }
                this.logger.error("Failed to invoke listener", e3);
                if (this.transactionManager == null) {
                    rollback(j, e3);
                    return;
                }
                if (this.transactionAttribute.rollbackOn(e3)) {
                    if (((RabbitResourceHolder) TransactionSynchronizationManager.getResource(DirectMessageListenerContainer.this.getConnectionFactory())) == null) {
                        rollback(j, e3);
                    }
                    throw e3;
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("No rollback for " + e3);
                }
            }
        }

        private void handleAck(long j, boolean z) {
            boolean z2 = z || (DirectMessageListenerContainer.this.isChannelTransacted() && TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
            try {
                if (this.ackRequired) {
                    if (this.messagesPerAck > 1) {
                        synchronized (this) {
                            this.latestDeferredDeliveryTag = j;
                            this.pendingAcks++;
                            ackIfNecessary(this.lastAck);
                        }
                    } else if (!DirectMessageListenerContainer.this.isChannelTransacted() || z2) {
                        sendAckWithNotify(j, false);
                    }
                }
                if (z2) {
                    RabbitUtils.commitIfNecessary(getChannel());
                }
            } catch (Exception e) {
                this.ackFailed = true;
                this.logger.error("Error acking", e);
            }
        }

        synchronized void ackIfNecessary(long j) throws Exception {
            if (this.pendingAcks < this.messagesPerAck) {
                if (this.pendingAcks <= 0) {
                    return;
                }
                if (j - this.lastAck <= this.ackTimeout && !this.canceled) {
                    return;
                }
            }
            sendAck(j);
        }

        private void rollback(long j, Exception exc) {
            if (DirectMessageListenerContainer.this.isChannelTransacted()) {
                RabbitUtils.rollbackIfNecessary(getChannel());
            }
            if (this.ackRequired || ContainerUtils.isRejectManual(exc)) {
                try {
                    if (this.messagesPerAck > 1) {
                        synchronized (this) {
                            if (this.pendingAcks > 0) {
                                sendAck(System.currentTimeMillis());
                            }
                        }
                    }
                    getChannel().basicNack(j, !DirectMessageListenerContainer.this.isAsyncReplies(), ContainerUtils.shouldRequeue(DirectMessageListenerContainer.this.isDefaultRequeueRejected(), exc, this.logger));
                } catch (Exception e) {
                    this.logger.error("Failed to nack message", e);
                }
            }
            if (DirectMessageListenerContainer.this.isChannelTransacted()) {
                RabbitUtils.commitIfNecessary(getChannel());
            }
        }

        protected synchronized void sendAck(long j) throws Exception {
            sendAckWithNotify(this.latestDeferredDeliveryTag, true);
            this.lastAck = j;
            this.pendingAcks = 0;
        }

        private void sendAckWithNotify(long j, boolean z) throws Exception {
            try {
                getChannel().basicAck(j, z);
                notifyMessageAckListener(true, j, null);
            } catch (Exception e) {
                notifyMessageAckListener(false, j, e);
                throw e;
            }
        }

        private void notifyMessageAckListener(boolean z, long j, @Nullable Throwable th) {
            try {
                DirectMessageListenerContainer.this.getMessageAckListener().onComplete(z, j, th);
            } catch (Exception e) {
                this.logger.error("An exception occured on MessageAckListener.", e);
            }
        }

        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleConsumeOk(String str) {
            super.handleConsumeOk(str);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("New " + this + " consumeOk");
            }
            if (DirectMessageListenerContainer.this.getApplicationEventPublisher() != null) {
                DirectMessageListenerContainer.this.getApplicationEventPublisher().publishEvent((ApplicationEvent) new ConsumeOkEvent(this, getQueue(), str));
            }
        }

        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleCancelOk(String str) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("CancelOk " + this);
            }
            finalizeConsumer();
        }

        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleCancel(String str) {
            this.logger.error("Consumer canceled - queue deleted? " + this);
            cancelConsumer("Consumer " + this + " canceled");
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void cancelConsumer(String str) {
            DirectMessageListenerContainer.this.publishConsumerFailedEvent(str, true, null);
            synchronized (DirectMessageListenerContainer.this.consumersMonitor) {
                List list = DirectMessageListenerContainer.this.consumersByQueue.get(this.queue);
                if (list != null) {
                    list.remove(this);
                }
                DirectMessageListenerContainer.this.consumers.remove(this);
                DirectMessageListenerContainer.this.addConsumerToRestart(this);
            }
            finalizeConsumer();
        }

        private void finalizeConsumer() {
            RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
            RabbitUtils.closeChannel(getChannel());
            RabbitUtils.closeConnection(this.connection);
            DirectMessageListenerContainer.this.cancellationLock.release(this);
            DirectMessageListenerContainer.this.consumerRemoved(this);
        }

        public int hashCode() {
            return (31 * ((31 * ((31 * 1) + getEnclosingInstance().hashCode())) + this.index)) + (this.queue == null ? 0 : this.queue.hashCode());
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            SimpleConsumer simpleConsumer = (SimpleConsumer) obj;
            if (getEnclosingInstance().equals(simpleConsumer.getEnclosingInstance()) && this.index == simpleConsumer.index) {
                return this.queue == null ? simpleConsumer.queue == null : this.queue.equals(simpleConsumer.queue);
            }
            return false;
        }

        private DirectMessageListenerContainer getEnclosingInstance() {
            return DirectMessageListenerContainer.this;
        }

        public String toString() {
            return "SimpleConsumer [queue=" + this.queue + ", index=" + this.index + ", consumerTag=" + this.consumerTag + " identity=" + ObjectUtils.getIdentityHexString(this) + "]";
        }
    }

    public DirectMessageListenerContainer() {
        setMissingQueuesFatal(false);
        doSetPossibleAuthenticationFailureFatal(false);
    }

    public DirectMessageListenerContainer(ConnectionFactory connectionFactory) {
        setConnectionFactory(connectionFactory);
        setMissingQueuesFatal(false);
        doSetPossibleAuthenticationFailureFatal(false);
    }

    public void setConsumersPerQueue(int i) {
        if (isRunning()) {
            adjustConsumers(i);
        }
        this.consumersPerQueue = i;
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public final void setExclusive(boolean z) {
        Assert.isTrue(!z || this.consumersPerQueue == 1, "When the consumer is exclusive, the consumers per queue must be 1");
        super.setExclusive(z);
    }

    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
        this.taskSchedulerSet = true;
    }

    public void setMonitorInterval(long j) {
        this.monitorInterval = j;
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer, org.springframework.amqp.rabbit.listener.MessageListenerContainer
    public void setQueueNames(String... strArr) {
        Assert.state(!isRunning(), "Cannot set queue names while running, use add/remove");
        super.setQueueNames(strArr);
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public final void setMissingQueuesFatal(boolean z) {
        super.setMissingQueuesFatal(z);
    }

    public void setMessagesPerAck(int i) {
        this.messagesPerAck = i;
    }

    public void setAckTimeout(long j) {
        this.ackTimeout = j;
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public void addQueueNames(String... strArr) {
        Assert.notNull(strArr, "'queueNames' cannot be null");
        Assert.noNullElements(strArr, "'queueNames' cannot contain null elements");
        try {
            Stream stream = Arrays.stream(strArr);
            Set<String> set = this.removedQueues;
            Objects.requireNonNull(set);
            stream.forEach((v1) -> {
                r1.remove(v1);
            });
            addQueues(Arrays.stream(strArr));
            super.addQueueNames(strArr);
        } catch (AmqpIOException e) {
            throw new AmqpIOException("Failed to add " + Arrays.toString(strArr), e);
        }
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public void addQueues(Queue... queueArr) {
        Assert.notNull(queueArr, "'queues' cannot be null");
        Assert.noNullElements(queueArr, "'queues' cannot contain null elements");
        try {
            Stream map = Arrays.stream(queueArr).map(queue -> {
                return queue.getActualName();
            });
            Set<String> set = this.removedQueues;
            Objects.requireNonNull(set);
            map.forEach((v1) -> {
                r1.remove(v1);
            });
            addQueues(Arrays.stream(queueArr).map((v0) -> {
                return v0.getName();
            }));
            super.addQueues(queueArr);
        } catch (AmqpIOException e) {
            throw new AmqpIOException("Failed to add " + Arrays.toString(queueArr), e);
        }
    }

    private void addQueues(Stream<String> stream) {
        if (isRunning()) {
            synchronized (this.consumersMonitor) {
                checkStartState();
                Set<String> queueNamesAsSet = getQueueNamesAsSet();
                stream.forEach(str -> {
                    if (queueNamesAsSet.contains(str)) {
                        this.logger.warn("Queue " + str + " is already configured for this container: " + this + ", ignoring add");
                    } else {
                        consumeFromQueue(str);
                    }
                });
            }
        }
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public boolean removeQueueNames(String... strArr) {
        removeQueues(Arrays.stream(strArr));
        return super.removeQueueNames(strArr);
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public boolean removeQueues(Queue... queueArr) {
        removeQueues(Arrays.stream(queueArr).map((v0) -> {
            return v0.getActualName();
        }));
        return super.removeQueues(queueArr);
    }

    private void removeQueues(Stream<String> stream) {
        if (isRunning()) {
            synchronized (this.consumersMonitor) {
                checkStartState();
                stream.map(str -> {
                    this.removedQueues.add(str);
                    return (List) this.consumersByQueue.remove(str);
                }).filter((v0) -> {
                    return Objects.nonNull(v0);
                }).flatMap((v0) -> {
                    return v0.stream();
                }).forEach(this::cancelConsumer);
            }
        }
    }

    private void adjustConsumers(int i) {
        synchronized (this.consumersMonitor) {
            checkStartState();
            this.consumersToRestart.clear();
            for (String str : getQueueNames()) {
                while (true) {
                    if (this.consumersByQueue.get(str) == null || ((List) this.consumersByQueue.get(str)).size() < i) {
                        List list = (List) this.consumersByQueue.get(str);
                        int i2 = 0;
                        if (list != null) {
                            List list2 = (List) list.stream().map(simpleConsumer -> {
                                return Integer.valueOf(simpleConsumer.getIndex());
                            }).sorted().collect(Collectors.toList());
                            i2 = 0;
                            while (i2 < list2.size() && i2 >= ((Integer) list2.get(i2)).intValue()) {
                                i2++;
                            }
                        }
                        doConsumeFromQueue(str, i2);
                    }
                }
                reduceConsumersIfIdle(i, str);
            }
        }
    }

    private void reduceConsumersIfIdle(int i, String str) {
        SimpleConsumer simpleConsumer;
        List list = (List) this.consumersByQueue.get(str);
        if (list == null || list.size() <= i) {
            return;
        }
        int size = list.size() - i;
        for (int i2 = 0; i2 < size; i2++) {
            int findIdleConsumer = findIdleConsumer();
            if (findIdleConsumer >= 0 && (simpleConsumer = (SimpleConsumer) list.remove(findIdleConsumer)) != null) {
                cancelConsumer(simpleConsumer);
            }
        }
    }

    protected int findIdleConsumer() {
        return 0;
    }

    private void checkStartState() {
        if (isRunning()) {
            return;
        }
        try {
            Assert.state(this.startedLatch.await(60L, TimeUnit.SECONDS), "Container is not started - cannot adjust queues");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AmqpException("Interrupted waiting for start", e);
        }
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    protected void doInitialize() {
        if (this.taskScheduler == null) {
            ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
            threadPoolTaskScheduler.setThreadNamePrefix(getListenerId() + "-consumerMonitor-");
            threadPoolTaskScheduler.afterPropertiesSet();
            this.taskScheduler = threadPoolTaskScheduler;
        }
        if (this.messagesPerAck > 0) {
            Assert.state(!isChannelTransacted(), "'messagesPerAck' is not allowed with transactions");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public void doStart() {
        if (this.started) {
            return;
        }
        actualStart();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public void doStop() {
        super.doStop();
        if (this.taskSchedulerSet || this.taskScheduler == null) {
            return;
        }
        ((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
        this.taskScheduler = null;
    }

    protected void actualStart() {
        this.aborted = false;
        this.hasStopped = false;
        if (getPrefetchCount() < this.messagesPerAck) {
            setPrefetchCount(this.messagesPerAck);
        }
        super.doStart();
        String[] queueNames = getQueueNames();
        checkMissingQueues(queueNames);
        checkConnect();
        long idleEventInterval = getIdleEventInterval();
        if (this.taskScheduler == null) {
            afterPropertiesSet();
        }
        if (idleEventInterval > 0 && this.monitorInterval > idleEventInterval) {
            this.monitorInterval = idleEventInterval / 2;
        }
        if (getFailedDeclarationRetryInterval() < this.monitorInterval) {
            this.monitorInterval = getFailedDeclarationRetryInterval();
        }
        Map<String, Queue> queueNamesToQueues = getQueueNamesToQueues();
        this.lastRestartAttempt = System.currentTimeMillis();
        startMonitor(idleEventInterval, queueNamesToQueues);
        if (queueNames.length > 0) {
            doRedeclareElementsIfNecessary();
            getTaskExecutor().execute(() -> {
                startConsumers(queueNames);
            });
        } else {
            this.started = true;
            this.startedLatch.countDown();
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Container initialized for queues: " + Arrays.asList(queueNames));
        }
    }

    protected void checkConnect() {
        if (isPossibleAuthenticationFailureFatal()) {
            AutoCloseable autoCloseable = null;
            try {
                try {
                    getConnectionFactory().createConnection();
                    if (0 != 0) {
                        autoCloseable.close();
                    }
                } catch (AmqpAuthenticationException e) {
                    this.logger.debug("Failed to authenticate", e);
                    throw e;
                } catch (Exception e2) {
                    if (0 != 0) {
                        autoCloseable.close();
                    }
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    autoCloseable.close();
                }
                throw th;
            }
        }
    }

    private void startMonitor(long j, Map<String, Queue> map) {
        this.consumerMonitorTask = this.taskScheduler.scheduleAtFixedRate(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            checkIdle(j, currentTimeMillis);
            checkConsumers(currentTimeMillis);
            if (this.lastRestartAttempt + getFailedDeclarationRetryInterval() < currentTimeMillis) {
                synchronized (this.consumersMonitor) {
                    if (this.started) {
                        ArrayList arrayList = new ArrayList(this.consumersToRestart);
                        this.consumersToRestart.clear();
                        if (arrayList.size() > 0) {
                            doRedeclareElementsIfNecessary();
                        }
                        Iterator<SimpleConsumer> it = arrayList.iterator();
                        while (it.hasNext()) {
                            SimpleConsumer next = it.next();
                            it.remove();
                            if (!this.removedQueues.contains(next.getQueue())) {
                                if (this.logger.isDebugEnabled()) {
                                    this.logger.debug("Attempting to restart consumer " + next);
                                }
                                if (!restartConsumer(map, arrayList, next)) {
                                    break;
                                }
                            } else if (this.logger.isDebugEnabled()) {
                                this.logger.debug("Skipping restart of consumer, queue removed " + next);
                            }
                        }
                        this.lastRestartAttempt = currentTimeMillis;
                    }
                }
            }
            processMonitorTask();
        }, this.monitorInterval);
    }

    private void checkIdle(long j, long j2) {
        if (j <= 0 || j2 - getLastReceive() <= j || j2 - this.lastAlertAt <= j) {
            return;
        }
        publishIdleContainerEvent(j2 - getLastReceive());
        this.lastAlertAt = j2;
    }

    private void checkConsumers(long j) {
        List list;
        synchronized (this.consumersMonitor) {
            list = (List) this.consumers.stream().filter(simpleConsumer -> {
                boolean z = (!simpleConsumer.getChannel().isOpen() || simpleConsumer.isAckFailed() || simpleConsumer.targetChanged()) ? false : true;
                if (z && this.messagesPerAck > 1) {
                    try {
                        simpleConsumer.ackIfNecessary(j);
                    } catch (Exception e) {
                        this.logger.error("Exception while sending delayed ack", e);
                    }
                }
                return !z;
            }).collect(Collectors.toList());
        }
        list.forEach(simpleConsumer2 -> {
            try {
                RabbitUtils.closeMessageConsumer(simpleConsumer2.getChannel(), Collections.singletonList(simpleConsumer2.getConsumerTag()), isChannelTransacted());
            } catch (Exception e) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Error closing consumer " + simpleConsumer2, e);
                }
            }
            this.logger.error("Consumer canceled - channel closed " + simpleConsumer2);
            simpleConsumer2.cancelConsumer("Consumer " + simpleConsumer2 + " channel closed");
        });
    }

    private boolean restartConsumer(Map<String, Queue> map, List<SimpleConsumer> list, SimpleConsumer simpleConsumer) {
        SimpleConsumer simpleConsumer2 = simpleConsumer;
        Queue queue = map.get(simpleConsumer2.getQueue());
        if (queue != null && !StringUtils.hasText(queue.getName())) {
            String actualName = queue.getActualName();
            if (StringUtils.hasText(actualName)) {
                map.remove(simpleConsumer2.getQueue());
                map.put(actualName, queue);
                simpleConsumer2 = new SimpleConsumer(null, null, actualName, simpleConsumer2.getIndex());
            }
        }
        try {
            doConsumeFromQueue(simpleConsumer2.getQueue(), simpleConsumer2.getIndex());
            return true;
        } catch (AmqpConnectException | AmqpIOException e) {
            this.logger.error("Cannot connect to server", e);
            if (e.getCause() instanceof AmqpApplicationContextClosedException) {
                this.logger.error("Application context is closed, terminating");
                this.taskScheduler.schedule(this::stop, new Date());
            }
            this.consumersToRestart.addAll(list);
            if (!this.logger.isTraceEnabled()) {
                return false;
            }
            this.logger.trace("After restart exception, consumers to restart now: " + this.consumersToRestart);
            return false;
        }
    }

    private void startConsumers(String[] strArr) {
        synchronized (this.consumersMonitor) {
            if (!this.hasStopped) {
                BackOffExecution start = getRecoveryBackOff().start();
                while (!this.started && isRunning()) {
                    this.cancellationLock.reset();
                    try {
                        for (String str : strArr) {
                            consumeFromQueue(str);
                        }
                        this.started = true;
                        this.startedLatch.countDown();
                    } catch (AmqpConnectException | AmqpIOException e) {
                        long nextBackOff = start.nextBackOff();
                        if (nextBackOff < 0 || (e.getCause() instanceof AmqpApplicationContextClosedException)) {
                            this.aborted = true;
                            shutdown();
                            this.logger.error("Failed to start container - fatal error or backOffs exhausted", e);
                            this.taskScheduler.schedule(this::stop, new Date());
                            break;
                        }
                        this.logger.error("Error creating consumer; retrying in " + nextBackOff, e);
                        doShutdown();
                        try {
                            Thread.sleep(nextBackOff);
                        } catch (InterruptedException e2) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug("Consumer start aborted - container stopping");
            }
        }
    }

    protected void doRedeclareElementsIfNecessary() {
        String routingLookupKey = getRoutingLookupKey();
        if (routingLookupKey != null) {
            SimpleResourceHolder.push(getRoutingConnectionFactory(), routingLookupKey);
        }
        try {
            try {
                redeclareElementsIfNecessary();
                if (routingLookupKey != null) {
                    SimpleResourceHolder.pop(getRoutingConnectionFactory());
                }
            } catch (Exception e) {
                this.logger.error("Failed to redeclare elements", e);
                if (routingLookupKey != null) {
                    SimpleResourceHolder.pop(getRoutingConnectionFactory());
                }
            }
        } catch (Throwable th) {
            if (routingLookupKey != null) {
                SimpleResourceHolder.pop(getRoutingConnectionFactory());
            }
            throw th;
        }
    }

    protected void processMonitorTask() {
    }

    private void checkMissingQueues(String[] strArr) {
        if (isMissingQueuesFatal()) {
            AmqpAdmin amqpAdmin = getAmqpAdmin();
            if (amqpAdmin == null) {
                try {
                    amqpAdmin = (AmqpAdmin) ClassUtils.forName("org.springframework.amqp.rabbit.core.RabbitAdmin", ClassUtils.getDefaultClassLoader()).getConstructor(ConnectionFactory.class).newInstance(getConnectionFactory());
                    setAmqpAdmin(amqpAdmin);
                } catch (Exception e) {
                    this.logger.error("Failed to create a RabbitAdmin", e);
                }
            }
            if (amqpAdmin != null) {
                for (String str : strArr) {
                    if (amqpAdmin.getQueueProperties(str) == null && isMissingQueuesFatal()) {
                        throw new IllegalStateException("At least one of the configured queues is missing");
                    }
                }
            }
        }
    }

    private void consumeFromQueue(String str) {
        if (CollectionUtils.isEmpty((List) this.consumersByQueue.get(str))) {
            for (int i = 0; i < this.consumersPerQueue; i++) {
                doConsumeFromQueue(str, i);
            }
        }
    }

    private void doConsumeFromQueue(String str, int i) {
        if (!isActive()) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Consume from queue " + str + " ignore, container stopping");
                return;
            }
            return;
        }
        String routingLookupKey = getRoutingLookupKey();
        if (routingLookupKey != null) {
            SimpleResourceHolder.push(getRoutingConnectionFactory(), routingLookupKey);
        }
        try {
            try {
                Connection createConnection = getConnectionFactory().createConnection();
                if (routingLookupKey != null) {
                    SimpleResourceHolder.pop(getRoutingConnectionFactory());
                }
                SimpleConsumer consume = consume(str, i, createConnection);
                synchronized (this.consumersMonitor) {
                    if (consume != null) {
                        this.cancellationLock.add(consume);
                        this.consumers.add(consume);
                        this.consumersByQueue.add(str, consume);
                        if (this.logger.isInfoEnabled()) {
                            this.logger.info(consume + " started");
                        }
                        if (getApplicationEventPublisher() != null) {
                            getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consume));
                        }
                    }
                }
            } catch (Exception e) {
                publishConsumerFailedEvent(e.getMessage(), false, e);
                addConsumerToRestart(new SimpleConsumer(null, null, str, i));
                if (!(e instanceof AmqpConnectException)) {
                    throw new AmqpConnectException(e);
                }
            }
        } catch (Throwable th) {
            if (routingLookupKey != null) {
                SimpleResourceHolder.pop(getRoutingConnectionFactory());
            }
            throw th;
        }
    }

    @Nullable
    private SimpleConsumer consume(String str, int i, Connection connection) {
        Channel channel = null;
        SimpleConsumer simpleConsumer = null;
        try {
            if (getConsumeDelay() > 0) {
                try {
                    Thread.sleep(getConsumeDelay());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            channel = connection.createChannel(isChannelTransacted());
            channel.basicQos(getPrefetchCount(), isGlobalQos());
            simpleConsumer = new SimpleConsumer(connection, channel, str, i);
            channel.queueDeclarePassive(str);
            simpleConsumer.consumerTag = channel.basicConsume(str, getAcknowledgeMode().isAutoAck(), getConsumerTagStrategy() != null ? getConsumerTagStrategy().createConsumerTag(str) : "", isNoLocal(), isExclusive(), getConsumerArguments(), simpleConsumer);
        } catch (AmqpApplicationContextClosedException e2) {
            throw new AmqpConnectException(e2);
        } catch (Exception e3) {
            RabbitUtils.closeChannel(channel);
            RabbitUtils.closeConnection(connection);
            simpleConsumer = handleConsumeException(str, i, simpleConsumer, e3);
        }
        return simpleConsumer;
    }

    @Nullable
    private SimpleConsumer handleConsumeException(String str, int i, @Nullable SimpleConsumer simpleConsumer, Exception exc) {
        SimpleConsumer simpleConsumer2 = simpleConsumer;
        if ((exc.getCause() instanceof ShutdownSignalException) && exc.getCause().getMessage().contains("in exclusive use")) {
            getExclusiveConsumerExceptionLogger().log(this.logger, "Exclusive consumer failure", exc.getCause());
            publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, exc);
        } else if ((exc.getCause() instanceof ShutdownSignalException) && RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException) exc.getCause())) {
            publishMissingQueueEvent(str);
            this.logger.error("Queue not present, scheduling consumer " + (simpleConsumer2 == null ? "for queue " + str : simpleConsumer2) + " for restart", exc);
        } else if (this.logger.isWarnEnabled()) {
            this.logger.warn("basicConsume failed, scheduling consumer " + (simpleConsumer2 == null ? "for queue " + str : simpleConsumer2) + " for restart", exc);
        }
        if (simpleConsumer2 == null) {
            addConsumerToRestart(new SimpleConsumer(null, null, str, i));
        } else {
            addConsumerToRestart(simpleConsumer2);
            simpleConsumer2 = null;
        }
        return simpleConsumer2;
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    protected void doShutdown() {
        LinkedList linkedList = null;
        boolean z = false;
        synchronized (this.consumersMonitor) {
            if (this.started || this.aborted) {
                linkedList = new LinkedList(this.consumers);
                actualShutDown(linkedList);
                z = true;
            }
        }
        try {
            if (z) {
                try {
                    if (this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
                        this.logger.info("Successfully waited for consumers to finish.");
                    } else {
                        this.logger.info("Consumers not finished.");
                        if (isForceCloseChannel()) {
                            linkedList.forEach(simpleConsumer -> {
                                String str = "Closing channel for unresponsive consumer: " + simpleConsumer;
                                if (this.logger.isWarnEnabled()) {
                                    this.logger.warn(str);
                                }
                                simpleConsumer.cancelConsumer(str);
                            });
                        }
                    }
                    this.startedLatch = new CountDownLatch(1);
                    this.started = false;
                    this.aborted = false;
                    this.hasStopped = true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.logger.warn("Interrupted waiting for consumers. Continuing with shutdown.");
                    this.startedLatch = new CountDownLatch(1);
                    this.started = false;
                    this.aborted = false;
                    this.hasStopped = true;
                }
            }
        } catch (Throwable th) {
            this.startedLatch = new CountDownLatch(1);
            this.started = false;
            this.aborted = false;
            this.hasStopped = true;
            throw th;
        }
    }

    private void actualShutDown(List<SimpleConsumer> list) {
        Assert.state(getTaskExecutor() != null, "Cannot shut down if not initialized");
        this.logger.debug("Shutting down");
        list.forEach(this::cancelConsumer);
        this.consumers.clear();
        this.consumersByQueue.clear();
        this.logger.debug("All consumers canceled");
        if (this.consumerMonitorTask != null) {
            this.consumerMonitorTask.cancel(true);
            this.consumerMonitorTask = null;
        }
    }

    private void cancelConsumer(SimpleConsumer simpleConsumer) {
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Canceling " + simpleConsumer);
            }
            synchronized (simpleConsumer) {
                simpleConsumer.setCanceled(true);
                if (this.messagesPerAck > 1) {
                    try {
                        simpleConsumer.ackIfNecessary(0L);
                    } catch (Exception e) {
                        this.logger.error("Exception while sending delayed ack", e);
                    }
                }
            }
            RabbitUtils.cancel(simpleConsumer.getChannel(), simpleConsumer.getConsumerTag());
            this.consumers.remove(simpleConsumer);
            consumerRemoved(simpleConsumer);
        } catch (Throwable th) {
            this.consumers.remove(simpleConsumer);
            consumerRemoved(simpleConsumer);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addConsumerToRestart(SimpleConsumer simpleConsumer) {
        this.consumersToRestart.add(simpleConsumer);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Consumers to restart now: " + this.consumersToRestart);
        }
    }

    protected void consumerRemoved(SimpleConsumer simpleConsumer) {
    }
}
