/*
 * Decompiled with CFR 0.152.
 */
package io.paradoxical.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import io.paradoxical.rabbitmq.ChannelInitializer;
import io.paradoxical.rabbitmq.Endpoint;
import io.paradoxical.rabbitmq.Exchange;
import io.paradoxical.rabbitmq.FilterAttributes;
import io.paradoxical.rabbitmq.FutureEventProcessor;
import io.paradoxical.rabbitmq.ListenerOptions;
import io.paradoxical.rabbitmq.Message;
import io.paradoxical.rabbitmq.PublisherOptions;
import io.paradoxical.rabbitmq.PublisherProviderImpl;
import io.paradoxical.rabbitmq.PublishingContext;
import io.paradoxical.rabbitmq.Queue;
import io.paradoxical.rabbitmq.RabbitRawMessage;
import io.paradoxical.rabbitmq.RetryExchange;
import io.paradoxical.rabbitmq.connectionManagment.ChannelProvider;
import io.paradoxical.rabbitmq.metrics.MetricsEventManager;
import io.paradoxical.rabbitmq.metrics.MetricsEventManagerOptions;
import io.paradoxical.rabbitmq.metrics.delegators.DelegatedCounter;
import io.paradoxical.rabbitmq.metrics.delegators.DelegatedTimer;
import io.paradoxical.rabbitmq.queues.EventBase;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.logging.MDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ListenerBase<T extends EventBase> {
    private static final Logger logger = LoggerFactory.getLogger(ListenerBase.class);
    private final Class<T> target;
    private final ChannelProvider channelProvider;
    private final ListenerOptions options;
    private final List<String> consumingQueueTags = new ArrayList<String>();
    private final AtomicInteger messagesInProgress;
    private final Semaphore drainingSemaphore = new Semaphore(1);
    private final MetricsEventManager<T> metricsManager;
    private volatile boolean running = false;

    public ListenerBase(Class<T> targetType, ChannelProvider channelProvider, ListenerOptions options) {
        this.target = targetType;
        this.channelProvider = channelProvider;
        this.options = options;
        this.messagesInProgress = new AtomicInteger(0);
        this.metricsManager = new MetricsEventManager<EventBase>(MetricsEventManagerOptions.builder().classMapper(this::getEventClassType).metricGroups(options.getMetricGroups()).sourceClass(this.getClass()).metricRegistry(options.getMetricRegistry()).build());
    }

    public void start() {
        try {
            this.running = true;
            this.start(this.getEndpoint().getChannel());
        }
        catch (IOException | InterruptedException e) {
            logger.error("Error running listener", (Throwable)e);
        }
    }

    protected synchronized void start(Channel channel) throws InterruptedException, IOException {
        Endpoint endpoint = this.getEndpoint();
        for (Queue originalQueue : endpoint.getQueues()) {
            Queue initializedQueue = ChannelInitializer.initializeExchange(channel, endpoint.getExchange(), originalQueue);
            String tag = channel.basicConsume(initializedQueue.getName(), false, this.getRmqConsumerType(channel, endpoint.getExchange(), initializedQueue));
            logger.info("Running listener with tag=" + tag);
            this.consumingQueueTags.add(tag);
        }
    }

    protected abstract Consumer getRmqConsumerType(Channel var1, Exchange var2, Queue var3);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void deliveryHandler(Channel channel, Queue queue, RabbitRawMessage message, FutureEventProcessor<T> callback) throws IOException {
        DelegatedTimer.Context firehoseTimer = null;
        try {
            if (!this.running) {
                logger.warn("Message received on dead channel, attempting nack but not expecting success");
                this.nack(message.getEnvelope(), channel, true, null);
                return;
            }
        }
        catch (Throwable nackException) {
            if (!this.running) {
                logger.warn("Stopped listener cannot respond on channel", nackException);
            }
            logger.error("Unhandled exception during dequeuing. Attempting nack!", nackException);
        }
        try {
            this.messagesInProgress.incrementAndGet();
            logger.debug("Got message on listener tag=" + message.getConsumerTag() + " message-delivery-tag=" + message.getEnvelope().getDeliveryTag());
            firehoseTimer = this.getFirehoseTimer().time();
            this.getInFlightFirehoseCounter().inc();
            this.process(message, channel, queue, callback);
        }
        catch (Throwable processingException) {
            if (!this.running) {
                logger.warn("Stopped listener cannot respond on channel", processingException);
            } else {
                logger.error("Unhandled exception during dequeuing. Attempting nack!", processingException);
            }
            try {
                this.nack(message.getEnvelope(), channel, !message.getEnvelope().isRedeliver(), null);
                logger.warn("Unhandled exception was able to successfully nack message but message may not have be properly processed.");
            }
            catch (Throwable nackException) {
                if (!this.running) {
                    logger.warn("Channel closed, unable to re-nack");
                    return;
                }
                logger.error("Nack unable to proceed! Message may be stuck in dead state and the consumer connection should be restarted. tag=" + message.getConsumerTag(), nackException);
            }
        }
        finally {
            this.messagesInProgress.decrementAndGet();
            this.drainingSemaphore.release();
            if (this.running) {
                if (firehoseTimer != null) {
                    firehoseTimer.stop();
                    this.getInFlightFirehoseCounter().dec();
                }
                logger.debug("Completing message - message-delivery-tag=" + message.getEnvelope().getDeliveryTag());
            }
        }
    }

    protected Class<?> getEventClassType(T item) {
        return item.getClass();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void process(RabbitRawMessage rabbitRawMessage, Channel channel, Queue queue, FutureEventProcessor<T> callback) throws IOException {
        Message<T> message = null;
        try {
            message = this.getMessageFromRaw(rabbitRawMessage, queue);
            EventBase item = (EventBase)message.getItem();
            DelegatedTimer.Context eventTypeTimer = this.getEventTypeTimer(item).time();
            this.getEventTypeInFlightCounter(item).inc();
            try {
                this.delegateMessage(message, channel, callback);
            }
            finally {
                eventTypeTimer.stop();
                this.getEventTypeInFlightCounter(item).dec();
            }
        }
        catch (Throwable ex) {
            logger.error(String.format("Error during dequeueing. Tag %s", rabbitRawMessage.getEnvelope().getDeliveryTag()), ex);
            if (message != null) {
                this.requeue(message, channel);
            } else {
                this.nack(rabbitRawMessage.getEnvelope(), channel, !rabbitRawMessage.getEnvelope().isRedeliver(), null);
            }
        }
        finally {
            this.cleanMessageContext();
        }
    }

    protected Message<T> getMessageFromRaw(RabbitRawMessage rabbitRawMessage, Queue queue) throws IOException {
        Message<T> message = new Message<T>();
        message.setRabbitRawMessage(rabbitRawMessage);
        message.setAtLeastOnceDelivery(rabbitRawMessage.getEnvelope().isRedeliver());
        UUID correlationId = ChannelInitializer.setupMessageContext(rabbitRawMessage, this.getEndpoint().getExchange(), queue);
        message.setItem(this.getDeserializedItem(rabbitRawMessage, correlationId));
        return message;
    }

    private DelegatedTimer getFirehoseTimer() {
        return this.metricsManager.getTimer("events");
    }

    private DelegatedTimer getEventTypeTimer(T item) {
        return this.metricsManager.getTimerByEventType(item, "events");
    }

    private DelegatedCounter getEventTypeRetryCounter(T item) {
        return this.metricsManager.getCounterByEventType(item, "retries");
    }

    private DelegatedCounter getRetryFirehoseCounter() {
        return this.metricsManager.getCounter("retries");
    }

    private DelegatedCounter getEventTypeInFlightCounter(T item) {
        if (item == null) {
            return new DelegatedCounter(Collections.emptyList());
        }
        return this.metricsManager.getCounterByEventType(item, "inflight");
    }

    private DelegatedCounter getInFlightFirehoseCounter() {
        return this.metricsManager.getCounter("inflight");
    }

    private DelegatedCounter getEventTypeRejectCounter(T item) {
        return this.metricsManager.getCounterByEventType(item, "reject");
    }

    private DelegatedCounter getRejectFirehoseCounter() {
        return this.metricsManager.getCounter("reject");
    }

    private void cleanMessageContext() {
        MDC.remove((String)FilterAttributes.CORR_ID);
    }

    private T getDeserializedItem(RabbitRawMessage rabbitRawMessage, UUID correlationId) throws IOException {
        T item = this.deserialize(rabbitRawMessage.getBody());
        if (((EventBase)item).getCorrelationId() == null) {
            ((EventBase)item).setCorrelationId(correlationId);
        }
        return item;
    }

    private void delegateMessage(Message<T> message, Channel channel, FutureEventProcessor<T> callback) throws IOException, ExecutionException, InterruptedException {
        switch (callback.handleMessage(message).get()) {
            case Ack: {
                this.ack(message, channel);
                break;
            }
            case Nack: {
                this.nack(message, channel);
                break;
            }
            case RequeueUntilMaxTries: {
                this.requeue(message, channel);
                break;
            }
            case RetryLater: {
                this.retryLater(message, channel);
                break;
            }
            case Defer: {
                this.defer(message, channel);
            }
        }
    }

    private void defer(Message<T> message, Channel channel) throws IOException {
        this.nack(message.getRabbitRawMessage().getEnvelope(), channel, true, (EventBase)message.getItem());
    }

    private void requeue(Message<T> message, Channel channel) throws IOException {
        if (this.options.getMaxRetries() == 1) {
            this.nack(message.getRabbitRawMessage().getEnvelope(), channel, !message.getRabbitRawMessage().getEnvelope().isRedeliver(), (EventBase)message.getItem());
            return;
        }
        int publishAttempts = this.getPublishAttempts(message);
        if (publishAttempts < this.options.getMaxRetries()) {
            PublishingContext publishingContext = new PublishingContext();
            publishingContext.setPreviousPublishes(publishAttempts);
            PublisherOptions publisherOptions = PublisherOptions.builder().context(publishingContext).build();
            this.publishRetryMessage(message, publisherOptions, message.getRabbitRawMessage().getExchange(), channel);
        }
    }

    private void ack(Message<T> message, Channel channel) throws IOException {
        channel.basicAck(message.getRabbitRawMessage().getEnvelope().getDeliveryTag(), false);
    }

    private void nack(Message<T> message, Channel channel) throws IOException {
        this.nack(message.getRabbitRawMessage().getEnvelope(), channel, false, (EventBase)message.getItem());
    }

    private void retryLater(Message<T> message, Channel channel) throws IOException {
        Optional<RetryExchange> retryExchangeOption = message.getRabbitRawMessage().getExchange().getRetryExchange();
        if (!retryExchangeOption.isPresent()) {
            throw new RuntimeException("Unable to retry message, no retry exchange is declared!");
        }
        RetryExchange retryExchange = retryExchangeOption.get();
        int publishAttempts = this.getPublishAttempts(message);
        Optional<Duration> duration = this.getRetryDuration(message, publishAttempts);
        if (!duration.isPresent()) {
            logger.warn("Retry duration is none but asked to retry. Must nack");
            this.nack(message, channel);
            return;
        }
        if (publishAttempts >= this.options.getMaxRetries()) {
            logger.warn("Max publish attempts reached. publishAttempts={}, maxRetries={}", (Object)publishAttempts, (Object)this.options.getMaxRetries());
            this.nack(message, channel);
            return;
        }
        PublishingContext publishingContext = new PublishingContext();
        publishingContext.setPreviousPublishes(publishAttempts);
        PublisherOptions publisherOptions = PublisherOptions.builder().messageTtl(duration.get()).context(publishingContext).build();
        this.publishRetryMessage(message, publisherOptions, retryExchange, channel);
    }

    private void publishRetryMessage(Message<T> message, PublisherOptions publisherOptions, Exchange exchange, Channel channel) throws IOException {
        PublisherProviderImpl publisher = new PublisherProviderImpl(this.channelProvider, ((EventBase)message.getItem())::getCorrelationId, this.options.getSerializer());
        publisher.forExchange(exchange).onRoute(message.getRabbitRawMessage().getEnvelope().getRoutingKey()).publish(message.getItem(), publisherOptions);
        this.ack(message, channel);
        this.updateRedeliveryCounter((EventBase)message.getItem());
    }

    private int getPublishAttempts(Message<T> message) {
        Object publishAttemptsRaw = message.getRabbitRawMessage().getProperties().getHeaders().get("X-PUBLISH-ATTEMPTS");
        return publishAttemptsRaw == null ? 0 : Integer.parseInt(publishAttemptsRaw.toString());
    }

    private Optional<Duration> getRetryDuration(Message<T> message, int publishAttempts) {
        return message.getRabbitRawMessage().getExchange().getRetryExchange().flatMap(i -> i.getStrategy().nextRetry(publishAttempts, message.getItem()));
    }

    public void stop() throws IOException, TimeoutException {
        if (!this.running) {
            return;
        }
        this.running = false;
        try {
            while (this.messagesInProgress.get() > 0) {
                this.drainingSemaphore.acquire();
            }
        }
        catch (InterruptedException e) {
            logger.error("Error", (Throwable)e);
        }
        if (this.getEndpoint() != null && this.getEndpoint().getChannel() != null) {
            Channel channel = this.getEndpoint().getChannel();
            this.consumingQueueTags.forEach(tag -> {
                try {
                    logger.info("Stopping RMQ tag=" + tag);
                    channel.basicCancel(tag);
                }
                catch (IOException e) {
                    logger.error("Error disconnecting from channel with tag=" + tag, (Throwable)e);
                }
            });
            this.consumingQueueTags.clear();
            channel.close();
        }
    }

    protected abstract Endpoint getEndpoint();

    protected T deserialize(byte[] body) throws IOException {
        try {
            return (T)((EventBase)this.options.getSerializer().read(body, this.target));
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    private void nack(Envelope envelope, Channel channel, Boolean redeliver, T item) throws IOException {
        logger.info(String.format("Nacking message tag=%s, attempting redelivery: %b", envelope.getDeliveryTag(), redeliver));
        if (redeliver.booleanValue()) {
            this.updateRedeliveryCounter(item);
        } else {
            this.updateRejectCounter(item);
        }
        channel.basicNack(envelope.getDeliveryTag(), false, redeliver.booleanValue());
    }

    private void updateRejectCounter(T item) {
        this.getRejectFirehoseCounter().inc();
        if (item != null) {
            this.getEventTypeRejectCounter(item).inc();
        }
    }

    private void updateRedeliveryCounter(T item) {
        this.getRetryFirehoseCounter().inc();
        if (item != null) {
            this.getEventTypeRetryCounter(item).inc();
        }
    }
}

