package net.subnoize.qcat.sqs;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Parameter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import net.subnoize.qcat.Session;
import net.subnoize.qcat.model.Attribute;
import net.subnoize.qcat.sqs.SqsMessageAttributes;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/* loaded from: input_file:net/subnoize/qcat/sqs/Qcat4SqsWorker.class */
class Qcat4SqsWorker implements Runnable, RejectedExecutionHandler {
    private static final Logger log = LoggerFactory.getLogger(Qcat4SqsWorker.class);

    @Autowired
    private SqsAsyncClient asyncClient;

    @Autowired
    private ObjectMapper mapper;
    private List<CompletableFuture<Integer>> threadHandles = new CopyOnWriteArrayList();
    private boolean running = false;
    private DescriptiveStatistics stats = new DescriptiveStatistics();
    private AtomicInteger taskCounter = new AtomicInteger(0);
    private int threadCeiling = 0;
    private ExecutorService executorService;
    private ScheduledExecutorService scheduleService;
    private SqsExecutionTemplate template;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Qcat4SqsWorker(SqsExecutionTemplate sqsExecutionTemplate) {
        this.template = sqsExecutionTemplate;
        this.stats.setWindowSize(100);
    }

    @PostConstruct
    public void init() {
        this.running = true;
        this.executorService = new ThreadPoolExecutor(this.template.getTo().min(), this.template.getTo().max(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        this.scheduleService = Executors.newSingleThreadScheduledExecutor();
        this.scheduleService.scheduleAtFixedRate(this, 0L, this.template.getTo().polling(), TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        this.running = false;
        this.scheduleService.shutdown();
        this.executorService.shutdown();
        log.info("Stopping: {}.{}('{}')", new Object[]{this.template.getTarget().getClass().getName(), this.template.getMethod().getName(), this.template.getQueueUrl()});
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.running) {
            try {
                if (this.taskCounter.get() < this.template.getTo().min() || this.taskCounter.get() < this.threadCeiling) {
                    pollServer();
                }
                manageThreadHandles();
            } catch (Exception e) {
                log.error("Message handling errors", e);
            }
        }
    }

    private void pollServer() {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        this.executorService.submit(() -> {
            try {
                try {
                    this.taskCounter.addAndGet(1);
                    List<Message> messages = ((ReceiveMessageResponse) this.asyncClient.receiveMessage(builder -> {
                        builder.queueUrl(this.template.getQueueUrl()).maxNumberOfMessages(10).messageAttributeNames(this.template.getAttributeNames());
                    }).get()).messages();
                    if (!messages.isEmpty()) {
                        processMessages(messages);
                    }
                    completableFuture.complete(Integer.valueOf(messages.size()));
                    this.taskCounter.addAndGet(-1);
                } catch (Exception e) {
                    if (this.running) {
                        log.error("Error from server polling thread", e);
                    }
                    completableFuture.completeExceptionally(e);
                    this.taskCounter.addAndGet(-1);
                }
            } catch (Throwable th) {
                this.taskCounter.addAndGet(-1);
                throw th;
            }
        });
        this.threadHandles.add(completableFuture);
    }

    private void manageThreadHandles() {
        int size = this.threadHandles.size();
        this.threadHandles.forEach(completableFuture -> {
            try {
                if (completableFuture.isDone()) {
                    if (!completableFuture.isCompletedExceptionally()) {
                        this.stats.addValue(((Integer) completableFuture.get()).intValue() * 0.1d);
                    }
                }
            } catch (Exception e) {
                log.error("Error getting results from server polling thread", e);
            } finally {
                this.threadHandles.remove(completableFuture);
            }
        });
        if (size < this.threadHandles.size()) {
            calculateThreadCeiling();
        }
    }

    private void calculateThreadCeiling() {
        if (this.stats.getMean() == Double.NaN) {
            this.threadCeiling = this.template.getTo().min();
        }
        double max = this.template.getTo().max() * this.stats.getMean();
        if (max < this.template.getTo().min()) {
            max = this.template.getTo().min();
        }
        int round = (int) Math.round(max);
        if (round != this.threadCeiling) {
            log.info("Thread Ceiling: {}", Integer.valueOf(round));
            this.threadCeiling = round;
        }
    }

    private void processMessages(List<Message> list) {
        list.forEach(message -> {
            try {
                Session<Message> newSession = this.template.newSession(message);
                handleMessage(newSession);
                if (newSession.isAcknowledge()) {
                    DeleteMessageResponse deleteMessageResponse = (DeleteMessageResponse) this.asyncClient.deleteMessage(builder -> {
                        builder.queueUrl(this.template.getQueueUrl()).receiptHandle(message.receiptHandle());
                    }).get();
                    if (200 != deleteMessageResponse.sdkHttpResponse().statusCode()) {
                        log.info("Delete: {} {}", deleteMessageResponse.sdkHttpResponse().statusText(), Integer.valueOf(deleteMessageResponse.sdkHttpResponse().statusCode()));
                    }
                }
                MDC.clear();
            } catch (Exception e) {
                log.error("Error while handling messages", e);
            }
        });
    }

    private void handleMessage(Session<Message> session) throws JsonProcessingException, IllegalAccessException, InvocationTargetException, InterruptedException, ExecutionException {
        Object[] parseParams = parseParams(session);
        if (parseParams.length == this.template.getParameters().length) {
            Object invoke = this.template.invoke(parseParams);
            if (session.isError()) {
                log.error("Error: {} ({})", session.getErrorDescription(), Integer.valueOf(session.getErrorCode()));
            }
            if (invoke != null && this.template.isSendToPresent()) {
                if (invoke instanceof SendMessageRequest) {
                    this.asyncClient.sendMessage((SendMessageRequest) invoke).get();
                } else {
                    this.asyncClient.sendMessage((SendMessageRequest) SendMessageRequest.builder().queueUrl(session.getDestination()).messageBody(this.template.isSendToAsString() ? invoke.toString() : this.mapper.writeValueAsString(invoke)).messageAttributes(getAttributes(session)).build()).get();
                }
            }
        }
    }

    private Map<String, MessageAttributeValue> getAttributes(Session<Message> session) {
        if (session.getAttributes().isEmpty()) {
            return null;
        }
        SqsMessageAttributes.Builder builder = SqsMessageAttributes.builder();
        session.getAttributes().forEach((str, obj) -> {
            builder.attr(str, obj);
        });
        return builder.build();
    }

    private Object[] parseParams(Session<Message> session) throws JsonProcessingException {
        MessageAttributeValue messageAttributeValue;
        Object[] objArr = new Object[this.template.getParameters().length];
        for (int i = 0; i < objArr.length; i++) {
            Parameter parameter = this.template.getParameters()[i];
            if (parameter.getType() == Session.class) {
                objArr[i] = session;
            } else if (parameter.equals(this.template.getPayload())) {
                if (parameter.getType() == Message.class) {
                    objArr[i] = parameter.getType().cast(Message.class);
                } else if (parameter.getType() == String.class) {
                    objArr[i] = ((Message) session.getRequest()).body();
                } else {
                    try {
                        objArr[i] = this.mapper.readValue(((Message) session.getRequest()).body(), this.template.getPayload().getType());
                    } catch (JsonProcessingException e) {
                        if (session == null) {
                            throw e;
                        }
                        objArr[i] = null;
                        session.setError(true);
                        session.setErrorCode(500);
                        session.setErrorDescription(e.getMessage());
                        log.error("Error parsing object: {} Exception: {}", ((Message) session.getRequest()).body(), e.getMessage());
                    }
                }
            } else if (parameter.isAnnotationPresent(Attribute.class)) {
                MessageAttributeValue messageAttributeValue2 = (MessageAttributeValue) ((Message) session.getRequest()).messageAttributes().get(parameter.getAnnotation(Attribute.class).value());
                if (messageAttributeValue2 != null) {
                    objArr[i] = messageAttributeValue2.stringValue();
                } else {
                    objArr[i] = null;
                }
            } else {
                objArr[i] = null;
            }
        }
        if (StringUtils.isNotBlank(this.template.getTo().transactionId()) && (messageAttributeValue = (MessageAttributeValue) ((Message) session.getRequest()).messageAttributes().get(this.template.getTo().transactionId())) != null) {
            MDC.put(this.template.getTo().transactionId(), messageAttributeValue.stringValue());
            String dataType = messageAttributeValue.dataType();
            boolean z = -1;
            switch (dataType.hashCode()) {
                case -1950496919:
                    if (dataType.equals("Number")) {
                        z = true;
                        break;
                    }
                    break;
                case -1808118735:
                    if (dataType.equals("String")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    session.getAttributes().put(this.template.getTo().transactionId(), messageAttributeValue.stringValue());
                    break;
                case true:
                    if (NumberUtils.isParsable(messageAttributeValue.stringValue())) {
                        if (messageAttributeValue.stringValue().contains(".")) {
                            session.getAttributes().put(this.template.getTo().transactionId(), Double.valueOf(Double.parseDouble(messageAttributeValue.stringValue())));
                            break;
                        } else {
                            session.getAttributes().put(this.template.getTo().transactionId(), Long.valueOf(Long.parseLong(messageAttributeValue.stringValue())));
                            break;
                        }
                    } else {
                        session.getAttributes().put(this.template.getTo().transactionId(), messageAttributeValue.stringValue());
                        break;
                    }
                default:
                    session.getAttributes().put(this.template.getTo().transactionId(), messageAttributeValue.binaryValue().asByteArray());
                    break;
            }
        }
        return objArr;
    }

    @Override // java.util.concurrent.RejectedExecutionHandler
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
        log.error("{} : has been rejected", runnable.toString());
    }
}
