package info.xiancloud.plugin.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import info.xiancloud.plugin.conf.EnvConfig;
import info.xiancloud.plugin.mq.IMqConsumerClient;
import info.xiancloud.plugin.mq.IMqPubClient;
import info.xiancloud.plugin.util.EnvUtil;
import info.xiancloud.plugin.util.LOG;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/* loaded from: input_file:info/xiancloud/plugin/rabbitmq/RabbitMqClient.class */
public class RabbitMqClient implements IMqPubClient, IMqConsumerClient {
    private Connection conn;
    private Channel defaultGlobalPublisherChannel;
    private LoadingCache<String, Channel> consumerChannels = CacheBuilder.newBuilder().build(new CacheLoader<String, Channel>() { // from class: info.xiancloud.plugin.rabbitmq.RabbitMqClient.1
        public Channel load(String str) throws Exception {
            Channel createChannel = RabbitMqClient.this.conn.createChannel();
            createChannel.basicQos(10);
            return createChannel;
        }
    });
    private final String EXCHANGE_NAME = "xian-global-exchange";
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private Cache<String, Boolean> queueBound = CacheBuilder.newBuilder().expireAfterWrite(60, TimeUnit.SECONDS).build();

    public boolean p2pPublish(String str, String str2) {
        return basicPublish(str, str2, false);
    }

    public boolean staticPublish(String str, String str2) {
        return basicPublish(str, str2, true);
    }

    private boolean basicPublish(String str, String str2, boolean z) {
        try {
            initIfNotInitialized();
            synchronized (this.defaultGlobalPublisherChannel) {
                try {
                    createQueueIfNotCreated(str, z);
                    this.defaultGlobalPublisherChannel.basicPublish("xian-global-exchange", str, (AMQP.BasicProperties) null, str2.getBytes());
                } catch (Throwable th) {
                    LOG.error("发布消息至队列失败：" + str, th);
                    return false;
                }
            }
            return true;
        } catch (Exception e) {
            LOG.error(e);
            return false;
        }
    }

    private synchronized void createQueueIfNotCreated(String str, boolean z) throws IOException {
        Boolean bool = (Boolean) this.queueBound.getIfPresent(str);
        if (bool == null || !bool.booleanValue()) {
            this.defaultGlobalPublisherChannel.queueDeclare(str, z, false, !z, (Map) null);
            this.defaultGlobalPublisherChannel.queueBind(str, "xian-global-exchange", str);
            this.queueBound.put(str, true);
        }
    }

    public void destroy() {
        synchronized (this.initialized) {
            if (this.initialized.get()) {
                do_destroy();
                this.initialized.set(false);
            }
        }
    }

    private void do_destroy() {
        try {
            if (this.defaultGlobalPublisherChannel == null || !this.defaultGlobalPublisherChannel.isOpen()) {
                LOG.error("channel已关闭，不应当重复关闭");
            } else {
                this.defaultGlobalPublisherChannel.close();
            }
            if (this.conn == null || !this.conn.isOpen()) {
                LOG.error("connection已关闭，不应当重复关闭");
            } else {
                this.conn.close();
            }
            this.queueBound.invalidateAll();
        } catch (Throwable th) {
            LOG.error(th);
        }
    }

    public void initIfNotInitialized() throws Exception {
        if (this.initialized.get()) {
            return;
        }
        synchronized (this.initialized) {
            if (!this.initialized.get()) {
                do_init();
                this.initialized.set(true);
            }
        }
    }

    private void do_init() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername(EnvConfig.get("rabbitUserName"));
        connectionFactory.setPassword(EnvConfig.get("rabbitPwd"));
        connectionFactory.setVirtualHost("/");
        if (EnvUtil.isLan()) {
            connectionFactory.setHost(EnvConfig.get("rabbitLanHost"));
        } else {
            connectionFactory.setHost(EnvConfig.get("rabbitInternetHost"));
        }
        connectionFactory.setPort(EnvConfig.getIntValue("rabbitPort"));
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(5000);
        try {
            this.conn = connectionFactory.newConnection();
            this.defaultGlobalPublisherChannel = this.conn.createChannel();
            this.defaultGlobalPublisherChannel.exchangeDeclare("xian-global-exchange", "direct", true);
        } catch (IOException | TimeoutException e) {
            throw new Exception("初始化rabbit连接失败", e);
        }
    }

    private boolean basicConsume(String str, final Function<JSONObject, Boolean> function, boolean z) {
        try {
            initIfNotInitialized();
            final Channel channel = (Channel) this.consumerChannels.getUnchecked(str);
            synchronized (channel) {
                try {
                    createQueueIfNotCreated(str, z);
                    channel.basicConsume(str, false, str, new DefaultConsumer(channel) { // from class: info.xiancloud.plugin.rabbitmq.RabbitMqClient.2
                        public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                            RabbitMqClient.this.ackNoException(channel, ((Boolean) function.apply((JSONObject) JSON.parse(bArr, new Feature[0]))).booleanValue(), envelope.getDeliveryTag());
                        }
                    });
                } catch (Throwable th) {
                    LOG.error("订阅队列失败：" + str, th);
                    return false;
                }
            }
            return true;
        } catch (Exception e) {
            LOG.error(e);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean ackNoException(Channel channel, boolean z, long j) {
        try {
            if (z) {
                channel.basicAck(j, true);
                return true;
            }
            channel.basicNack(j, false, true);
            return true;
        } catch (Throwable th) {
            LOG.error(th);
            return false;
        }
    }

    public boolean consumeStaticQueue(String str, Function<JSONObject, Boolean> function) {
        return basicConsume(str, function, true);
    }

    public boolean consumeNonStaticQueue(String str, Function<JSONObject, Boolean> function) {
        return basicConsume(str, function, false);
    }

    public boolean unconsume(String str) {
        Channel channel = (Channel) this.consumerChannels.getIfPresent(str);
        if (channel == null) {
            LOG.error(new Throwable("rabbitmq channel未初始化，不执行取订动作：" + str));
            return false;
        }
        synchronized (channel) {
            try {
                channel.basicCancel(str);
                this.consumerChannels.invalidate(str);
                channel.close();
            } catch (IOException | TimeoutException e) {
                LOG.error("取消订阅失败：" + str, e);
                return false;
            }
        }
        return true;
    }

    public boolean pullStaticQueue(String str, Function<JSONObject, Boolean> function) {
        return basicGet(str, function, true);
    }

    private boolean basicGet(String str, Function<JSONObject, Boolean> function, boolean z) {
        try {
            initIfNotInitialized();
            Channel channel = (Channel) this.consumerChannels.getUnchecked(str);
            synchronized (channel) {
                try {
                    createQueueIfNotCreated(str, z);
                    GetResponse basicGet = channel.basicGet(str, false);
                    if (basicGet == null) {
                        Thread.sleep(1000L);
                    } else {
                        ackNoException(channel, function.apply((JSONObject) JSON.parse(basicGet.getBody(), new Feature[0])).booleanValue(), basicGet.getEnvelope().getDeliveryTag());
                    }
                } catch (Throwable th) {
                    LOG.error("拉取队列消息失败：" + str, th);
                    return false;
                }
            }
            return true;
        } catch (Exception e) {
            LOG.error(e);
            return false;
        }
    }

    public boolean pullNonStaticQueue(String str, Function<JSONObject, Boolean> function) {
        return basicGet(str, function, false);
    }
}
