/*
 * Decompiled with CFR 0.152.
 */
package io.paradoxical.rabbitmq.queues;

import com.godaddy.logging.Logger;
import com.godaddy.logging.LoggerFactory;
import com.rabbitmq.client.QueueingConsumer;
import io.paradoxical.rabbitmq.ListenerOptions;
import io.paradoxical.rabbitmq.Message;
import io.paradoxical.rabbitmq.RabbitRawMessage;
import io.paradoxical.rabbitmq.SingleQueueConfiguration;
import io.paradoxical.rabbitmq.connectionManagment.ChannelProvider;
import io.paradoxical.rabbitmq.queues.BlockingQueueConsumerSyncBase;
import io.paradoxical.rabbitmq.queues.EmptyMessagePromise;
import io.paradoxical.rabbitmq.queues.EventBase;
import io.paradoxical.rabbitmq.queues.MessagePromise;
import io.paradoxical.rabbitmq.results.MessageResult;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Queue consumer that hands each delivery to the caller as a {@link MessagePromise}.
 *
 * <p>The caller pulls one delivery at a time with {@link #getNextMessage(Duration)}, reads the
 * message from the returned promise and later reports the processing outcome through
 * {@link MessagePromise#complete(MessageResult)}. The inherited {@code deliveryHandler} is run on
 * a background thread and is given the future that the promise completes.
 *
 * @param <T> the event type carried by the consumed messages
 */
public class PromiseQueueConsumer<T extends EventBase>
extends BlockingQueueConsumerSyncBase<T> {
    private static final Logger logger = LoggerFactory.getLogger(PromiseQueueConsumer.class);

    /**
     * Shared pool that runs the delivery handler for every message.
     *
     * <p>A single cached pool is reused across calls so that worker threads are recycled instead
     * of allocating a brand-new executor (and thread) for each delivery.
     */
    private static final ExecutorService DELIVERY_EXECUTOR = Executors.newCachedThreadPool();

    /**
     * Creates a consumer with the base class's default listener options.
     *
     * @param channelProvider passed through to the base class
     * @param info            passed through to the base class
     * @param target          event class of the consumed messages, passed through to the base class
     */
    public PromiseQueueConsumer(ChannelProvider channelProvider, SingleQueueConfiguration info, Class<T> target) throws IOException, InterruptedException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
        super(channelProvider, info, target);
    }

    /**
     * Creates a consumer with explicit listener options.
     *
     * @param channelProvider passed through to the base class
     * @param info            passed through to the base class
     * @param target          event class of the consumed messages, passed through to the base class
     * @param options         listener options, passed through to the base class
     */
    public PromiseQueueConsumer(ChannelProvider channelProvider, SingleQueueConfiguration info, Class<T> target, ListenerOptions options) throws IOException, InterruptedException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
        super(channelProvider, info, target, options);
    }

    /**
     * Waits up to {@code timeout} for the next delivery and wraps it in a promise.
     *
     * <p>This method never throws: any failure is logged and reported as an empty promise. If the
     * calling thread is interrupted while waiting, its interrupt flag is restored before returning.
     *
     * @param timeout maximum time to wait for a delivery
     * @return a promise for the received message, or {@link EmptyMessagePromise#instance()} when
     *         there is no consumer, no delivery arrived within the timeout, or an error occurred
     */
    public synchronized MessagePromise<T> getNextMessage(Duration timeout) {
        try {
            if (this.consumer != null) {
                QueueingConsumer.Delivery delivery = this.consumer.getConsumer().nextDelivery(timeout.toMillis());
                if (delivery == null) {
                    return EmptyMessagePromise.instance();
                }
                RabbitRawMessage rabbitRawMessage = new RabbitRawMessage(this.consumer.getConsumer().getConsumerTag(), delivery.getEnvelope(), delivery.getProperties(), delivery.getBody(), this.consumer.getExchange());
                // Completed by the caller via MessagePromise.complete(); handed to the delivery handler.
                final CompletableFuture<MessageResult> resultFuture = new CompletableFuture<>();
                // Completed by the delivery handler once it passes the message to the callback.
                final CompletableFuture<Message<T>> messageFuture = new CompletableFuture<>();
                MessagePromise<T> messagePromise = new MessagePromise<T>(){

                    @Override
                    public void complete(MessageResult result) {
                        resultFuture.complete(result);
                    }

                    @Override
                    public Optional<Message<T>> getMessage() throws ExecutionException, InterruptedException {
                        return Optional.of(messageFuture.get());
                    }
                };
                // Run the handler off-thread so this method can return the promise immediately.
                DELIVERY_EXECUTOR.submit(() -> {
                    try {
                        this.deliveryHandler(this.consumer.getConsumer().getChannel(), this.consumer.getQueue(), rabbitRawMessage, msg -> {
                            messageFuture.complete(msg);
                            return resultFuture;
                        });
                    }
                    catch (Throwable e) {
                        logger.error(e, "Error processing message asynchronously", new Object[0]);
                        // Unblock any caller waiting in getMessage() instead of leaving it hanging.
                        messageFuture.completeExceptionally(e);
                    }
                });
                return messagePromise;
            }
        }
        catch (Throwable ex) {
            if (ex instanceof InterruptedException) {
                // Preserve the interrupt so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
            logger.error(ex, "Error getting message!", new Object[0]);
        }
        return EmptyMessagePromise.instance();
    }
}

