/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.aws.messaging.listener;

import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.springframework.cloud.aws.messaging.core.QueueMessageUtils;
import org.springframework.cloud.aws.messaging.listener.AbstractMessageListenerContainer;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ClassUtils;

/**
 * Message listener container that polls every registered queue on its own
 * long-running task and hands each received message to a separate task of the
 * same {@link TaskExecutor}.
 *
 * <p>If no executor is supplied through {@link #setTaskExecutor(TaskExecutor)}
 * before {@link #initialize()} runs, a default executor is created by
 * {@link #createDefaultTaskExecutor()} and released again in
 * {@link #doDestroy()}.
 */
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
    /** Core threads reserved per registered queue in the default executor. */
    private static final int DEFAULT_WORKER_THREADS = 2;
    /** Batch size assumed for pool sizing when no maximum number of messages is configured. */
    private static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 2;
    private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(SimpleMessageListenerContainer.class) + "-";
    private TaskExecutor taskExecutor;
    /** Counted down once per queue poller when that poller's task ends; {@code null} until the first start. */
    private volatile CountDownLatch stopLatch;
    /** {@code true} when {@link #taskExecutor} was created by this container rather than injected. */
    private boolean defaultTaskExecutor;
    private boolean deleteMessageOnException = true;

    /**
     * @return the executor used for both queue pollers and message handlers;
     *         {@code null} until one is set or {@link #initialize()} has run
     */
    protected TaskExecutor getTaskExecutor() {
        return this.taskExecutor;
    }

    /**
     * Sets the executor used for polling and message handling. When left unset,
     * {@link #initialize()} installs the result of {@link #createDefaultTaskExecutor()}.
     *
     * @param taskExecutor the executor to use
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * @return whether a message whose handler threw a {@link MessagingException}
     *         is still deleted from its queue (only applied to queues that do
     *         not report a redrive policy)
     */
    public boolean isDeleteMessageOnException() {
        return this.deleteMessageOnException;
    }

    /**
     * @param deleteMessageOnException {@code true} (the default) to delete a
     *        message even when its handler threw a {@link MessagingException}
     */
    public void setDeleteMessageOnException(boolean deleteMessageOnException) {
        this.deleteMessageOnException = deleteMessageOnException;
    }

    /**
     * Installs the default executor when none was configured, then delegates to
     * the superclass initialization.
     */
    @Override
    protected void initialize() {
        if (this.taskExecutor == null) {
            this.defaultTaskExecutor = true;
            this.taskExecutor = this.createDefaultTaskExecutor();
        }
        super.initialize();
    }

    /**
     * Submits one poller per registered queue while holding the lifecycle monitor.
     */
    @Override
    protected void doStart() {
        synchronized (this.getLifecycleMonitor()) {
            this.scheduleMessageListeners();
        }
    }

    /**
     * Blocks until every queue poller submitted by the last start has finished.
     * Returns immediately when the container was never started. If the waiting
     * thread is interrupted, its interrupt flag is restored and the wait ends.
     */
    @Override
    protected void doStop() {
        // Read the volatile field once; it is null if doStart() never ran.
        CountDownLatch latch = this.stopLatch;
        if (latch == null) {
            return;
        }
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Destroys the executor, but only if this container created it itself.
     */
    @Override
    protected void doDestroy() {
        // createDefaultTaskExecutor() is overridable, so the default executor is
        // not guaranteed to be a ThreadPoolTaskExecutor; check instead of casting blindly.
        if (this.defaultTaskExecutor && this.taskExecutor instanceof ThreadPoolTaskExecutor) {
            ((ThreadPoolTaskExecutor)this.taskExecutor).destroy();
        }
    }

    /**
     * Builds the executor used when none was injected.
     *
     * <p>Threads are named after the bean name (or the short class name when no
     * bean name is available). With at least one registered queue, the core pool
     * holds {@value #DEFAULT_WORKER_THREADS} threads per queue and the maximum
     * pool allows, per queue, one thread for the poller plus one thread for every
     * message of a batch. The executor queue capacity is zero, so a task is
     * either started right away or rejected.
     *
     * @return an initialized thread pool executor
     */
    protected TaskExecutor createDefaultTaskExecutor() {
        String beanName = this.getBeanName();
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix(beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
        int spinningThreads = this.getRegisteredQueues().size();
        if (spinningThreads > 0) {
            threadPoolTaskExecutor.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);
            int maxNumberOfMessagePerBatch = this.getMaxNumberOfMessages() != null ? this.getMaxNumberOfMessages() : DEFAULT_MAX_NUMBER_OF_MESSAGES;
            // Each poller permanently occupies one thread, so a full batch needs
            // (batch + 1) threads per queue. Sizing without the "+ 1" rejects the
            // last message of a full batch and, for a batch size of 1, makes the
            // maximum pool size smaller than the core pool size.
            threadPoolTaskExecutor.setMaxPoolSize(spinningThreads * (maxNumberOfMessagePerBatch + 1));
        }
        threadPoolTaskExecutor.setQueueCapacity(0);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

    /**
     * Creates a fresh stop latch sized to the number of registered queues and
     * submits one poller per queue, each wrapped so it signals the latch on exit.
     */
    private void scheduleMessageListeners() {
        CountDownLatch latch = new CountDownLatch(this.getRegisteredQueues().size());
        this.stopLatch = latch;
        for (Map.Entry<String, AbstractMessageListenerContainer.QueueAttributes> queue : this.getRegisteredQueues().entrySet()) {
            AsynchronousMessageListener listener = new AsynchronousMessageListener(queue.getKey(), queue.getValue());
            this.getTaskExecutor().execute(new SignalExecutingRunnable(latch, listener));
        }
    }

    /**
     * Passes a converted message to the configured message handler.
     *
     * @param stringMessage the message to handle
     */
    protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) {
        this.getMessageHandler().handleMessage(stringMessage);
    }

    /**
     * Runs a delegate and counts a latch down afterwards, whether the delegate
     * returned normally or threw.
     */
    private static class SignalExecutingRunnable
    implements Runnable {
        private final CountDownLatch countDownLatch;
        private final Runnable runnable;

        private SignalExecutingRunnable(CountDownLatch endSignal, Runnable runnable) {
            this.countDownLatch = endSignal;
            this.runnable = runnable;
        }

        @Override
        public void run() {
            try {
                this.runnable.run();
            }
            finally {
                // Must always fire: a failing delegate would otherwise leave
                // whoever awaits this latch blocked forever.
                this.countDownLatch.countDown();
            }
        }
    }

    /**
     * Handles a single received message and then decides whether to delete it
     * from its queue.
     */
    private class MessageExecutor
    implements Runnable {
        private final Message message;
        private final String logicalQueueName;
        private final String queueUrl;
        private final boolean hasRedrivePolicy;

        private MessageExecutor(String logicalQueueName, Message message, AbstractMessageListenerContainer.QueueAttributes queueAttributes) {
            this.logicalQueueName = logicalQueueName;
            this.message = message;
            this.queueUrl = queueAttributes.getReceiveMessageRequest().getQueueUrl();
            this.hasRedrivePolicy = queueAttributes.hasRedrivePolicy();
        }

        /**
         * Converts the message, invokes the handler and deletes the message on
         * success. If the handler throws a {@link MessagingException}, the message
         * is deleted only when the queue reports no redrive policy and
         * delete-on-exception is enabled; otherwise it is left untouched. Any
         * other exception propagates to the caller.
         */
        @Override
        public void run() {
            String receiptHandle = this.message.getReceiptHandle();
            org.springframework.messaging.Message<String> queueMessage = QueueMessageUtils.createMessage(this.message, Collections.singletonMap("LogicalResourceId", this.logicalQueueName));
            try {
                SimpleMessageListenerContainer.this.executeMessage(queueMessage);
                this.deleteMessage(receiptHandle);
            }
            catch (MessagingException e) {
                // NOTE(review): the failure is not reported anywhere; consider logging it.
                if (!this.hasRedrivePolicy && SimpleMessageListenerContainer.this.isDeleteMessageOnException()) {
                    this.deleteMessage(receiptHandle);
                }
            }
        }

        /** Requests asynchronous deletion of the message identified by the receipt handle. */
        private void deleteMessage(String receiptHandle) {
            SimpleMessageListenerContainer.this.getAmazonSqs().deleteMessageAsync(new DeleteMessageRequest(this.queueUrl, receiptHandle));
        }
    }

    /**
     * Poller for one queue: repeatedly receives a batch, dispatches every message
     * to the task executor and waits for the batch to finish before receiving
     * the next one. Ends when the container is no longer running.
     */
    private class AsynchronousMessageListener
    implements Runnable {
        private final AbstractMessageListenerContainer.QueueAttributes queueAttributes;
        private final String logicalQueueName;

        private AsynchronousMessageListener(String logicalQueueName, AbstractMessageListenerContainer.QueueAttributes queueAttributes) {
            this.logicalQueueName = logicalQueueName;
            this.queueAttributes = queueAttributes;
        }

        @Override
        public void run() {
            while (SimpleMessageListenerContainer.this.isRunning()) {
                ReceiveMessageResult receiveMessageResult = SimpleMessageListenerContainer.this.getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
                CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
                for (Message message : receiveMessageResult.getMessages()) {
                    if (SimpleMessageListenerContainer.this.isRunning()) {
                        MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
                        SimpleMessageListenerContainer.this.getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
                    }
                    else {
                        // The container stopped mid-batch: this message is neither
                        // handled nor deleted here, but the latch was sized for the
                        // whole batch, so account for it or await() never returns.
                        messageBatchLatch.countDown();
                    }
                }
                try {
                    messageBatchLatch.await();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

