package dev.soffa.foundation.pubsub.nats;

import dev.soffa.foundation.commons.Logger;
import dev.soffa.foundation.commons.TextUtil;
import dev.soffa.foundation.error.TechnicalException;
import dev.soffa.foundation.message.Message;
import dev.soffa.foundation.message.MessageHandler;
import dev.soffa.foundation.message.pubsub.PubSubClient;
import dev.soffa.foundation.message.pubsub.PubSubClientConfig;
import dev.soffa.foundation.pubsub.AbstractPubSubClient;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamOptions;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.PublishAck;
import io.nats.client.api.StreamConfiguration;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import javax.annotation.PreDestroy;

/* loaded from: input_file:dev/soffa/foundation/pubsub/nats/NatsClient.class */
/**
 * {@link PubSubClient} implementation that talks to NATS.
 *
 * <p>The client opens its connection eagerly in the constructor and keeps both the plain
 * connection (used by {@link #publish} and request/reply) and a JetStream context (used by
 * {@link #broadcast} and by JetStream-backed subscriptions). The connection is released by
 * {@link #cleanup()}.
 */
public class NatsClient extends AbstractPubSubClient implements PubSubClient {
    private static final Logger LOG = Logger.get(NatsClient.class);

    /** Acknowledgement wait configured on every JetStream consumer created by this client. */
    private static final Duration ACK_WAIT = Duration.ofSeconds(5L);

    /** Suffix appended to the subject to derive the queue group of a plain subscription. */
    private static final String QUEUE_GROUP_SUFFIX = "-group";

    private final PubSubClientConfig config;
    private Connection connection;
    private JetStream stream;

    /**
     * Creates the client and immediately connects to the servers listed in the configuration.
     *
     * @param str                forwarded unchanged to {@link AbstractPubSubClient}
     *                           (presumably the application name - TODO confirm)
     * @param pubSubClientConfig client configuration; its addresses are used to connect
     * @param str2               forwarded unchanged to {@link AbstractPubSubClient}
     * @throws TechnicalException if the connection cannot be established
     */
    public NatsClient(String str, PubSubClientConfig pubSubClientConfig, String str2) {
        super(str, pubSubClientConfig, str2);
        this.config = pubSubClientConfig;
        configure();
    }

    /**
     * Registers a handler for messages published on a subject.
     *
     * @param str            subject to subscribe to
     * @param z              when {@code true}, a stream named after the subject is configured and
     *                       a durable JetStream consumer is used; when {@code false}, a plain
     *                       queue-group subscription is created
     * @param messageHandler handler receiving the messages
     * @throws TechnicalException if the JetStream subscription cannot be created
     */
    public void subscribe(String str, boolean z, MessageHandler messageHandler) {
        LOG.info("Configuring subscription to %s", new Object[]{str});
        NatsMessageHandler handler = new NatsMessageHandler(this.connection, messageHandler);
        Dispatcher dispatcher = this.connection.createDispatcher();

        if (!z) {
            dispatcher.subscribe(str, str + QUEUE_GROUP_SUFFIX, handler);
            return;
        }

        configureStream(str);
        // applicationName is not declared in this class; it comes from the parent class.
        ConsumerConfiguration consumer = ConsumerConfiguration.builder()
            .durable(this.applicationName)
            .ackWait(ACK_WAIT)
            .ackPolicy(AckPolicy.Explicit)
            .build();
        PushSubscribeOptions options =
            ((PushSubscribeOptions.Builder) PushSubscribeOptions.builder().configuration(consumer)).build();
        try {
            this.stream.subscribe(str, dispatcher, handler, true, options);
        } catch (IOException | JetStreamApiException e) {
            // JetStream subscription failures are checked exceptions; surface them like connection errors.
            throw new TechnicalException(e, "Unable to subscribe to %s", new Object[]{str});
        }
    }

    /**
     * Opens the NATS connection and the JetStream context from the configured addresses
     * (comma-separated), with unlimited reconnection attempts.
     *
     * @throws TechnicalException if connecting or creating the JetStream context fails
     */
    private void configure() {
        try {
            Options options = new Options.Builder()
                .servers(this.config.getAddresses().split(","))
                .maxReconnects(-1)
                .build();
            this.connection = Nats.connect(options);
            this.stream = this.connection.jetStream(JetStreamOptions.defaultOptions());
            LOG.info("Connected to NATS servers: %s", new Object[]{this.config.getAddresses()});
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for the calling thread.
                Thread.currentThread().interrupt();
            }
            NatsUtil.close(this.connection);
            throw new TechnicalException(e, "Unable to connect to NATS @ %s", new Object[]{this.config.getAddresses()});
        }
    }

    /**
     * Makes sure a stream named after the subject, and bound to that subject, exists.
     *
     * <p>A JetStream API error is treated as "stream already there" and only logged; an I/O
     * failure is reported as a {@link TechnicalException}.
     * NOTE(review): the subject is used verbatim as the stream name - confirm that subjects
     * passed here are always valid stream names.
     *
     * @param str subject, also used as the stream name
     */
    private void configureStream(String str) {
        try {
            StreamConfiguration streamConfig = StreamConfiguration.builder()
                .name(str)
                .addSubjects(new String[]{str})
                .build();
            this.connection.jetStreamManagement().addStream(streamConfig);
        } catch (JetStreamApiException e) {
            // Best effort: keep going, but keep the server's reason in the log instead of hiding it.
            LOG.warn("Stream %s was not created, assuming it is already configured: %s", new Object[]{str, e.getMessage()});
        } catch (IOException e) {
            throw new TechnicalException(e, "Unable to configure stream %s", new Object[]{str});
        }
    }

    /**
     * Sends a request on the plain connection and completes with the raw payload of the reply.
     *
     * @param str     subject the request is sent to
     * @param message message to send
     * @return future completed with the reply data
     */
    @Override // dev.soffa.foundation.pubsub.AbstractPubSubClient
    protected CompletableFuture<byte[]> sendAndReceive(String str, Message message) {
        return this.connection
            .request(NatsUtil.createNatsMessage(str, message))
            .thenApply(reply -> reply.getData());
    }

    /**
     * Publishes a message on the plain connection.
     *
     * @param str     subject the message is published to
     * @param message message to publish
     */
    public void publish(String str, Message message) {
        this.connection.publish(NatsUtil.createNatsMessage(str, message));
    }

    /**
     * Publishes a message through JetStream to the broadcast target resolved from {@code str}.
     * Nothing is sent (a warning is logged) when the resolved target is empty.
     *
     * @param str     value passed to {@code resolveBroadcast} to obtain the target subject
     * @param message message to broadcast
     * @throws TechnicalException if publishing fails or the publish acknowledgement reports an error
     */
    public void broadcast(String str, Message message) {
        String target = resolveBroadcast(str);
        if (TextUtil.isEmpty(target)) {
            LOG.warn("Broadcasting ignored: %s, target is empty.", new Object[]{message.getOperation()});
            return;
        }
        PublishAck ack;
        try {
            ack = this.stream.publish(NatsUtil.createNatsMessage(target, message));
        } catch (IOException | JetStreamApiException e) {
            throw new TechnicalException(e, "Unable to broadcast message to %s", new Object[]{target});
        }
        if (ack.hasError()) {
            throw new TechnicalException(ack.getError(), new Object[0]);
        }
    }

    /** Closes the NATS connection when the owning container destroys this client. */
    @PreDestroy
    protected void cleanup() {
        NatsUtil.close(this.connection);
    }
}
