/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.jms;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.jms.IncomingJmsMessage;
import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.jms.i18n.JmsExceptions;
import io.smallrye.reactive.messaging.jms.i18n.JmsLogging;
import io.smallrye.reactive.messaging.json.JsonMapping;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Destination;
import javax.jms.IllegalStateRuntimeException;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.Topic;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class JmsSource {
    private final PublisherBuilder<IncomingJmsMessage<?>> source;
    private final JmsPublisher publisher;

    JmsSource(JMSContext context, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping, Executor executor) {
        JMSConsumer consumer;
        String name = config.getDestination().orElseGet(config::getChannel);
        String selector = config.getSelector().orElse(null);
        boolean nolocal = config.getNoLocal();
        boolean broadcast = config.getBroadcast();
        boolean durable = config.getDurable();
        Destination destination = this.getDestination(context, name, config);
        if (durable) {
            if (!(destination instanceof Topic)) {
                throw JmsExceptions.ex.illegalArgumentInvalidDestination();
            }
            consumer = context.createDurableConsumer((Topic)destination, name, selector, nolocal);
        } else {
            consumer = context.createConsumer(destination, selector, nolocal);
        }
        this.publisher = new JmsPublisher(consumer);
        this.source = !broadcast ? ReactiveStreams.fromPublisher((Publisher)this.publisher).map(m -> new IncomingJmsMessage((Message)m, executor, jsonMapping)) : ReactiveStreams.fromPublisher((Publisher)Multi.createFrom().publisher((Publisher)this.publisher).map(m -> new IncomingJmsMessage((Message)m, executor, jsonMapping)).broadcast().toAllSubscribers());
    }

    void close() {
        this.publisher.close();
    }

    private Destination getDestination(JMSContext context, String name, JmsConnectorIncomingConfiguration config) {
        String type = config.getDestinationType();
        switch (type.toLowerCase()) {
            case "queue": {
                JmsLogging.log.creatingQueue(name);
                return context.createQueue(name);
            }
            case "topic": {
                JmsLogging.log.creatingTopic(name);
                return context.createTopic(name);
            }
        }
        throw JmsExceptions.ex.illegalArgumentUnknownDestinationType(type);
    }

    PublisherBuilder<IncomingJmsMessage<?>> getSource() {
        return this.source;
    }

    private static class JmsPublisher
    implements Publisher<Message>,
    Subscription {
        private final AtomicLong requests = new AtomicLong();
        private final AtomicReference<Subscriber<? super Message>> downstream = new AtomicReference();
        private final JMSConsumer consumer;
        private final ExecutorService executor;
        private boolean unbounded;

        private JmsPublisher(JMSConsumer consumer) {
            this.consumer = consumer;
            this.executor = Executors.newSingleThreadExecutor();
        }

        void close() {
            Subscriber subscriber = this.downstream.getAndSet(null);
            if (subscriber != null) {
                subscriber.onComplete();
            }
            this.consumer.close();
            this.executor.shutdown();
        }

        public void subscribe(Subscriber<? super Message> s) {
            if (this.downstream.compareAndSet(null, s)) {
                s.onSubscribe((Subscription)this);
            } else {
                Subscriptions.fail(s, (Throwable)JmsExceptions.ex.illegalStateAlreadySubscriber());
            }
        }

        public void request(long n) {
            boolean u;
            if (n > 0L && !(u = this.unbounded)) {
                long v = this.add(n);
                if (v == Long.MAX_VALUE) {
                    this.unbounded = true;
                    this.startUnboundedReception();
                } else {
                    this.enqueue(n);
                }
            }
        }

        private void enqueue(long n) {
            int i = 0;
            while ((long)i < n) {
                this.executor.execute(() -> {
                    try {
                        Message message = this.consumer.receive();
                        if (message != null) {
                            this.requests.decrementAndGet();
                            this.downstream.get().onNext((Object)message);
                        }
                    }
                    catch (IllegalStateRuntimeException e) {
                        JmsLogging.log.clientClosed();
                    }
                });
                ++i;
            }
        }

        private void startUnboundedReception() {
            this.consumer.setMessageListener(m -> this.downstream.get().onNext((Object)m));
        }

        public void cancel() {
            this.close();
        }

        long add(long req) {
            long u;
            long v;
            long r;
            do {
                if ((r = this.requests.get()) != Long.MAX_VALUE) continue;
                return Long.MAX_VALUE;
            } while (!this.requests.compareAndSet(r, v = (u = r + req) < 0L ? Long.MAX_VALUE : u));
            return v;
        }
    }
}

