package net.syberia.storm.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/syberia/storm/rabbitmq/RabbitMqBolt.class */
/**
 * Storm bolt that publishes every incoming tuple to RabbitMQ.
 *
 * <p>The exchange, routing key, properties and body of each message are obtained from the
 * supplied {@link TupleToRabbitMqMessageConverter}; channels are borrowed from and returned to a
 * {@link RabbitMqChannelProvider}. A tuple is acked once its message has been handed to
 * {@code Channel.basicPublish} without an exception, and failed otherwise. The bolt declares no
 * output fields.
 */
public class RabbitMqBolt extends BaseRichBolt {
    private static final long serialVersionUID = 377563808237437264L;
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqBolt.class);

    private final TupleToRabbitMqMessageConverter tupleToRabbitMqMessageConverter;
    private RabbitMqChannelProvider rabbitMqChannelProvider;
    private OutputCollector collector;

    /**
     * Creates a bolt whose channel provider is built from the Storm configuration when
     * {@link #prepare(Map, TopologyContext, OutputCollector)} is called.
     *
     * @param tupleToRabbitMqMessageConverter converter used to turn tuples into messages
     */
    public RabbitMqBolt(TupleToRabbitMqMessageConverter tupleToRabbitMqMessageConverter) {
        this(null, tupleToRabbitMqMessageConverter);
    }

    /**
     * Creates a bolt that uses the given channel provider.
     *
     * @param rabbitMqChannelProvider channel provider to use; when {@code null}, one is created
     *     from the Storm configuration in {@code prepare}
     * @param tupleToRabbitMqMessageConverter converter used to turn tuples into messages
     */
    public RabbitMqBolt(RabbitMqChannelProvider rabbitMqChannelProvider, TupleToRabbitMqMessageConverter tupleToRabbitMqMessageConverter) {
        this.rabbitMqChannelProvider = rabbitMqChannelProvider;
        this.tupleToRabbitMqMessageConverter = tupleToRabbitMqMessageConverter;
    }

    /**
     * Stores the collector, prepares the converter and prepares the channel provider (creating
     * the provider from {@code map} first if none was passed to the constructor).
     *
     * @param map Storm configuration
     * @param topologyContext topology context, forwarded to the converter
     * @param outputCollector collector used to ack/fail tuples and report errors
     * @throws RuntimeException if the channel provider fails to prepare; the original
     *     {@link IOException} or {@link TimeoutException} is kept as the cause
     */
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.tupleToRabbitMqMessageConverter.prepare(map, topologyContext);
        if (this.rabbitMqChannelProvider == null) {
            this.rabbitMqChannelProvider = RabbitMqChannelProvider.withStormConfig(map);
        }
        try {
            this.rabbitMqChannelProvider.prepare();
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException("Unable to prepare RabbitMQ channel provider", e);
        }
    }

    /**
     * Converts the tuple to a RabbitMQ message, publishes it on a borrowed channel and acks the
     * tuple. A failure in any phase is logged, reported to the collector and fails the tuple.
     *
     * <p>The three phases (convert, acquire channel, publish) are handled separately so that each
     * failure is reported with the message that matches its real cause, the borrowed channel is
     * handed back to the provider exactly once, and the tuple is acked or failed exactly once.
     *
     * @param tuple the tuple to publish
     */
    public void execute(Tuple tuple) {
        final String exchange;
        final String routingKey;
        final AMQP.BasicProperties properties;
        final byte[] messageBody;
        try {
            exchange = this.tupleToRabbitMqMessageConverter.getExchange(tuple);
            routingKey = this.tupleToRabbitMqMessageConverter.getRoutingKey(tuple);
            properties = this.tupleToRabbitMqMessageConverter.getProperties(tuple);
            messageBody = this.tupleToRabbitMqMessageConverter.getMessageBody(tuple);
        } catch (Exception e) {
            failTuple(tuple, "Unable to convert tuple to RabbitMQ message", e);
            return;
        }

        final Channel channel;
        try {
            channel = this.rabbitMqChannelProvider.getChannel();
        } catch (Exception e) {
            failTuple(tuple, "Unable to get RabbitMQ channel from the provider", e);
            return;
        }

        try {
            channel.basicPublish(exchange, routingKey, properties, messageBody);
        } catch (Exception e) {
            // Exception (not just IOException) so that unchecked publish failures are reported
            // as publish failures instead of propagating or being mislabelled.
            failTuple(tuple, "Unable to publish RabbitMQ message", e);
            return;
        } finally {
            // Runs on success, on failure and on Errors alike, and only once.
            returnChannelQuietly(channel);
        }

        // Only reached when basicPublish completed without throwing.
        this.collector.ack(tuple);
    }

    /** This bolt emits nothing, so there are no output fields to declare. */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    /**
     * Cleans up the channel provider and then the converter. A failure while cleaning up the
     * provider is logged and does not prevent the converter from being cleaned up.
     */
    public void cleanup() {
        // The provider is still null if the single-argument constructor was used and prepare
        // never ran; there is nothing to release in that case.
        if (this.rabbitMqChannelProvider != null) {
            try {
                this.rabbitMqChannelProvider.cleanup();
            } catch (Exception e) {
                LOGGER.error("Unable to cleanup RabbitMQ provider", e);
            }
        }
        this.tupleToRabbitMqMessageConverter.cleanup();
    }

    /** Logs the failure, reports it to the collector and fails the tuple. */
    private void failTuple(Tuple tuple, String message, Exception cause) {
        LOGGER.error(message, cause);
        this.collector.reportError(cause);
        this.collector.fail(tuple);
    }

    /**
     * Hands the channel back to the provider. A failure here is logged and reported but does not
     * change the outcome of the tuple, which is decided solely by the publish itself.
     */
    private void returnChannelQuietly(Channel channel) {
        try {
            this.rabbitMqChannelProvider.returnChannel(channel);
        } catch (Exception e) {
            LOGGER.error("Unable to return RabbitMQ channel to the provider", e);
            this.collector.reportError(e);
        }
    }
}
