package reactor.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:reactor/rabbitmq/Sender.class */
public class Sender implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
    private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION = new ChannelCreationFunction();
    private final Mono<Connection> connectionMono;
    private final AtomicBoolean hasConnection;
    private final Mono<Channel> channelMono;
    private final Scheduler resourceCreationScheduler;
    private final boolean privateResourceCreationScheduler;
    private final Scheduler connectionSubscriptionScheduler;
    private final boolean privateConnectionSubscriptionScheduler;

    /* loaded from: input_file:reactor/rabbitmq/Sender$ChannelCreationFunction.class */
    private static class ChannelCreationFunction implements Function<Connection, Channel> {
        private ChannelCreationFunction() {
        }

        @Override // java.util.function.Function
        public Channel apply(Connection connection) {
            try {
                return connection.createChannel();
            } catch (IOException e) {
                throw new ReactorRabbitMqException("Error while creating channel", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/rabbitmq/Sender$PublishConfirmOperator.class */
    public static class PublishConfirmOperator extends FluxOperator<OutboundMessage, OutboundMessageResult> {
        private final Channel channel;

        public PublishConfirmOperator(Publisher<OutboundMessage> publisher, Channel channel) {
            super(Flux.from(publisher));
            this.channel = channel;
        }

        public void subscribe(CoreSubscriber<? super OutboundMessageResult> coreSubscriber) {
            this.source.subscribe(new PublishConfirmSubscriber(this.channel, coreSubscriber));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/rabbitmq/Sender$PublishConfirmSubscriber.class */
    public static class PublishConfirmSubscriber implements CoreSubscriber<OutboundMessage> {
        private final AtomicReference<SubscriberState> state;
        private final AtomicReference<Throwable> firstException;
        private final ConcurrentNavigableMap<Long, OutboundMessage> unconfirmed;
        private final Channel channel;
        private final Subscriber<? super OutboundMessageResult> subscriber;

        private PublishConfirmSubscriber(Channel channel, Subscriber<? super OutboundMessageResult> subscriber) {
            this.state = new AtomicReference<>(SubscriberState.INIT);
            this.firstException = new AtomicReference<>();
            this.unconfirmed = new ConcurrentSkipListMap();
            this.channel = channel;
            this.subscriber = subscriber;
        }

        public void onSubscribe(Subscription subscription) {
            this.channel.addConfirmListener(new ConfirmListener() { // from class: reactor.rabbitmq.Sender.PublishConfirmSubscriber.1
                public void handleAck(long j, boolean z) throws IOException {
                    handleAckNack(j, z, true);
                }

                public void handleNack(long j, boolean z) throws IOException {
                    handleAckNack(j, z, false);
                }

                private void handleAckNack(long j, boolean z, boolean z2) {
                    if (z) {
                        try {
                            Iterator it = PublishConfirmSubscriber.this.unconfirmed.headMap((ConcurrentNavigableMap) Long.valueOf(j), true).entrySet().iterator();
                            while (it.hasNext()) {
                                PublishConfirmSubscriber.this.subscriber.onNext(new OutboundMessageResult((OutboundMessage) ((Map.Entry) it.next()).getValue(), z2));
                                it.remove();
                            }
                        } catch (Exception e) {
                            PublishConfirmSubscriber.this.handleError(e, null);
                        }
                    } else {
                        OutboundMessage outboundMessage = (OutboundMessage) PublishConfirmSubscriber.this.unconfirmed.get(Long.valueOf(j));
                        try {
                            PublishConfirmSubscriber.this.unconfirmed.remove(Long.valueOf(j));
                            PublishConfirmSubscriber.this.subscriber.onNext(new OutboundMessageResult(outboundMessage, z2));
                        } catch (Exception e2) {
                            PublishConfirmSubscriber.this.handleError(e2, new OutboundMessageResult(outboundMessage, z2));
                        }
                    }
                    if (PublishConfirmSubscriber.this.unconfirmed.size() == 0) {
                        new Thread(() -> {
                            PublishConfirmSubscriber.this.maybeComplete();
                        }).start();
                    }
                }
            });
            this.state.set(SubscriberState.ACTIVE);
            this.subscriber.onSubscribe(subscription);
        }

        public void onNext(OutboundMessage outboundMessage) {
            if (checkComplete(outboundMessage)) {
                return;
            }
            long nextPublishSeqNo = this.channel.getNextPublishSeqNo();
            try {
                this.unconfirmed.putIfAbsent(Long.valueOf(nextPublishSeqNo), outboundMessage);
                this.channel.basicPublish(outboundMessage.getExchange(), outboundMessage.getRoutingKey(), outboundMessage.getProperties(), outboundMessage.getBody());
            } catch (Exception e) {
                this.unconfirmed.remove(Long.valueOf(nextPublishSeqNo));
                handleError(e, new OutboundMessageResult(outboundMessage, false));
            }
        }

        public void onError(Throwable th) {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.COMPLETE) || this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                closeResources();
                this.subscriber.onError(th);
            } else if (this.firstException.compareAndSet(null, th) && this.state.get() == SubscriberState.COMPLETE) {
                Operators.onErrorDropped(th, currentContext());
            }
        }

        public void onComplete() {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.OUTBOUND_DONE) && this.unconfirmed.size() == 0) {
                maybeComplete();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void handleError(Exception exc, OutboundMessageResult outboundMessageResult) {
            Sender.LOGGER.error("error in publish confirm sending", exc);
            boolean checkComplete = checkComplete(exc);
            this.firstException.compareAndSet(null, exc);
            if (checkComplete) {
                return;
            }
            if (outboundMessageResult != null) {
                this.subscriber.onNext(outboundMessageResult);
            }
            onError(exc);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void maybeComplete() {
            if (this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                closeResources();
                this.subscriber.onComplete();
            }
        }

        private void closeResources() {
            try {
                if (this.channel.isOpen()) {
                    this.channel.close();
                }
            } catch (Exception e) {
            }
        }

        public <T> boolean checkComplete(T t) {
            boolean z = this.state.get() == SubscriberState.COMPLETE;
            if (z && this.firstException.get() == null) {
                Operators.onNextDropped(t, currentContext());
            }
            return z;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/rabbitmq/Sender$SubscriberState.class */
    public enum SubscriberState {
        INIT,
        ACTIVE,
        OUTBOUND_DONE,
        COMPLETE
    }

    public Sender() {
        this(new SenderOptions());
    }

    public Sender(SenderOptions senderOptions) {
        this.hasConnection = new AtomicBoolean(false);
        this.privateConnectionSubscriptionScheduler = senderOptions.getConnectionSubscriptionScheduler() == null;
        this.connectionSubscriptionScheduler = senderOptions.getConnectionSubscriptionScheduler() == null ? createScheduler("rabbitmq-sender-conn-sub") : senderOptions.getConnectionSubscriptionScheduler();
        this.connectionMono = Mono.fromCallable(() -> {
            return senderOptions.getConnectionFactory().newConnection();
        }).doOnSubscribe(subscription -> {
            this.hasConnection.set(true);
        }).subscribeOn(this.connectionSubscriptionScheduler).cache();
        this.privateResourceCreationScheduler = senderOptions.getResourceCreationScheduler() == null;
        this.resourceCreationScheduler = senderOptions.getResourceCreationScheduler() == null ? createScheduler("rabbitmq-sender-res-creat") : senderOptions.getResourceCreationScheduler();
        this.channelMono = this.connectionMono.map(CHANNEL_CREATION_FUNCTION).cache();
    }

    protected Scheduler createScheduler(String str) {
        return Schedulers.newElastic(str);
    }

    public Mono<Void> send(Publisher<OutboundMessage> publisher) {
        return this.connectionMono.map(CHANNEL_CREATION_FUNCTION).cache().flatMapMany(channel -> {
            return Flux.from(publisher).doOnNext(outboundMessage -> {
                try {
                    channel.basicPublish(outboundMessage.getExchange(), outboundMessage.getRoutingKey(), outboundMessage.getProperties(), outboundMessage.getBody());
                } catch (IOException e) {
                    LOGGER.warn("Error when publishing message: {}", e.getMessage());
                }
            }).doOnError(th -> {
                LOGGER.warn("Send failed with exception {}", th);
            }).doFinally(signalType -> {
                int channelNumber = channel.getChannelNumber();
                LOGGER.info("closing channel {} by signal {}", Integer.valueOf(channelNumber), signalType);
                try {
                    if (channel.isOpen() && channel.getConnection().isOpen()) {
                        channel.close();
                    }
                } catch (IOException | TimeoutException e) {
                    LOGGER.warn("Channel {} didn't close normally: {}", Integer.valueOf(channelNumber), e.getMessage());
                }
            });
        }).then();
    }

    public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMessage> publisher) {
        return this.connectionMono.map(CHANNEL_CREATION_FUNCTION).map(channel -> {
            try {
                channel.confirmSelect();
                return channel;
            } catch (IOException e) {
                throw new ReactorRabbitMqException("Error while setting publisher confirms on channel", e);
            }
        }).flatMapMany(channel2 -> {
            return new PublishConfirmOperator(publisher, channel2);
        });
    }

    public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification queueSpecification) {
        return declareQueue(queueSpecification);
    }

    public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification queueSpecification) {
        AMQP.Queue.Declare build = queueSpecification.getName() == null ? new AMQP.Queue.Declare.Builder().queue("").durable(false).exclusive(true).autoDelete(true).arguments((Map) null).build() : new AMQP.Queue.Declare.Builder().queue(queueSpecification.getName()).durable(queueSpecification.isDurable()).exclusive(queueSpecification.isExclusive()).autoDelete(queueSpecification.isAutoDelete()).arguments(queueSpecification.getArguments()).build();
        return this.channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new ReactorRabbitMqException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceCreationScheduler);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification queueSpecification) {
        return delete(queueSpecification, false, false);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification queueSpecification, boolean z, boolean z2) {
        return deleteQueue(queueSpecification, z, z2);
    }

    public Mono<AMQP.Queue.DeleteOk> deleteQueue(QueueSpecification queueSpecification, boolean z, boolean z2) {
        AMQP.Queue.Delete build = new AMQP.Queue.Delete.Builder().queue(queueSpecification.getName()).ifUnused(z).ifEmpty(z2).build();
        return this.channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new ReactorRabbitMqException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceCreationScheduler);
    }

    public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchangeSpecification) {
        return declareExchange(exchangeSpecification);
    }

    public Mono<AMQP.Exchange.DeclareOk> declareExchange(ExchangeSpecification exchangeSpecification) {
        AMQP.Exchange.Declare build = new AMQP.Exchange.Declare.Builder().exchange(exchangeSpecification.getName()).type(exchangeSpecification.getType()).durable(exchangeSpecification.isDurable()).autoDelete(exchangeSpecification.isAutoDelete()).internal(exchangeSpecification.isInternal()).arguments(exchangeSpecification.getArguments()).build();
        return this.channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new ReactorRabbitMqException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceCreationScheduler);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification exchangeSpecification) {
        return delete(exchangeSpecification, false);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification exchangeSpecification, boolean z) {
        return deleteExchange(exchangeSpecification, z);
    }

    public Mono<AMQP.Exchange.DeleteOk> deleteExchange(ExchangeSpecification exchangeSpecification, boolean z) {
        AMQP.Exchange.Delete build = new AMQP.Exchange.Delete.Builder().exchange(exchangeSpecification.getName()).ifUnused(z).build();
        return this.channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new ReactorRabbitMqException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceCreationScheduler);
    }

    public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification bindingSpecification) {
        AMQP.Queue.Unbind build = new AMQP.Queue.Unbind.Builder().exchange(bindingSpecification.getExchange()).queue(bindingSpecification.getQueue()).routingKey(bindingSpecification.getRoutingKey()).arguments(bindingSpecification.getArguments()).build();
        return this.channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new ReactorRabbitMqException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceCreationScheduler);
    }

    public Mono<AMQP.Queue.BindOk> bind(BindingSpecification bindingSpecification) {
        AMQP.Queue.Bind build = new AMQP.Queue.Bind.Builder().exchange(bindingSpecification.getExchange()).queue(bindingSpecification.getQueue()).routingKey(bindingSpecification.getRoutingKey()).arguments(bindingSpecification.getArguments()).build();
        return this.channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new ReactorRabbitMqException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceCreationScheduler);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.hasConnection.getAndSet(false)) {
            try {
                ((Connection) this.connectionMono.block()).close();
            } catch (IOException e) {
                throw new ReactorRabbitMqException(e);
            }
        }
        if (this.privateConnectionSubscriptionScheduler) {
            this.connectionSubscriptionScheduler.dispose();
        }
        if (this.privateResourceCreationScheduler) {
            this.resourceCreationScheduler.dispose();
        }
    }
}
