package rapture.exchange.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.FlowListener;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import rapture.common.RapturePipelineTask;
import rapture.common.exception.ExceptionToString;
import rapture.common.exception.RaptureException;
import rapture.common.exception.RaptureExceptionFactory;
import rapture.common.impl.jackson.JacksonUtil;
import rapture.common.model.RaptureExchange;
import rapture.common.model.RaptureExchangeQueue;
import rapture.config.MultiValueConfigLoader;
import rapture.exchange.ExchangeHandler;
import rapture.exchange.QueueHandler;
import rapture.exchange.RPCMessage;
import rapture.exchange.TopicMessageHandler;
import rapture.util.IDGenerator;

/* loaded from: input_file:rapture/exchange/rabbitmq/RabbitExchangeHandler.class */
public class RabbitExchangeHandler implements ExchangeHandler {
    private Connection connection;
    private Channel channel;
    private static Logger logger = Logger.getLogger(RabbitExchangeHandler.class);
    private String replyQueueName;
    private QueueingConsumer consumer;
    private ExecutorService service = Executors.newCachedThreadPool();
    private String instanceName = "default";
    private int messageCounter = 1;
    private Map<String, String> queueNameRegistry = new ConcurrentHashMap();
    private Set<String> exchangesTested = Collections.synchronizedSet(new HashSet());
    private AtomicLong subscriptionHandler = new AtomicLong(0);
    private Map<Long, SubscriptionThread> subscriberMap = new HashMap();

    /* loaded from: input_file:rapture/exchange/rabbitmq/RabbitExchangeHandler$Publisher.class */
    private class Publisher implements Runnable {
        final String exchange;
        final String topic;
        final String message;

        public Publisher(String str, String str2, String str3) {
            this.exchange = str;
            this.topic = str2;
            this.message = str3;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                RabbitExchangeHandler.this.channel.basicPublish(this.exchange, this.topic, (AMQP.BasicProperties) null, this.message.getBytes());
            } catch (IOException e) {
                throw RaptureExceptionFactory.create("Could not publish on topic queue", e);
            }
        }
    }

    /* loaded from: input_file:rapture/exchange/rabbitmq/RabbitExchangeHandler$Submitter.class */
    private class Submitter implements Runnable {
        final String exchange;
        final RapturePipelineTask task;
        final String routingKey;
        private RaptureException error = null;

        public Submitter(String str, RapturePipelineTask rapturePipelineTask, String str2) {
            this.exchange = str;
            this.task = rapturePipelineTask;
            this.routingKey = str2;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                byte[] bytesJsonFromObject = JacksonUtil.bytesJsonFromObject(this.task);
                RabbitExchangeHandler.access$108(RabbitExchangeHandler.this);
                AMQP.BasicProperties build = new AMQP.BasicProperties().builder().contentType(this.task.getContentType()).deliveryMode(Integer.valueOf(Integer.parseInt(MultiValueConfigLoader.getConfig("RABBITMQ-deliveryMode", "1")))).priority(Integer.valueOf(this.task.getPriority())).messageId("" + RabbitExchangeHandler.this.messageCounter).build();
                RabbitExchangeHandler.logger.debug(String.format(Messages.getString("RabbitExchangeHandler.publishMessage"), this.exchange, this.routingKey, this.task.getContentType()));
                try {
                    RabbitExchangeHandler.this.channel.basicPublish(this.exchange, this.routingKey, build, bytesJsonFromObject);
                    RabbitExchangeHandler.logger.debug(String.format(Messages.getString("RabbitExchangeHandler.sendMessage"), build.getMessageId(), this.task.getContent(), this.exchange, this.routingKey));
                } catch (IOException e) {
                    throw RaptureExceptionFactory.create(500, Messages.getString("RabbitExchangeHandler.noPublish"), e);
                }
            } catch (RaptureException e2) {
                this.error = e2;
            }
        }

        public RaptureException getError() {
            return this.error;
        }
    }

    public void setInstanceName(String str) {
        this.instanceName = str;
    }

    public synchronized void setConfig(Map<String, String> map) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        logger.info(Messages.getString("RabbitExchangeHandler.config"));
        try {
            String config = MultiValueConfigLoader.getConfig("RABBITMQ-" + this.instanceName);
            if (config == null || config.isEmpty()) {
                config = "amqp://guest:guest@localhost:5672/%2f";
            }
            connectionFactory.setUri(config);
            connectionFactory.setAutomaticRecoveryEnabled(true);
            logger.debug(Messages.getString("RabbitExchangeHandler.creatingChannel"));
            this.connection = connectionFactory.newConnection();
            this.connection.addShutdownListener(new ShutdownListener() { // from class: rapture.exchange.rabbitmq.RabbitExchangeHandler.1
                public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                    RabbitExchangeHandler.logger.info("Reconnected to RabbitMQ");
                }
            });
            logger.debug(Messages.getString("RabbitExchangeHandler.connectionMade"));
            this.channel = this.connection.createChannel();
            this.channel.addShutdownListener(new ShutdownListener() { // from class: rapture.exchange.rabbitmq.RabbitExchangeHandler.2
                public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                    RabbitExchangeHandler.logger.info("Disconnected from RabbitMQ. Cause :" + shutdownSignalException.getMessage());
                    RabbitExchangeHandler.logger.debug(ExceptionToString.format(shutdownSignalException));
                }
            });
            logger.debug(Messages.getString("RabbitExchangeHandler.channelCreated"));
            this.channel.basicQos(100);
            this.channel.addReturnListener(new ReturnListener() { // from class: rapture.exchange.rabbitmq.RabbitExchangeHandler.3
                public void handleReturn(int i, String str, String str2, String str3, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    RabbitExchangeHandler.logger.debug(String.format(Messages.getString("RabbitExchangeHandler.returnListener"), Integer.valueOf(i), str));
                }
            });
            this.channel.addFlowListener(new FlowListener() { // from class: rapture.exchange.rabbitmq.RabbitExchangeHandler.4
                public void handleFlow(boolean z) throws IOException {
                    RabbitExchangeHandler.logger.debug(String.format(Messages.getString("RabbitExchangeHandler.Flow"), Boolean.valueOf(z)));
                }
            });
            this.replyQueueName = this.channel.queueDeclare().getQueue();
            logger.info("RPC reply queue is " + this.replyQueueName);
            this.consumer = new QueueingConsumer(this.channel);
            this.channel.basicConsume(this.replyQueueName, true, this.consumer);
        } catch (Exception e) {
            throw RaptureExceptionFactory.create(500, Messages.getString("RabbitExchangeHandler.noConnect"), e);
        }
    }

    public void setupExchange(RaptureExchange raptureExchange) {
        if (this.exchangesTested.contains(raptureExchange.getName())) {
            if (raptureExchange.getQueueBindings() != null) {
                for (RaptureExchangeQueue raptureExchangeQueue : raptureExchange.getQueueBindings()) {
                    try {
                        bindQueue(raptureExchange, raptureExchangeQueue);
                    } catch (IOException e) {
                        logger.error("Unable to bind " + raptureExchangeQueue.getName() + " to exchange " + raptureExchange.getName());
                        logger.info(ExceptionToString.format(e));
                    }
                }
                return;
            }
            return;
        }
        logger.debug(String.format(Messages.getString("RabbitExchangeHandler.ensureAvail"), raptureExchange.getName()));
        try {
            logger.debug(String.format(Messages.getString("RabbitExchangeHandler.ExchangeDeclare"), raptureExchange.getName()));
            logger.debug(String.format(Messages.getString("RabbitExchangeHandler.exchangeType"), raptureExchange.getExchangeType()));
            this.channel.exchangeDeclare(raptureExchange.getName(), raptureExchange.getExchangeType().name().toLowerCase(), true);
            logger.debug(Messages.getString("RabbitExchangeHandler.bindingQueues"));
            if (raptureExchange.getQueueBindings() != null) {
                Iterator it = raptureExchange.getQueueBindings().iterator();
                while (it.hasNext()) {
                    bindQueue(raptureExchange, (RaptureExchangeQueue) it.next());
                }
            }
            this.exchangesTested.add(raptureExchange.getName());
        } catch (IOException e2) {
            String string = Messages.getString("RabbitExchangeHandler.noExchange");
            try {
                if (this.channel.isOpen()) {
                    this.channel.close();
                }
            } catch (IOException e3) {
                logger.error(ExceptionToString.format(e3));
            }
            throw RaptureExceptionFactory.create(500, string, e2);
        } catch (Throwable th) {
            try {
                if (this.channel.isOpen()) {
                    this.channel.close();
                }
            } catch (IOException e4) {
                logger.error(ExceptionToString.format(e4));
            }
            String format = String.format("Caught throwable during exchangeDeclare for exchange %s:\n%s", raptureExchange.getName(), ExceptionToString.format(th));
            logger.info(format);
            throw RaptureExceptionFactory.create(500, format, th);
        }
    }

    private void bindQueue(RaptureExchange raptureExchange, RaptureExchangeQueue raptureExchangeQueue) throws IOException {
        String underlyingQueueName;
        if (!isAnonymousQueue(raptureExchangeQueue.getName())) {
            underlyingQueueName = getUnderlyingQueueName(raptureExchange.getName(), raptureExchangeQueue.getName());
            logger.debug(String.format(Messages.getString("RabbitExchangeHandler.underlyingQueue"), underlyingQueueName));
            this.channel.queueDeclare(underlyingQueueName, true, false, false, (Map) null);
        } else if (this.queueNameRegistry.containsKey(raptureExchangeQueue.getName())) {
            underlyingQueueName = this.queueNameRegistry.get(raptureExchangeQueue.getName());
        } else {
            underlyingQueueName = this.channel.queueDeclare().getQueue();
            this.queueNameRegistry.put(raptureExchangeQueue.getName(), underlyingQueueName);
        }
        if (raptureExchangeQueue.getRouteBindings().isEmpty()) {
            logger.debug(String.format(Messages.getString("RabbitExchangeHandler.noRoute"), raptureExchange.getName()));
            this.channel.queueBind(underlyingQueueName, raptureExchange.getName(), "");
            return;
        }
        for (String str : raptureExchangeQueue.getRouteBindings()) {
            logger.debug(String.format(Messages.getString("RabbitExchangeHandler.binding"), str, raptureExchange.getName()));
            this.channel.queueBind(underlyingQueueName, raptureExchange.getName(), str);
        }
    }

    private boolean isAnonymousQueue(String str) {
        return str != null && str.startsWith("$anonoymous");
    }

    public String startConsuming(String str, String str2, QueueHandler queueHandler) {
        String underlyingQueueName;
        boolean booleanValue = Boolean.valueOf(MultiValueConfigLoader.getConfig("RABBITMQ-autoAck", "false")).booleanValue();
        String uuid = IDGenerator.getUUID();
        try {
            if (!isAnonymousQueue(str2)) {
                underlyingQueueName = getUnderlyingQueueName(str, str2);
            } else {
                if (!this.queueNameRegistry.containsKey(str2)) {
                    throw RaptureExceptionFactory.create(String.format("Error! Cannot start consuming on undefined anonymous queue %s, on exchange %s", str2, str));
                }
                underlyingQueueName = this.queueNameRegistry.get(str2);
            }
            logger.debug(String.format(Messages.getString("RabbitExchangeHandler.startConsuming"), underlyingQueueName));
            logger.debug(String.format(Messages.getString("RabbitExchangeHandler.underly"), underlyingQueueName));
            this.channel.basicConsume(underlyingQueueName, booleanValue, uuid, new MessageConsumer(this.channel, uuid, queueHandler, underlyingQueueName));
            return uuid;
        } catch (IOException e) {
            String format = String.format(Messages.getString("RabbitExchangeHandler.noStartConsuming"), String.format("%s (%s)", str2, str));
            try {
                this.channel.close();
            } catch (IOException e2) {
                logger.error(ExceptionToString.format(e2));
            }
            throw RaptureExceptionFactory.create(500, format, e);
        }
    }

    private String getUnderlyingQueueName(String str, String str2) {
        return str + "-" + str2;
    }

    public void tearDownExchange(RaptureExchange raptureExchange) {
        try {
            Iterator it = raptureExchange.getQueueBindings().iterator();
            while (it.hasNext()) {
                unbindQueue(raptureExchange, (RaptureExchangeQueue) it.next());
            }
            this.channel.exchangeDelete(raptureExchange.getName());
            this.exchangesTested.remove(raptureExchange.getName());
        } catch (IOException e) {
            throw RaptureExceptionFactory.create(500, String.format(Messages.getString("RabbitExchangeHandler.noDelete"), new Object[0]), e);
        }
    }

    private void unbindQueue(RaptureExchange raptureExchange, RaptureExchangeQueue raptureExchangeQueue) throws IOException {
        String underlyingQueueName = getUnderlyingQueueName(raptureExchange.getName(), raptureExchangeQueue.getName());
        Iterator it = raptureExchangeQueue.getRouteBindings().iterator();
        while (it.hasNext()) {
            this.channel.queueUnbind(underlyingQueueName, raptureExchange.getName(), (String) it.next());
        }
        this.channel.queueDelete(underlyingQueueName);
    }

    public void putTaskOnExchange(String str, RapturePipelineTask rapturePipelineTask, String str2) {
        logger.debug(Messages.getString("RabbitExchangeHandler.puttingTask"));
        Submitter submitter = new Submitter(str, rapturePipelineTask, str2);
        this.service.submit(submitter);
        RaptureException error = submitter.getError();
        if (error != null) {
            throw error;
        }
    }

    public String subscribeToExchange(String str, List<String> list, QueueHandler queueHandler) {
        throw RaptureExceptionFactory.create(500, Messages.getString("RabbitExchangeHandler.notYetSupported"));
    }

    public void ensureExchangeUnAvailable(RaptureExchange raptureExchange) {
        this.exchangesTested.remove(raptureExchange.getName());
    }

    public Map<String, Object> makeRPC(String str, String str2, Map<String, Object> map, long j) {
        QueueingConsumer.Delivery nextDelivery;
        try {
            String uuid = UUID.randomUUID().toString();
            AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().correlationId(uuid).replyTo(this.replyQueueName).build();
            RPCMessage rPCMessage = new RPCMessage();
            rPCMessage.setFnName(str2);
            rPCMessage.setParams(map);
            String jsonFromObject = JacksonUtil.jsonFromObject(rPCMessage);
            logger.debug("Will make call on queue name " + str);
            logger.debug("Message is " + jsonFromObject);
            this.channel.basicPublish("", str, build, jsonFromObject.getBytes("UTF-8"));
            do {
                nextDelivery = this.consumer.nextDelivery(j * 1000);
                if (nextDelivery == null) {
                    throw RaptureExceptionFactory.create("Timed out while waiting for response");
                }
            } while (!nextDelivery.getProperties().getCorrelationId().equals(uuid));
            String str3 = new String(nextDelivery.getBody());
            Map<String, Object> map2 = null;
            if (str3 != null) {
                map2 = JacksonUtil.getMapFromJson(str3);
            }
            return map2;
        } catch (InterruptedException e) {
            throw RaptureExceptionFactory.create("Interrupted", e);
        } catch (ConsumerCancelledException e2) {
            throw RaptureExceptionFactory.create("Cancelled", e2);
        } catch (IOException e3) {
            throw RaptureExceptionFactory.create("Could not create reply rpc queue", e3);
        } catch (ShutdownSignalException e4) {
            throw RaptureExceptionFactory.create("Shutdown", e4);
        }
    }

    public void publishTopicMessage(String str, String str2, String str3) {
        this.service.submit(new Publisher(str, str2, str3));
    }

    public long subscribeTopic(String str, String str2, TopicMessageHandler topicMessageHandler) {
        SubscriptionThread subscriptionThread = new SubscriptionThread(str, str2, this.channel, topicMessageHandler);
        long incrementAndGet = this.subscriptionHandler.incrementAndGet();
        this.subscriberMap.put(Long.valueOf(incrementAndGet), subscriptionThread);
        subscriptionThread.start();
        return incrementAndGet;
    }

    public void unsubscribeTopic(long j) {
        SubscriptionThread remove = this.subscriberMap.remove(Long.valueOf(j));
        if (remove != null) {
            remove.closeSubscription();
        }
    }

    static /* synthetic */ int access$108(RabbitExchangeHandler rabbitExchangeHandler) {
        int i = rabbitExchangeHandler.messageCounter;
        rabbitExchangeHandler.messageCounter = i + 1;
        return i;
    }
}
