package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.listener.adapter.ListenerExecutionFailedException;
import org.springframework.aop.Pointcut;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttributeSource;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.class */
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
    public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
    private static final int DEFAULT_PREFETCH_COUNT = 1;
    private static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000;
    private volatile int prefetchCount;
    private volatile int txSize;
    private volatile Executor taskExecutor;
    private volatile int concurrentConsumers;
    private long receiveTimeout;
    private long shutdownTimeout;
    private Set<BlockingQueueConsumer> consumers;
    private final Object consumersMonitor;
    private PlatformTransactionManager transactionManager;
    private TransactionAttribute transactionAttribute;
    private CountDownLatch cancellationLock;
    private Advice[] advices;
    private ContainerDelegate delegate;
    private ContainerDelegate proxy;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.class */
    public class AsyncMessageProcessingConsumer implements Runnable {
        private final BlockingQueueConsumer consumer;
        private final CountDownLatch latch;

        public AsyncMessageProcessingConsumer(BlockingQueueConsumer blockingQueueConsumer, CountDownLatch countDownLatch) {
            this.consumer = blockingQueueConsumer;
            this.latch = countDownLatch;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    this.consumer.start();
                    boolean z = false;
                    while (true) {
                        if (!SimpleMessageListenerContainer.this.isActive() && !z) {
                            break;
                        } else {
                            try {
                                z = SimpleMessageListenerContainer.this.proxy.receiveAndExecute(this.consumer) && !SimpleMessageListenerContainer.this.isChannelTransacted();
                            } catch (ListenerExecutionFailedException e) {
                            }
                        }
                    }
                    if (SimpleMessageListenerContainer.this.isActive()) {
                        SimpleMessageListenerContainer.this.logger.debug("Restarting " + this.consumer);
                        SimpleMessageListenerContainer.this.restart(this.consumer);
                    } else {
                        SimpleMessageListenerContainer.this.logger.debug("Cancelling " + this.consumer);
                        this.latch.countDown();
                        this.consumer.stop();
                    }
                } catch (InterruptedException e2) {
                    SimpleMessageListenerContainer.this.logger.debug("Consumer thread interrupted, processing stopped.");
                    Thread.currentThread().interrupt();
                    if (SimpleMessageListenerContainer.this.isActive()) {
                        SimpleMessageListenerContainer.this.logger.debug("Restarting " + this.consumer);
                        SimpleMessageListenerContainer.this.restart(this.consumer);
                    } else {
                        SimpleMessageListenerContainer.this.logger.debug("Cancelling " + this.consumer);
                        this.latch.countDown();
                        this.consumer.stop();
                    }
                } catch (Throwable th) {
                    SimpleMessageListenerContainer.this.logger.debug("Consumer received fatal exception, processing stopped.", th);
                    if (SimpleMessageListenerContainer.this.isActive()) {
                        SimpleMessageListenerContainer.this.logger.debug("Restarting " + this.consumer);
                        SimpleMessageListenerContainer.this.restart(this.consumer);
                    } else {
                        SimpleMessageListenerContainer.this.logger.debug("Cancelling " + this.consumer);
                        this.latch.countDown();
                        this.consumer.stop();
                    }
                }
            } catch (Throwable th2) {
                if (SimpleMessageListenerContainer.this.isActive()) {
                    SimpleMessageListenerContainer.this.logger.debug("Restarting " + this.consumer);
                    SimpleMessageListenerContainer.this.restart(this.consumer);
                } else {
                    SimpleMessageListenerContainer.this.logger.debug("Cancelling " + this.consumer);
                    this.latch.countDown();
                    this.consumer.stop();
                }
                throw th2;
            }
        }
    }

    /* loaded from: input_file:org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer$ContainerDelegate.class */
    public interface ContainerDelegate {
        boolean receiveAndExecute(BlockingQueueConsumer blockingQueueConsumer) throws Throwable;
    }

    public void setAdviceChain(Advice[] adviceArr) {
        this.advices = adviceArr;
    }

    public SimpleMessageListenerContainer() {
        this.prefetchCount = DEFAULT_PREFETCH_COUNT;
        this.txSize = DEFAULT_PREFETCH_COUNT;
        this.taskExecutor = new SimpleAsyncTaskExecutor();
        this.concurrentConsumers = 0;
        this.receiveTimeout = 1000L;
        this.shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
        this.consumersMonitor = new Object();
        this.transactionAttribute = new DefaultTransactionAttribute();
        this.advices = new Advice[0];
        this.delegate = new ContainerDelegate() { // from class: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.1
            @Override // org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.ContainerDelegate
            public boolean receiveAndExecute(BlockingQueueConsumer blockingQueueConsumer) throws Throwable {
                return SimpleMessageListenerContainer.this.receiveAndExecute(blockingQueueConsumer);
            }
        };
        this.proxy = this.delegate;
    }

    public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        this.prefetchCount = DEFAULT_PREFETCH_COUNT;
        this.txSize = DEFAULT_PREFETCH_COUNT;
        this.taskExecutor = new SimpleAsyncTaskExecutor();
        this.concurrentConsumers = 0;
        this.receiveTimeout = 1000L;
        this.shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
        this.consumersMonitor = new Object();
        this.transactionAttribute = new DefaultTransactionAttribute();
        this.advices = new Advice[0];
        this.delegate = new ContainerDelegate() { // from class: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.1
            @Override // org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.ContainerDelegate
            public boolean receiveAndExecute(BlockingQueueConsumer blockingQueueConsumer) throws Throwable {
                return SimpleMessageListenerContainer.this.receiveAndExecute(blockingQueueConsumer);
            }
        };
        this.proxy = this.delegate;
        setConnectionFactory(connectionFactory);
    }

    public void setConcurrentConsumers(int i) {
        Assert.isTrue(i > 0, "'concurrentConsumers' value must be at least 1 (one)");
        this.concurrentConsumers = i;
    }

    public void setReceiveTimeout(long j) {
        this.receiveTimeout = j;
    }

    public void setShutdownTimeout(long j) {
        this.shutdownTimeout = j;
    }

    public void setTaskExecutor(Executor executor) {
        Assert.notNull(executor, "taskExecutor must not be null");
        this.taskExecutor = executor;
    }

    public void setPrefetchCount(int i) {
        this.prefetchCount = i;
    }

    public void setTxSize(int i) {
        this.txSize = i;
    }

    public void setTransactionManager(PlatformTransactionManager platformTransactionManager) {
        this.transactionManager = platformTransactionManager;
    }

    public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
        this.transactionAttribute = transactionAttribute;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public void validateConfiguration() {
        super.validateConfiguration();
        Assert.state(!getAcknowledgeMode().isAutoAck() || this.transactionManager == null, "The acknowledgeMode is NONE (autoack in Rabbit terms) which is not consistent with having an external transaction manager. Either use a different AcknowledgeMode or make sure the transactionManager is null.");
        if (getConnectionFactory() instanceof CachingConnectionFactory) {
            CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) getConnectionFactory();
            if (cachingConnectionFactory.getChannelCacheSize() < this.concurrentConsumers) {
                throw new IllegalStateException("CachingConnectionFactory's channelCacheSize can not be less than the number of concurrentConsumers");
            }
            if (this.concurrentConsumers < DEFAULT_PREFETCH_COUNT) {
                this.concurrentConsumers = DEFAULT_PREFETCH_COUNT;
                if (cachingConnectionFactory.getChannelCacheSize() > DEFAULT_PREFETCH_COUNT) {
                    this.logger.info("Setting number of concurrent consumers to CachingConnectionFactory's ChannelCacheSize [" + cachingConnectionFactory.getChannelCacheSize() + "]");
                    this.concurrentConsumers = cachingConnectionFactory.getChannelCacheSize();
                }
            }
        }
        if (this.concurrentConsumers < DEFAULT_PREFETCH_COUNT) {
            this.concurrentConsumers = DEFAULT_PREFETCH_COUNT;
        }
    }

    public void initializeProxy() {
        if (this.advices.length == 0 && this.transactionManager == null) {
            return;
        }
        ProxyFactory proxyFactory = new ProxyFactory();
        if (this.transactionManager != null) {
            MatchAlwaysTransactionAttributeSource matchAlwaysTransactionAttributeSource = new MatchAlwaysTransactionAttributeSource();
            matchAlwaysTransactionAttributeSource.setTransactionAttribute(this.transactionAttribute);
            proxyFactory.addAdvisor(new DefaultPointcutAdvisor(Pointcut.TRUE, new TransactionInterceptor(this.transactionManager, matchAlwaysTransactionAttributeSource)));
        }
        Advice[] adviceArr = this.advices;
        int length = adviceArr.length;
        for (int i = 0; i < length; i += DEFAULT_PREFETCH_COUNT) {
            proxyFactory.addAdvisor(new DefaultPointcutAdvisor(Pointcut.TRUE, adviceArr[i]));
        }
        proxyFactory.setProxyTargetClass(false);
        proxyFactory.addInterface(ContainerDelegate.class);
        proxyFactory.setTarget(this.delegate);
        this.proxy = (ContainerDelegate) proxyFactory.getProxy();
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    protected final boolean sharedConnectionEnabled() {
        return true;
    }

    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    protected void doInitialize() throws Exception {
        initializeProxy();
    }

    public int getActiveConsumerCount() {
        return (int) this.cancellationLock.getCount();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.lang.Throwable, java.lang.Object] */
    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public void doStart() throws Exception {
        super.doStart();
        establishSharedConnection();
        initializeConsumers();
        synchronized (this.consumersMonitor) {
            if (this.consumers == null) {
                this.logger.info("Consumers were initialized and then cleared (presumably the container was stopped concurrently)");
                return;
            }
            this.cancellationLock = new CountDownLatch(this.consumers.size());
            Iterator<BlockingQueueConsumer> it = this.consumers.iterator();
            while (it.hasNext()) {
                this.taskExecutor.execute(new AsyncMessageProcessingConsumer(it.next(), this.cancellationLock));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public void doStop() {
        shutdown();
        super.doStop();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v7, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v9 */
    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    protected void doShutdown() {
        if (isRunning()) {
            try {
                this.logger.debug("Waiting for workers to finish.");
                if (this.cancellationLock.await(this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
                    this.logger.info("Successfully waited for workers to finish.");
                } else {
                    this.logger.info("Workers not finished.  Forcing connections to close.");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.logger.warn("Interrupted waiting for workers.  Continuing with shutdown.");
            }
            ?? r0 = this.consumersMonitor;
            synchronized (r0) {
                this.consumers = null;
                r0 = r0;
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    protected void initializeConsumers() throws IOException {
        ?? r0 = this.consumersMonitor;
        synchronized (r0) {
            if (this.consumers == null) {
                this.consumers = new HashSet(this.concurrentConsumers);
                for (int i = 0; i < this.concurrentConsumers; i += DEFAULT_PREFETCH_COUNT) {
                    this.consumers.add(createBlockingQueueConsumer(getTransactionalResourceHolder().getChannel()));
                }
            }
            r0 = r0;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
    public boolean isChannelLocallyTransacted(Channel channel) {
        return super.isChannelLocallyTransacted(channel) && this.transactionManager == null;
    }

    protected BlockingQueueConsumer createBlockingQueueConsumer(Channel channel) {
        return new BlockingQueueConsumer(channel, getAcknowledgeMode(), isChannelTransacted(), this.prefetchCount, StringUtils.commaDelimitedListToStringArray(getRequiredQueueName()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v21, types: [boolean] */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.util.Set<org.springframework.amqp.rabbit.listener.BlockingQueueConsumer>] */
    public void restart(BlockingQueueConsumer blockingQueueConsumer) {
        ?? r0 = this.consumersMonitor;
        synchronized (r0) {
            r0 = this.consumers;
            if (r0 != 0) {
                try {
                    blockingQueueConsumer.stop();
                    this.consumers.remove(blockingQueueConsumer);
                    BlockingQueueConsumer createBlockingQueueConsumer = createBlockingQueueConsumer(getTransactionalResourceHolder().getChannel());
                    r0 = this.consumers.add(createBlockingQueueConsumer);
                    this.taskExecutor.execute(new AsyncMessageProcessingConsumer(createBlockingQueueConsumer, this.cancellationLock));
                } catch (RuntimeException e) {
                    this.logger.warn("Consumer died on restart. " + e.getClass() + ": " + e.getMessage());
                    this.cancellationLock.countDown();
                    throw e;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean receiveAndExecute(BlockingQueueConsumer blockingQueueConsumer) throws Throwable {
        Channel channel = blockingQueueConsumer.getChannel();
        int i = 0;
        ConnectionFactory connectionFactory = getConnectionFactory();
        if (getAcknowledgeMode().isTransactionAllowed()) {
            ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(channel), connectionFactory, true);
        }
        for (int i2 = 0; i2 < this.txSize; i2 += DEFAULT_PREFETCH_COUNT) {
            this.logger.debug("Waiting for message from consumer.");
            Message nextMessage = blockingQueueConsumer.nextMessage(this.receiveTimeout);
            if (nextMessage == null) {
                return false;
            }
            i += DEFAULT_PREFETCH_COUNT;
            executeListener(channel, nextMessage);
        }
        return true;
    }
}
