package de.enterprise.starters.aws.ecs.sqs;

import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import de.enterprise.spring.boot.application.starter.tracing.TracingProperties;
import de.enterprise.spring.boot.application.starter.tracing.TracingUtils;
import de.enterprise.starters.aws.ecs.sqs.SqsProperties;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.springframework.cloud.aws.messaging.core.QueueMessageUtils;
import org.springframework.cloud.aws.messaging.listener.AbstractMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.QueueMessageAcknowledgment;
import org.springframework.cloud.aws.messaging.listener.QueueMessageVisibility;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.Assert;

/* loaded from: input_file:de/enterprise/starters/aws/ecs/sqs/ResilientMessageListenerContainer.class */
public class ResilientMessageListenerContainer extends SimpleMessageListenerContainer {
    private static final String CIRCUITBREAKER_NAME_PREFIX = "sqsListener";
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final SqsProperties sqsDefaultProperties;
    private final TracingProperties tracingProperties;
    private final List<MessageListenerContainerEventListener> eventListeners;
    static final String LOGICAL_RESOURCE_ID = "LogicalResourceId";
    static final String ACKNOWLEDGMENT = "Acknowledgment";
    static final String VISIBILITY = "Visibility";
    private ConcurrentHashMap<String, Future<?>> scheduledFutureByQueue;
    private ConcurrentHashMap<String, Boolean> runningStateByQueue;
    private final Map<String, String> externalToInternalQueueName = new HashMap();
    private long queueStopTimeout = 10000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/enterprise/starters/aws/ecs/sqs/ResilientMessageListenerContainer$AsynchronousMessageListener.class */
    public final class AsynchronousMessageListener implements Runnable {
        private final AbstractMessageListenerContainer.QueueAttributes queueAttributes;
        private final String logicalQueueName;
        private final String internalQueueName;
        private final CircuitBreaker circuitBreaker;

        private AsynchronousMessageListener(String str, AbstractMessageListenerContainer.QueueAttributes queueAttributes, CircuitBreaker circuitBreaker) {
            this.logicalQueueName = str;
            this.queueAttributes = queueAttributes;
            this.circuitBreaker = circuitBreaker;
            this.internalQueueName = (String) ResilientMessageListenerContainer.this.externalToInternalQueueName.get(this.logicalQueueName);
        }

        @Override // java.lang.Runnable
        public void run() {
            ReceiveMessageRequest receiveMessageRequest;
            while (isQueueRunning()) {
                try {
                    try {
                        receiveMessageRequest = this.queueAttributes.getReceiveMessageRequest();
                    } catch (Exception e) {
                        ResilientMessageListenerContainer.this.getLogger().warn("An Exception occurred while polling queue '{}'. The failing operation will be retried in {} milliseconds", new Object[]{this.logicalQueueName, Long.valueOf(ResilientMessageListenerContainer.this.getBackOffTime()), e});
                        ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener -> {
                            messageListenerContainerEventListener.onSqsRequestFailure(this.internalQueueName, e);
                        });
                        Thread.sleep(ResilientMessageListenerContainer.this.getBackOffTime());
                    }
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                }
                if (this.circuitBreaker.tryAcquirePermission()) {
                    ResilientMessageListenerContainer.this.getLogger().trace("About to request SQS with queueName={} maxNumberOfMessages={}, circuitBreakerName={}, circuitBreakerState={}", new Object[]{this.logicalQueueName, receiveMessageRequest.getMaxNumberOfMessages(), this.circuitBreaker.getName(), this.circuitBreaker.getState()});
                    ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener2 -> {
                        messageListenerContainerEventListener2.onSqsRequestAttempt(this.internalQueueName);
                    });
                    ReceiveMessageResult receiveMessage = ResilientMessageListenerContainer.this.getAmazonSqs().receiveMessage(receiveMessageRequest);
                    int size = receiveMessage.getMessages().size();
                    ResilientMessageListenerContainer.this.getLogger().trace("Received {} messages from queueName={}", Integer.valueOf(size), this.logicalQueueName);
                    ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener3 -> {
                        messageListenerContainerEventListener3.onSqsRequestSuccess(this.internalQueueName, size);
                    });
                    CountDownLatch countDownLatch = new CountDownLatch(size);
                    for (Message message : receiveMessage.getMessages()) {
                        if (isQueueRunning()) {
                            ResilientMessageListenerContainer.this.getTaskExecutor().execute(new SignalExecutingRunnable(countDownLatch, new MessageExecutor(this.logicalQueueName, message, this.queueAttributes, this.circuitBreaker)));
                        } else {
                            countDownLatch.countDown();
                        }
                    }
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e3) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    try {
                        ResilientMessageListenerContainer.this.getLogger().trace("sleep for {} ms as calls are permitted by circuitBreakerName={} queueName={}", new Object[]{1000, this.circuitBreaker.getName(), this.logicalQueueName});
                        ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener4 -> {
                            messageListenerContainerEventListener4.onSqsRequestRejection(this.internalQueueName);
                        });
                        Thread.sleep(1000);
                    } catch (InterruptedException e4) {
                        Thread.currentThread().interrupt();
                    }
                }
                ResilientMessageListenerContainer.this.getLogger().warn("An Exception occurred while polling queue '{}'. The failing operation will be retried in {} milliseconds", new Object[]{this.logicalQueueName, Long.valueOf(ResilientMessageListenerContainer.this.getBackOffTime()), e});
                ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener5 -> {
                    messageListenerContainerEventListener5.onSqsRequestFailure(this.internalQueueName, e);
                });
                Thread.sleep(ResilientMessageListenerContainer.this.getBackOffTime());
            }
            ResilientMessageListenerContainer.this.scheduledFutureByQueue.remove(this.logicalQueueName);
        }

        private boolean isQueueRunning() {
            if (ResilientMessageListenerContainer.this.runningStateByQueue.containsKey(this.logicalQueueName)) {
                return ((Boolean) ResilientMessageListenerContainer.this.runningStateByQueue.get(this.logicalQueueName)).booleanValue();
            }
            ResilientMessageListenerContainer.this.getLogger().warn("Stopped queue '" + this.logicalQueueName + "' because it was not listed as running queue.");
            return false;
        }
    }

    /* loaded from: input_file:de/enterprise/starters/aws/ecs/sqs/ResilientMessageListenerContainer$MessageExecutor.class */
    private final class MessageExecutor implements Runnable {
        private final Message message;
        private final String logicalQueueName;
        private final String queueUrl;
        private final boolean hasRedrivePolicy;
        private final SqsMessageDeletionPolicy deletionPolicy;
        private final CircuitBreaker circuitBreaker;
        private final SqsProperties.QueueProperties queueProperties;
        private final String internalQueueName;
        private final TracingProperties tracingProperties;

        private MessageExecutor(String str, Message message, AbstractMessageListenerContainer.QueueAttributes queueAttributes, CircuitBreaker circuitBreaker) {
            this.logicalQueueName = str;
            this.message = message;
            this.queueUrl = queueAttributes.getReceiveMessageRequest().getQueueUrl();
            this.hasRedrivePolicy = queueAttributes.hasRedrivePolicy();
            this.deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS;
            this.circuitBreaker = circuitBreaker;
            this.internalQueueName = (String) ResilientMessageListenerContainer.this.externalToInternalQueueName.get(this.logicalQueueName);
            this.queueProperties = ResilientMessageListenerContainer.this.sqsDefaultProperties.getQueues().get(this.internalQueueName);
            this.tracingProperties = ResilientMessageListenerContainer.this.tracingProperties;
        }

        @Override // java.lang.Runnable
        public void run() {
            String receiptHandle = this.message.getReceiptHandle();
            if (this.queueProperties.getDeleteMessagesOnMaxAgeEnabled().booleanValue() && maxAgeReached()) {
                applyDeletionPolicyOnSuccess(receiptHandle);
                ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener -> {
                    messageListenerContainerEventListener.onMessageDeletionMaxAgeReached(this.internalQueueName);
                });
                return;
            }
            org.springframework.messaging.Message<String> messageForExecution = getMessageForExecution();
            try {
                try {
                    TracingUtils.addNewMdcTraceContext(this.tracingProperties);
                    if (this.circuitBreaker.tryAcquirePermission()) {
                        ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener2 -> {
                            messageListenerContainerEventListener2.onMessageProcessingAttempt(this.internalQueueName);
                        });
                        this.circuitBreaker.executeRunnable(() -> {
                            ResilientMessageListenerContainer.this.executeMessage(messageForExecution);
                        });
                        applyDeletionPolicyOnSuccess(receiptHandle);
                        ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener3 -> {
                            messageListenerContainerEventListener3.onMessageProcessingSuccess(this.internalQueueName);
                        });
                    } else {
                        ResilientMessageListenerContainer.this.getLogger().debug("skipping processing of received sqs-message as circuitbreaker allows no more executions queueName={}, circuitBreakerName={}", this.logicalQueueName, this.circuitBreaker.getName());
                        ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener4 -> {
                            messageListenerContainerEventListener4.onMessageProcessingRejection(this.internalQueueName);
                        });
                    }
                    TracingUtils.removeMdcTraceContext(this.tracingProperties);
                } catch (MessagingException e) {
                    ResilientMessageListenerContainer.this.getLogger().debug("exception while processing message for queueName={}, circuitBreakerName={}", this.logicalQueueName, this.circuitBreaker.getName());
                    ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener5 -> {
                        messageListenerContainerEventListener5.onMessageProcessingFailure(this.internalQueueName);
                    });
                    if (e.getCause() instanceof MessageConversionException) {
                        ResilientMessageListenerContainer.this.getLogger().error("Exception while converting message. Message is being deleted from the queue.", e);
                        deleteMessage(receiptHandle);
                        ResilientMessageListenerContainer.this.notifyEventListeners(messageListenerContainerEventListener6 -> {
                            messageListenerContainerEventListener6.onMessageDeletionConversionError(this.internalQueueName, (MessageConversionException) e.getCause());
                        });
                    } else {
                        applyDeletionPolicyOnError(receiptHandle, e);
                    }
                    TracingUtils.removeMdcTraceContext(this.tracingProperties);
                }
            } catch (Throwable th) {
                TracingUtils.removeMdcTraceContext(this.tracingProperties);
                throw th;
            }
        }

        private void applyDeletionPolicyOnSuccess(String str) {
            if (this.deletionPolicy == SqsMessageDeletionPolicy.ON_SUCCESS || this.deletionPolicy == SqsMessageDeletionPolicy.ALWAYS || this.deletionPolicy == SqsMessageDeletionPolicy.NO_REDRIVE) {
                deleteMessage(str);
            }
        }

        private void applyDeletionPolicyOnError(String str, Exception exc) {
            if (this.deletionPolicy == SqsMessageDeletionPolicy.ALWAYS || (this.deletionPolicy == SqsMessageDeletionPolicy.NO_REDRIVE && !this.hasRedrivePolicy)) {
                deleteMessage(str);
            } else if (this.deletionPolicy == SqsMessageDeletionPolicy.ON_SUCCESS) {
                ResilientMessageListenerContainer.this.getLogger().warn("Exception encountered while processing message. Message is not being deleted from the queue.", exc);
            }
        }

        private void deleteMessage(String str) {
            ResilientMessageListenerContainer.this.getAmazonSqs().deleteMessageAsync(new DeleteMessageRequest(this.queueUrl, str));
        }

        private org.springframework.messaging.Message<String> getMessageForExecution() {
            HashMap hashMap = new HashMap();
            hashMap.putIfAbsent("contentType", "application/json");
            hashMap.put(ResilientMessageListenerContainer.LOGICAL_RESOURCE_ID, this.logicalQueueName);
            if (this.deletionPolicy == SqsMessageDeletionPolicy.NEVER) {
                hashMap.put(ResilientMessageListenerContainer.ACKNOWLEDGMENT, new QueueMessageAcknowledgment(ResilientMessageListenerContainer.this.getAmazonSqs(), this.queueUrl, this.message.getReceiptHandle()));
            }
            hashMap.put(ResilientMessageListenerContainer.VISIBILITY, new QueueMessageVisibility(ResilientMessageListenerContainer.this.getAmazonSqs(), this.queueUrl, this.message.getReceiptHandle()));
            return QueueMessageUtils.createMessage(this.message, hashMap);
        }

        private boolean maxAgeReached() {
            String str = (String) this.message.getAttributes().get("SentTimestamp");
            if (str == null) {
                return false;
            }
            Instant ofEpochMilli = Instant.ofEpochMilli(Long.parseLong(str));
            if (!ofEpochMilli.plus((TemporalAmount) this.queueProperties.getMaxAge()).isBefore(Instant.now())) {
                return false;
            }
            ResilientMessageListenerContainer.this.getLogger().error("Sqs message has reached max age - queueName={}, UTC-sentDateTime={}, body={}, attributes={}", new Object[]{this.logicalQueueName, ofEpochMilli.atZone(ZoneOffset.UTC).toLocalDateTime(), this.message.getBody(), this.message.getAttributes()});
            return true;
        }
    }

    /* loaded from: input_file:de/enterprise/starters/aws/ecs/sqs/ResilientMessageListenerContainer$SignalExecutingRunnable.class */
    private static final class SignalExecutingRunnable implements Runnable {
        private final CountDownLatch countDownLatch;
        private final Runnable runnable;

        private SignalExecutingRunnable(CountDownLatch countDownLatch, Runnable runnable) {
            this.countDownLatch = countDownLatch;
            this.runnable = runnable;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.runnable.run();
            } finally {
                this.countDownLatch.countDown();
            }
        }
    }

    public ResilientMessageListenerContainer(CircuitBreakerRegistry circuitBreakerRegistry, SqsProperties sqsProperties, List<MessageListenerContainerEventListener> list, TracingProperties tracingProperties) {
        this.circuitBreakerRegistry = circuitBreakerRegistry;
        this.sqsDefaultProperties = sqsProperties;
        this.eventListeners = list;
        this.tracingProperties = tracingProperties;
    }

    public static String createCircuitBreakerName(String str) {
        return CIRCUITBREAKER_NAME_PREFIX + (str.substring(0, 1).toUpperCase() + str.substring(1));
    }

    protected void initialize() {
        super.initialize();
        initializeRunningStateByQueue();
        this.scheduledFutureByQueue = new ConcurrentHashMap<>(getRegisteredQueues().size());
    }

    private void initializeRunningStateByQueue() {
        this.runningStateByQueue = new ConcurrentHashMap<>(getRegisteredQueues().size());
        Iterator it = getRegisteredQueues().keySet().iterator();
        while (it.hasNext()) {
            this.runningStateByQueue.put((String) it.next(), false);
        }
    }

    protected void doStart() {
        synchronized (getLifecycleMonitor()) {
            scheduleMessageListeners();
        }
    }

    protected void doStop() {
        notifyRunningQueuesToStop();
        waitForRunningQueuesToStop();
    }

    private void notifyRunningQueuesToStop() {
        for (Map.Entry<String, Boolean> entry : this.runningStateByQueue.entrySet()) {
            if (entry.getValue().booleanValue()) {
                stopQueue(entry.getKey());
            }
        }
    }

    private void waitForRunningQueuesToStop() {
        Iterator<Map.Entry<String, Boolean>> it = this.runningStateByQueue.entrySet().iterator();
        while (it.hasNext()) {
            String key = it.next().getKey();
            Future<?> future = this.scheduledFutureByQueue.get(key);
            if (future != null) {
                try {
                    future.get(getQueueStopTimeout(), TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException | TimeoutException e2) {
                    getLogger().warn("An exception occurred while stopping queue '" + key + "'", e2);
                }
            }
        }
    }

    private void scheduleMessageListeners() {
        for (Map.Entry entry : getRegisteredQueues().entrySet()) {
            startQueue((String) entry.getKey(), (AbstractMessageListenerContainer.QueueAttributes) entry.getValue());
        }
    }

    protected void executeMessage(org.springframework.messaging.Message<String> message) {
        getMessageHandler().handleMessage(message);
    }

    public void stop(String str) {
        Future<?> remove;
        stopQueue(str);
        try {
            if (isRunning(str) && (remove = this.scheduledFutureByQueue.remove(str)) != null) {
                remove.get(this.queueStopTimeout, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e2) {
            getLogger().warn("Error stopping queue with name: '" + str + "'", e2);
        }
    }

    protected void stopQueue(String str) {
        Assert.isTrue(this.runningStateByQueue.containsKey(str), "Queue with name '" + str + "' does not exist");
        this.runningStateByQueue.put(str, false);
    }

    public void start(String str) {
        Assert.isTrue(this.runningStateByQueue.containsKey(str), "Queue with name '" + str + "' does not exist");
        startQueue(str, (AbstractMessageListenerContainer.QueueAttributes) getRegisteredQueues().get(str));
    }

    public boolean isRunning(String str) {
        Future<?> future = this.scheduledFutureByQueue.get(str);
        return (future == null || future.isCancelled() || future.isDone()) ? false : true;
    }

    protected void startQueue(String str, AbstractMessageListenerContainer.QueueAttributes queueAttributes) {
        if (this.runningStateByQueue.containsKey(str) && this.runningStateByQueue.get(str).booleanValue()) {
            return;
        }
        this.runningStateByQueue.put(str, true);
        String key = this.sqsDefaultProperties.findInternalNameByLogicalQueueName(str).orElseThrow(() -> {
            return new IllegalArgumentException("Queue with queueName=" + str + " used in @SqsListener annotation, but no QueueProperties defined!");
        }).getKey();
        this.externalToInternalQueueName.put(str, key);
        this.scheduledFutureByQueue.put(str, getTaskExecutor().submit(new AsynchronousMessageListener(str, queueAttributes, this.circuitBreakerRegistry.circuitBreaker(createCircuitBreakerName(key)))));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyEventListeners(Consumer<? super MessageListenerContainerEventListener> consumer) {
        this.eventListeners.forEach(consumer);
    }
}
