package dev.soffa.foundation.pubsub.rabbitmq;

import dev.soffa.foundation.commons.Logger;
import dev.soffa.foundation.commons.Mappers;
import dev.soffa.foundation.message.Message;
import dev.soffa.foundation.message.MessageHandler;
import dev.soffa.foundation.message.pubsub.PubSubClient;
import dev.soffa.foundation.message.pubsub.PubSubClientConfig;
import dev.soffa.foundation.pubsub.AbstractPubSubClient;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.PreDestroy;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

/* loaded from: input_file:dev/soffa/foundation/pubsub/rabbitmq/AmqpClient.class */
/**
 * {@link PubSubClient} implementation that talks to an AMQP broker through a
 * {@link RabbitAdmin} (and its underlying {@code RabbitTemplate}).
 *
 * <p>Every listener container created by {@link #subscribe(String, boolean, MessageHandler)}
 * is tracked so that it can be shut down in {@link #destroy()}.
 */
public class AmqpClient extends AbstractPubSubClient implements PubSubClient {
    private static final Logger LOG = Logger.get(AmqpClient.class);

    private final PubSubClientConfig config;
    private final RabbitAdmin rabbitAdmin;
    /** Listener containers started by this client; destroyed in {@link #destroy()}. */
    private final List<SimpleMessageListenerContainer> listeners = new ArrayList<>();
    /** {@code true} when the configured addresses contain {@code "://embedded"}. */
    private final boolean embedded;

    /**
     * Creates the client and configures the underlying {@link RabbitAdmin}.
     *
     * @param str                first argument forwarded unchanged to the parent constructor
     * @param pubSubClientConfig client configuration (addresses, options)
     * @param str2               third argument forwarded unchanged to the parent constructor
     */
    public AmqpClient(String str, PubSubClientConfig pubSubClientConfig, String str2) {
        super(str, pubSubClientConfig, str2);
        this.config = pubSubClientConfig;
        // configure() reads this.config, so it must run after the assignment above.
        this.rabbitAdmin = configure();
        this.embedded = pubSubClientConfig.getAddresses().contains("://embedded");
    }

    /**
     * Sends the JSON-serialized message and asynchronously waits for the reply.
     *
     * <p>NOTE(review): {@code str} is not used here; the request is sent through
     * {@code RabbitTemplate.sendAndReceive(Message)} without an explicit target.
     * Confirm that this is the intended routing.
     *
     * @param str     request target (currently ignored, see note above)
     * @param message message to send
     * @return a future holding the reply body, or {@code null} when no reply was received
     *         or the reply has no body
     */
    @Override // dev.soffa.foundation.pubsub.AbstractPubSubClient
    protected CompletableFuture<byte[]> sendAndReceive(String str, Message message) {
        return CompletableFuture.supplyAsync(() -> {
            org.springframework.amqp.core.Message request =
                new org.springframework.amqp.core.Message(Mappers.JSON.serializeAsBytes(message));
            org.springframework.amqp.core.Message reply =
                this.rabbitAdmin.getRabbitTemplate().sendAndReceive(request);
            return reply == null ? null : reply.getBody();
        });
    }

    /**
     * Subscribes the handler to the given subject.
     *
     * <p>When {@code z} is set and the client is embedded, only a fanout exchange is created
     * and no listener container is registered. Otherwise a listener container is created,
     * tracked and started; in embedded mode the queue is declared first.
     *
     * @param str            subject (queue name) to listen on
     * @param z              presumably the "broadcast" flag - TODO confirm against the interface
     * @param messageHandler handler invoked with each deserialized message
     */
    public void subscribe(String str, boolean z, MessageHandler messageHandler) {
        if (this.embedded) {
            if (z) {
                AmqpUtil.createFanoutExchange(this.rabbitAdmin, str, this.applicationName);
                return;
            }
            AmqpUtil.declareQueue(this.rabbitAdmin, str);
        }
        SimpleMessageListenerContainer container = AmqpUtil.createListener(
            this.rabbitAdmin.getRabbitTemplate(), str, null, this.config.getOption("mode"));
        container.setMessageListener(delivery -> {
            messageHandler.handle((Message) Mappers.JSON.deserialize(delivery.getBody(), Message.class));
            LOG.info("Message processed: %s", delivery.getMessageProperties().getDeliveryTag());
        });
        // Track the container before starting it so destroy() also covers a failed start.
        this.listeners.add(container);
        container.start();
    }

    /**
     * Subscribes the handler to this application's own subject (non-broadcast).
     *
     * @param messageHandler handler invoked with each deserialized message
     */
    public void subscribe(MessageHandler messageHandler) {
        subscribe(this.applicationName, false, messageHandler);
    }

    /**
     * Publishes the JSON-serialized message to the given target with an empty routing key.
     *
     * <p>When the target equals this application's name, {@code AmqpUtil.TOPIC} is appended
     * to it before publishing.
     *
     * @param str     target name
     * @param message message to publish
     */
    public void publish(String str, Message message) {
        String exchange = this.applicationName.equals(str) ? str + AmqpUtil.TOPIC : str;
        LOG.info("[amqp] Publishing message to %s", exchange);
        this.rabbitAdmin.getRabbitTemplate().convertAndSend(exchange, "", Mappers.JSON.serialize(message));
    }

    /**
     * Publishes the message using this application's name as target.
     *
     * @param message message to publish
     */
    public void publish(Message message) {
        publish(this.applicationName, message);
    }

    /**
     * Sends the JSON-serialized message using {@code str} as the routing key.
     *
     * @param str     routing key
     * @param message message to send
     */
    public void broadcast(String str, Message message) {
        this.rabbitAdmin.getRabbitTemplate().convertAndSend(str, Mappers.JSON.serialize(message));
    }

    /** Builds the {@link RabbitAdmin} from the application name and client configuration. */
    private RabbitAdmin configure() {
        return AmqpUtil.configure(this.applicationName, this.config);
    }

    /**
     * Destroys every listener container started by this client.
     *
     * <p>All containers are destroyed even if one of them fails; the first failure is rethrown
     * afterwards with any later failures attached as suppressed exceptions. The tracking list
     * is cleared so that a repeated call does not destroy the same containers again.
     */
    @PreDestroy
    protected void destroy() {
        RuntimeException failure = null;
        for (SimpleMessageListenerContainer container : this.listeners) {
            try {
                container.destroy();
            } catch (RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        this.listeners.clear();
        if (failure != null) {
            throw failure;
        }
    }
}
