/*
 * Decompiled with CFR 0.152.
 */
package io.grisu.usvcs.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.grisu.core.exceptions.GrisuException;
import io.grisu.core.utils.MapBuilder;
import io.grisu.pojo.utils.JSONUtils;
import io.grisu.usvcs.annotations.MicroService;
import io.grisu.usvcs.annotations.NanoService;
import io.grisu.usvcs.rabbitmq.RPCUtils;
import io.grisu.usvcs.rabbitmq.RabbitMQConstants;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class ServerRabbitMQ {
    private static final long SLEEP_MILLISECS = 1000L;
    private final Channel channel;
    private final String rpcQueueName;
    private final int concurrency;
    private final Map<String, Method> nServicesHandlers;
    private volatile AtomicBoolean running;
    private String consumerTag;
    final Consumer consumer;

    public ServerRabbitMQ(final Channel channel, int concurrency, final Object uServiceImpl) {
        this.channel = channel;
        this.concurrency = concurrency;
        Class uServiceHandler = Stream.of(uServiceImpl.getClass().getInterfaces()).filter(i -> i.getAnnotation(MicroService.class) != null).findFirst().orElseThrow(() -> new RuntimeException("Service not annotated with @MicroService (" + uServiceImpl.getClass() + ")"));
        this.rpcQueueName = uServiceHandler.getAnnotation(MicroService.class).serviceQueue();
        this.nServicesHandlers = new HashMap<String, Method>();
        Stream.of(uServiceHandler.getMethods()).forEach(m -> {
            NanoService nanoServiceAnnotation = m.getAnnotation(NanoService.class);
            if (nanoServiceAnnotation != null) {
                this.nServicesHandlers.put(nanoServiceAnnotation.name(), (Method)m);
            }
        });
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                this.stop();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }));
        this.consumer = new DefaultConsumer(channel){

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                Object[] message = RPCUtils.decodeMessage(body);
                String nService = (String)message[0];
                if (nService != null) {
                    Map result;
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
                    String opResult = "OK";
                    try {
                        Method method = (Method)ServerRabbitMQ.this.nServicesHandlers.get(nService);
                        if (method == null) {
                            throw new RuntimeException(ServerRabbitMQ.this.rpcQueueName + "#" + nService + " nanoService not found!");
                        }
                        Object[] params = JSONUtils.decodeAsParams((byte[])((byte[])message[1]), (Type[])method.getGenericParameterTypes());
                        result = ((CompletableFuture)method.invoke(uServiceImpl, params)).join();
                    }
                    catch (Exception e) {
                        opResult = "KO";
                        Throwable th = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
                        result = th instanceof GrisuException ? ((GrisuException)th).serialize() : MapBuilder.instance().add((Object)"error_message", (Object)th.toString()).add((Object)"error_code", (Object)RabbitMQConstants.ERROR_CODE).build();
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    channel.basicPublish("", properties.getReplyTo(), replyProps, RPCUtils.encodeMessage(opResult, JSONUtils.encode((Object)result)));
                }
            }
        };
    }

    public void start() throws IOException, InterruptedException {
        this.running = new AtomicBoolean(true);
        this.channel.queueDeclare(this.rpcQueueName, false, false, false, null);
        if (this.concurrency > 0) {
            this.channel.basicQos(this.concurrency);
        }
        this.consumerTag = this.channel.basicConsume(this.rpcQueueName, false, this.consumer);
    }

    public void stop() throws IOException, TimeoutException, InterruptedException {
        this.running.set(false);
        this.channel.basicCancel(this.consumerTag);
        Thread.sleep(1000L);
        Connection connection = this.channel.getConnection();
        this.channel.close();
        connection.close();
    }
}

