package org.springframework.integration.kafka.listener;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import reactor.core.processor.RingBufferProcessor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/springframework/integration/kafka/listener/QueueingMessageListenerInvoker.class */
public class QueueingMessageListenerInvoker {
    private final MessageListener messageListener;
    private final AcknowledgingMessageListener acknowledgingMessageListener;
    private final OffsetManager offsetManager;
    private final ErrorHandler errorHandler;
    private final int capacity;
    private final ExecutorService executorService;
    private RingBufferProcessor<KafkaMessage> ringBufferProcessor;
    private volatile boolean running = false;
    private volatile CountDownLatch shutdownLatch;

    /* loaded from: input_file:org/springframework/integration/kafka/listener/QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.class */
    private class KafkaMessageDispatchingSubscriber implements Subscriber<KafkaMessage> {
        private KafkaMessageDispatchingSubscriber() {
        }

        public void onSubscribe(Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        public void onNext(KafkaMessage kafkaMessage) {
            try {
                try {
                    if (QueueingMessageListenerInvoker.this.messageListener != null) {
                        QueueingMessageListenerInvoker.this.messageListener.onMessage(kafkaMessage);
                    } else {
                        QueueingMessageListenerInvoker.this.acknowledgingMessageListener.onMessage(kafkaMessage, new DefaultAcknowledgment(QueueingMessageListenerInvoker.this.offsetManager, kafkaMessage));
                    }
                    if (QueueingMessageListenerInvoker.this.messageListener != null) {
                        QueueingMessageListenerInvoker.this.offsetManager.updateOffset(kafkaMessage.getMetadata().getPartition(), kafkaMessage.getMetadata().getNextOffset());
                    }
                } catch (Exception e) {
                    if (QueueingMessageListenerInvoker.this.errorHandler != null) {
                        QueueingMessageListenerInvoker.this.errorHandler.handle(e, kafkaMessage);
                    }
                    if (QueueingMessageListenerInvoker.this.messageListener != null) {
                        QueueingMessageListenerInvoker.this.offsetManager.updateOffset(kafkaMessage.getMetadata().getPartition(), kafkaMessage.getMetadata().getNextOffset());
                    }
                }
            } catch (Throwable th) {
                if (QueueingMessageListenerInvoker.this.messageListener != null) {
                    QueueingMessageListenerInvoker.this.offsetManager.updateOffset(kafkaMessage.getMetadata().getPartition(), kafkaMessage.getMetadata().getNextOffset());
                }
                throw th;
            }
        }

        public void onError(Throwable th) {
        }

        public void onComplete() {
            CountDownLatch countDownLatch = QueueingMessageListenerInvoker.this.shutdownLatch;
            if (countDownLatch != null) {
                QueueingMessageListenerInvoker.this.shutdownLatch = null;
                countDownLatch.countDown();
            }
        }
    }

    public QueueingMessageListenerInvoker(int i, OffsetManager offsetManager, Object obj, ErrorHandler errorHandler, Executor executor) {
        this.capacity = i;
        if (obj instanceof MessageListener) {
            this.messageListener = (MessageListener) obj;
            this.acknowledgingMessageListener = null;
        } else {
            if (!(obj instanceof AcknowledgingMessageListener)) {
                throw new IllegalArgumentException("Either a " + MessageListener.class.getName() + " or a " + AcknowledgingMessageListener.class.getName() + " must be provided");
            }
            this.acknowledgingMessageListener = (AcknowledgingMessageListener) obj;
            this.messageListener = null;
        }
        this.offsetManager = offsetManager;
        this.errorHandler = errorHandler;
        if (executor != null) {
            this.executorService = new ExecutorServiceAdapter(new ConcurrentTaskExecutor(executor));
        } else {
            this.executorService = null;
        }
    }

    public void enqueue(KafkaMessage kafkaMessage) {
        if (this.running) {
            this.ringBufferProcessor.onNext(kafkaMessage);
        }
    }

    public void start() {
        this.running = true;
        this.ringBufferProcessor = RingBufferProcessor.share(this.executorService != null ? this.executorService : Executors.newSingleThreadExecutor(), this.capacity);
        this.ringBufferProcessor.subscribe(new KafkaMessageDispatchingSubscriber());
    }

    public void stop(long j) {
        this.running = false;
        if (this.ringBufferProcessor != null) {
            this.ringBufferProcessor.onComplete();
            this.ringBufferProcessor = null;
            this.shutdownLatch = new CountDownLatch(1);
            try {
                this.shutdownLatch.await(j, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
