package dev.soffa.foundation.pubsub.rabbitmq;

import com.google.common.base.Preconditions;
import dev.soffa.foundation.commons.Logger;
import dev.soffa.foundation.commons.Mappers;
import dev.soffa.foundation.commons.TextUtil;
import dev.soffa.foundation.error.TechnicalException;
import dev.soffa.foundation.message.MessageHandler;
import dev.soffa.foundation.message.MessageResponse;
import dev.soffa.foundation.message.pubsub.PubSubClient;
import dev.soffa.foundation.message.pubsub.PubSubClientConfig;
import dev.soffa.foundation.pubsub.AbstractPubSubClient;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.PreDestroy;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter;
import org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

/* loaded from: input_file:dev/soffa/foundation/pubsub/rabbitmq/AmqpClient.class */
/**
 * {@link PubSubClient} implementation that talks to a broker through Spring AMQP's
 * {@link RabbitAdmin} and the {@code RabbitTemplate} it exposes.
 *
 * <p>Listener containers created by {@link #subscribe(String, boolean, MessageHandler)} are
 * tracked in an internal list and destroyed by {@link #destroy()}.
 *
 * <p>The listener list is a plain {@link ArrayList}; this class adds no synchronization of its
 * own around it.
 */
public class AmqpClient extends AbstractPubSubClient implements PubSubClient {
    /** Client configuration received at construction time. */
    private final PubSubClientConfig config;

    /** Admin handle obtained from {@code AmqpUtil.configure}; also the source of the RabbitTemplate. */
    private final RabbitAdmin rabbitAdmin;

    /** Every listener container started by this client, kept so that they can be destroyed later. */
    private final List<AbstractMessageListenerContainer> listeners = new ArrayList<>();

    /** {@code true} when the configured addresses contain the marker {@code "://embedded"}. */
    private final boolean embedded;

    /**
     * Bridges a raw AMQP {@link Message} to an application-level {@link MessageHandler}.
     *
     * <p>Instances are invoked reflectively through {@link #HANDLER}.
     */
    public static class InternalMessageHandler {
        /** Reflective handle on {@link #handle(Message)}, resolved once when the class is initialised. */
        public static final Method HANDLER;

        static {
            try {
                HANDLER = InternalMessageHandler.class.getMethod("handle", Message.class);
            } catch (NoSuchMethodException e) {
                throw new TechnicalException(e);
            }
        }

        /** Application handler that receives the deserialized message. */
        private final MessageHandler handler;

        /**
         * Creates a bridge that delegates to the given handler.
         *
         * @param messageHandler handler invoked for every incoming message
         */
        public InternalMessageHandler(MessageHandler messageHandler) {
            this.handler = messageHandler;
        }

        /**
         * Deserializes the AMQP body into a foundation message, hands it to the wrapped handler
         * and, when the incoming message carries a reply-to property, serializes the outcome.
         *
         * @param message incoming AMQP message
         * @return a serialized {@code MessageResponse} (success or error) when a reply-to is set,
         *         {@code null} otherwise
         */
        public String handle(Message message) {
            // A reply is only produced when the sender provided a reply-to address.
            boolean replyExpected = TextUtil.isNotEmpty(new String[]{message.getMessageProperties().getReplyTo()});
            try {
                dev.soffa.foundation.message.Message payload = (dev.soffa.foundation.message.Message) Mappers.JSON_DEFAULT
                        .deserialize(message.getBody(), dev.soffa.foundation.message.Message.class);
                Object result = this.handler.handle(payload).orElse(null);
                return replyExpected ? Mappers.JSON_DEFAULT.serialize(MessageResponse.ok(result)) : null;
            } catch (Exception e) {
                if (!replyExpected) {
                    // Nobody is waiting for an answer: propagate the failure to the listener container.
                    throw e;
                }
                // A caller is waiting: report the failure as an error response instead of throwing.
                return Mappers.JSON_DEFAULT.serialize(MessageResponse.error(e));
            }
        }
    }

    /**
     * Creates the client and configures the underlying {@link RabbitAdmin}.
     *
     * <p>The three arguments are forwarded unchanged to the {@link AbstractPubSubClient}
     * constructor.
     *
     * @param str                first argument of the super constructor
     * @param pubSubClientConfig client configuration; its addresses decide whether the client runs
     *                           in embedded mode
     * @param str2               third argument of the super constructor
     */
    public AmqpClient(String str, PubSubClientConfig pubSubClientConfig, String str2) {
        super(str, pubSubClientConfig, str2);
        this.config = pubSubClientConfig;
        // configure() reads this.config, so it must run after the assignment above.
        this.rabbitAdmin = configure();
        this.embedded = pubSubClientConfig.getAddresses().contains("://embedded");
    }

    /**
     * Sends the message to the exchange {@code str} with an empty routing key and waits for the
     * reply.
     *
     * <p>The blocking exchange runs inside {@link CompletableFuture#supplyAsync(java.util.function.Supplier)},
     * i.e. on that method's default executor.
     *
     * @param str     exchange the request is sent to
     * @param message message to send; it is serialized with {@code Mappers.JSON_DEFAULT}
     * @return a future holding the reply body, or {@code null} when no reply was received or the
     *         reply has no body
     */
    @Override
    protected CompletableFuture<byte[]> sendAndReceive(String str, dev.soffa.foundation.message.Message message) {
        return CompletableFuture.supplyAsync(() -> {
            Message request = new Message(Mappers.JSON_DEFAULT.serializeAsBytes(message));
            request.getMessageProperties().setAppId(this.applicationName);
            // NOTE(review): this id is the snake-cased operation plus "_", so it is not unique per
            // request -- confirm that is intended.
            request.getMessageProperties().setCorrelationId(TextUtil.snakeCase(message.getOperation()) + "_");
            Message reply = this.rabbitAdmin.getRabbitTemplate().sendAndReceive(str, "", request);
            return reply == null ? null : reply.getBody();
        });
    }

    /**
     * Registers a subscription on {@code str} and, unless the embedded fanout shortcut applies,
     * starts a listener container that dispatches incoming messages to {@code messageHandler}.
     *
     * <p>In embedded mode with {@code z} set, only a fanout exchange is created and no listener
     * is attached. In embedded mode without {@code z}, the exchange is declared before the
     * listener is created.
     *
     * @param str            name used for the subscription, the exchange and the listener
     * @param z              selects the fanout-exchange path when the client is embedded
     *                       (presumably a "broadcast" flag -- TODO confirm against the interface)
     * @param messageHandler handler invoked for each received message
     */
    public void subscribe(String str, boolean z, MessageHandler messageHandler) {
        registerSubscription(str);
        if (this.embedded) {
            if (z) {
                AmqpUtil.createFanoutExchange(this.rabbitAdmin, str, this.applicationName);
                return;
            }
            AmqpUtil.declarExchange(this.rabbitAdmin, str, str);
        }

        AbstractMessageListenerContainer container = AmqpUtil.createListener(
                this.rabbitAdmin.getRabbitTemplate(), str, null, this.config.getOption("mode"));

        // Route raw AMQP messages to InternalMessageHandler.handle(Message) reflectively.
        InternalMessageHandler bridge = new InternalMessageHandler(messageHandler);
        MessagingMessageListenerAdapter adapter = new MessagingMessageListenerAdapter(bridge, InternalMessageHandler.HANDLER);
        adapter.setHandlerAdapter(new HandlerAdapter(new InvocableHandlerMethod(bridge, InternalMessageHandler.HANDLER)));

        container.setMessageListener(adapter);
        this.listeners.add(container);
        container.start();
    }

    /**
     * Subscribes the handler under this client's application name, unless a subscription with
     * that name already exists, in which case a warning is logged and nothing else happens.
     *
     * @param messageHandler handler invoked for each received message
     */
    public void subscribe(MessageHandler messageHandler) {
        if (hasSubscription(this.applicationName)) {
            Logger.platform.warn("A subscription already exists for: %s", new Object[]{this.applicationName});
            return;
        }
        subscribe(this.applicationName, false, messageHandler);
    }

    /**
     * Serializes the message and sends it to the exchange {@code str} with an empty routing key.
     *
     * @param str     target exchange
     * @param message message to publish
     */
    public void publish(String str, dev.soffa.foundation.message.Message message) {
        String payload = Mappers.JSON_DEFAULT.serialize(message);
        this.rabbitAdmin.getRabbitTemplate().convertAndSend(str, "", payload);
    }

    /**
     * Publishes the message using this client's application name as the target.
     *
     * @param message message to publish
     */
    public void publish(dev.soffa.foundation.message.Message message) {
        publish(this.applicationName, message);
    }

    /**
     * Serializes the message and sends it through the two-argument {@code convertAndSend}
     * overload.
     *
     * <p>NOTE(review): that overload takes a routing key rather than an exchange name, unlike
     * {@link #publish(String, dev.soffa.foundation.message.Message)} which names the exchange
     * explicitly -- confirm this asymmetry is intended.
     *
     * @param str     value passed as the first argument of {@code convertAndSend}
     * @param message message to broadcast
     */
    public void broadcast(String str, dev.soffa.foundation.message.Message message) {
        String payload = Mappers.JSON_DEFAULT.serialize(message);
        this.rabbitAdmin.getRabbitTemplate().convertAndSend(str, payload);
    }

    /**
     * Builds the {@link RabbitAdmin} for this client from the application name and configuration.
     *
     * @return the admin returned by {@code AmqpUtil.configure}
     */
    private RabbitAdmin configure() {
        return AmqpUtil.configure(this.applicationName, this.config);
    }

    /** Destroys every listener container that was started by this client. */
    @PreDestroy
    protected void destroy() {
        this.listeners.forEach(AbstractMessageListenerContainer::destroy);
    }
}
