package reactor.kafka.receiver.internals;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.receiver.internals.CommittableBatch;
import reactor.kafka.sender.TransactionManager;

/* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver.class */
public class DefaultKafkaReceiver<K, V> implements KafkaReceiver<K, V>, ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(DefaultKafkaReceiver.class.getName());

    /**
     * Names of the {@link Consumer} methods that may be invoked through the proxy handed to
     * {@code doOnConsumer} functions; the proxy rejects every other method with an
     * {@link UnsupportedOperationException}.
     */
    private static final Set<String> DELEGATE_METHODS = new HashSet<>(Arrays.asList("assignment", "subscription", "seek", "seekToBeginning", "seekToEnd", "position", "committed", "metrics", "partitionsFor", "listTopics", "paused", "pause", "resume", "offsetsForTimes", "beginningOffsets", "endOffsets"));

    private final ConsumerFactory consumerFactory;
    /** Immutable copy of the options passed to the constructor. */
    private final ReceiverOptions<K, V> receiverOptions;
    /** Scheduler created with {@code Schedulers.newSingle} in the constructor. */
    private final Scheduler eventScheduler;
    /** Set by whichever {@code receive*} method was called last. */
    private AckMode ackMode;
    /** Only created by {@code receiveAtmostOnce()}. */
    private AtmostOnceOffsets atmostOnceOffsets;
    private EmitterProcessor<DefaultKafkaReceiver<K, V>.Event<?>> eventEmitter;
    private FluxSink<DefaultKafkaReceiver<K, V>.Event<?>> eventSubmission;
    private EmitterProcessor<ConsumerRecords<K, V>> recordEmitter;
    /** Sink that {@code PollEvent} pushes polled record batches into. */
    private FluxSink<ConsumerRecords<K, V>> recordSubmission;
    private DefaultKafkaReceiver<K, V>.InitEvent initEvent;
    private DefaultKafkaReceiver<K, V>.PollEvent pollEvent;
    private DefaultKafkaReceiver<K, V>.CommitEvent commitEvent;
    private Flux<DefaultKafkaReceiver<K, V>.Event<?>> eventFlux;
    private Flux<ConsumerRecords<K, V>> consumerFlux;
    /** Kafka consumer created by {@code InitEvent}; {@code null} until then. */
    private Consumer<K, V> consumer;
    /** Restricted proxy around {@link #consumer}, created lazily by {@code CustomEvent}. */
    private Consumer<K, V> consumerProxy;
    private final List<Flux<? extends DefaultKafkaReceiver<K, V>.Event<?>>> fluxList = new ArrayList<>();
    private final List<Disposable> subscribeDisposables = new ArrayList<>();
    /** Outstanding demand counter; {@code PollEvent} decrements it after each poll. */
    private final AtomicLong requestsPending = new AtomicLong();
    /** Cleared at the start of every {@code PollEvent} run. */
    private final AtomicBoolean needsHeartbeat = new AtomicBoolean();
    /** Reset on a successful non-empty commit, incremented on a retriable commit failure. */
    private final AtomicInteger consecutiveCommitFailures = new AtomicInteger();
    /** Set to {@code true} by {@code InitEvent}. */
    private final AtomicBoolean isActive = new AtomicBoolean();
    /** Set to {@code false} by {@code InitEvent}. */
    private final AtomicBoolean isClosed = new AtomicBoolean();
    /** Raised per batch by {@code receiveExactlyOnce}; cleared when the batch's flux terminates. */
    private final AtomicBoolean awaitingTransaction = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver$AckMode.class */
    /**
     * Acknowledgement strategy of the receiver. The outer class stores the value
     * selected by the {@code receive*} method that was invoked.
     */
    public enum AckMode {
        /** Selected by {@code receiveAutoAck()}. */
        AUTO_ACK,
        /** Selected by {@code receive()}. */
        MANUAL_ACK,
        /** Selected by {@code receiveAtmostOnce()}. */
        ATMOST_ONCE,
        /** Selected by {@code receiveExactlyOnce(TransactionManager)}. */
        EXACTLY_ONCE
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver$AtmostOnceOffsets.class */
    /**
     * Tracks, per partition, the last offset that was committed and the last offset
     * that was dispatched, so that offsets committed ahead of what was actually
     * dispatched can be rolled back.
     */
    public static class AtmostOnceOffsets {
        private final Map<TopicPartition, Long> committedOffsets = new ConcurrentHashMap<>();
        private final Map<TopicPartition, Long> dispatchedOffsets = new ConcurrentHashMap<>();

        AtmostOnceOffsets() {
        }

        /**
         * Records the offsets of a completed commit.
         *
         * @param map committed offsets keyed by partition
         */
        void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
                this.committedOffsets.put(entry.getKey(), entry.getValue().offset());
            }
        }

        /**
         * Records that the record at offset {@code j} of {@code topicPartition} was dispatched.
         *
         * @param topicPartition partition of the dispatched record
         * @param j              offset of the dispatched record
         */
        void onDispatch(TopicPartition topicPartition, long j) {
            this.dispatchedOffsets.put(topicPartition, j);
        }

        /**
         * @param topicPartition partition to look up
         * @return the last recorded committed offset, or {@code -1} if none was recorded
         */
        long committedOffset(TopicPartition topicPartition) {
            Long committed = this.committedOffsets.get(topicPartition);
            return committed == null ? -1L : committed;
        }

        /**
         * For every partition whose recorded committed offset is greater than
         * {@code lastDispatched + 1}, writes {@code lastDispatched + 1} into the given batch.
         *
         * @param committableBatch batch that receives the corrected offsets
         * @return {@code true} if at least one offset was written to the batch
         */
        boolean undoCommitAhead(CommittableBatch committableBatch) {
            boolean undoRequired = false;
            for (Map.Entry<TopicPartition, Long> entry : this.committedOffsets.entrySet()) {
                TopicPartition partition = entry.getKey();
                Long lastDispatched = this.dispatchedOffsets.get(partition);
                if (lastDispatched == null) {
                    // No dispatch recorded for this partition: there is no offset to roll back
                    // to, and unboxing null would throw a NullPointerException.
                    continue;
                }
                long offsetToCommit = lastDispatched + 1;
                if (entry.getValue() > offsetToCommit) {
                    committableBatch.updateOffset(partition, offsetToCommit);
                    undoRequired = true;
                }
            }
            return undoRequired;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver$CloseEvent.class */
    /**
     * Event that performs a final commit (unless in exactly-once mode) and closes the
     * Kafka consumer, bounded by a deadline computed from the supplied timeout.
     */
    public class CloseEvent extends DefaultKafkaReceiver<K, V>.Event<ConsumerRecords<K, V>> {
        /** Deadline for the whole close sequence, on the {@link System#nanoTime()} clock. */
        private final long closeEndTimeNanos;
        /** Released once {@link #run()} finishes without an exception. */
        private Semaphore semaphore;

        /**
         * @param duration maximum time the close sequence may take, measured from now
         */
        CloseEvent(Duration duration) {
            super(EventType.CLOSE);
            this.semaphore = new Semaphore(0);
            this.closeEndTimeNanos = System.nanoTime() + duration.toNanos();
        }

        /**
         * Revokes manually assigned partitions (if any), commits and closes the consumer.
         * The commit-and-close step is attempted up to three times when interrupted by a
         * {@link WakeupException}; the last failure is rethrown. Any exception is logged and
         * reported through {@code fail}, in which case the semaphore is not released.
         */
        @Override // java.lang.Runnable
        public void run() {
            final int maxAttempts = 3;
            try {
                if (DefaultKafkaReceiver.this.consumer != null) {
                    Collection<TopicPartition> manualAssignment = DefaultKafkaReceiver.this.receiverOptions.assignment();
                    if (manualAssignment != null && !manualAssignment.isEmpty()) {
                        DefaultKafkaReceiver.this.onPartitionsRevoked(manualAssignment);
                    }
                    for (int attempt = 0; attempt < maxAttempts; attempt++) {
                        try {
                            boolean forceCommit = true;
                            if (DefaultKafkaReceiver.this.ackMode == AckMode.ATMOST_ONCE) {
                                forceCommit = DefaultKafkaReceiver.this.atmostOnceOffsets.undoCommitAhead(DefaultKafkaReceiver.this.committableBatch());
                            }
                            if (DefaultKafkaReceiver.this.ackMode != AckMode.EXACTLY_ONCE) {
                                DefaultKafkaReceiver.this.commitEvent.runIfRequired(forceCommit);
                                DefaultKafkaReceiver.this.commitEvent.waitFor(this.closeEndTimeNanos);
                            }
                            // Never hand a negative timeout to the consumer.
                            long remainingNanos = Math.max(0L, this.closeEndTimeNanos - System.nanoTime());
                            DefaultKafkaReceiver.this.consumer.close(remainingNanos, TimeUnit.NANOSECONDS);
                            break;
                        } catch (WakeupException e) {
                            if (attempt == maxAttempts - 1) {
                                throw e;
                            }
                        }
                    }
                }
                this.semaphore.release();
            } catch (Exception e2) {
                DefaultKafkaReceiver.log.error("Unexpected exception during close", e2);
                DefaultKafkaReceiver.this.fail(e2, false);
            }
        }

        /**
         * Waits up to {@code j} nanoseconds for {@link #run()} to complete.
         *
         * @param j maximum wait in nanoseconds
         * @return {@code true} if the close completed within the wait
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        boolean await(long j) throws InterruptedException {
            return this.semaphore.tryAcquire(j, TimeUnit.NANOSECONDS);
        }

        /**
         * Waits until {@link #run()} completes or the close deadline passes. Interrupts do
         * not abort the wait; the interrupt status of the calling thread is restored before
         * returning so that callers can still observe it.
         *
         * @return {@code true} if the close completed before the deadline
         */
        boolean await() {
            boolean closed = false;
            boolean interrupted = false;
            try {
                while (!closed) {
                    long remainingNanos = this.closeEndTimeNanos - System.nanoTime();
                    if (remainingNanos <= 0) {
                        break;
                    }
                    try {
                        closed = await(remainingNanos);
                    } catch (InterruptedException ignored) {
                        // Keep waiting until the deadline; re-assert the interrupt afterwards.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            return closed;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver$CommitEvent.class */
    /**
     * Event that commits the offsets accumulated in {@link #commitBatch}.
     *
     * <p>NOTE(review): {@link #run()} could not be decompiled; only its bytecode dump is
     * preserved below and the Java body unconditionally throws. The method must be
     * restored from the original sources before this class can be used.
     */
    public class CommitEvent extends DefaultKafkaReceiver<K, V>.Event<Map<TopicPartition, OffsetAndMetadata>> {
        /** Offsets (and commit callbacks) waiting to be committed. */
        private final CommittableBatch commitBatch;
        /** {@code true} while a commit has been requested but {@link #run()} has not yet claimed it. */
        private final AtomicBoolean isPending;
        /** Number of commits started by {@link #run()} that have not completed yet. */
        private final AtomicInteger inProgress;

        CommitEvent() {
            super(EventType.COMMIT);
            this.isPending = new AtomicBoolean();
            this.inProgress = new AtomicInteger();
            this.commitBatch = new CommittableBatch();
        }

        /**
         * Commits the pending offsets. What the bytecode dump below shows:
         * <ol>
         *   <li>Returns immediately unless {@code isPending} can be flipped from true to false.</li>
         *   <li>Takes the offsets from {@code commitBatch.getAndClearOffsets()}; does nothing if that is null.</li>
         *   <li>If the offset map is empty, calls {@code handleSuccess} directly.</li>
         *   <li>Otherwise increments {@code inProgress} and switches on the receiver's ack mode
         *       through a synthetic switch map: one case does {@code commitSync}, then
         *       {@code handleSuccess} and {@code AtmostOnceOffsets.onCommit} (calling
         *       {@code handleFailure} on exception) and then decrements {@code inProgress}
         *       (presumably ATMOST_ONCE, confirm against the switch map); a second case does
         *       nothing; the default case calls {@code commitAsync} with a callback
         *       ({@code lambda$run$0}, body not visible here) and then
         *       {@code pollEvent.scheduleIfRequired()}.</li>
         *   <li>Any other exception is logged as "Unexpected exception", {@code inProgress} is
         *       decremented and {@code handleFailure} is called.</li>
         * </ol>
         */
        /* JADX WARN: Failed to find 'out' block for switch in B:14:0x003b. Please report as an issue. */
        /* JADX WARN: Removed duplicated region for block: B:18:0x00c1  */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                r5 = this;
                r0 = r5
                java.util.concurrent.atomic.AtomicBoolean r0 = r0.isPending
                r1 = 1
                r2 = 0
                boolean r0 = r0.compareAndSet(r1, r2)
                if (r0 != 0) goto Ld
                return
            Ld:
                r0 = r5
                reactor.kafka.receiver.internals.CommittableBatch r0 = r0.commitBatch
                reactor.kafka.receiver.internals.CommittableBatch$CommitArgs r0 = r0.getAndClearOffsets()
                r6 = r0
                r0 = r6
                if (r0 == 0) goto Lcd
                r0 = r6
                java.util.Map r0 = r0.offsets()     // Catch: java.lang.Exception -> Ld0
                boolean r0 = r0.isEmpty()     // Catch: java.lang.Exception -> Ld0
                if (r0 != 0) goto Lc4
                r0 = r5
                java.util.concurrent.atomic.AtomicInteger r0 = r0.inProgress     // Catch: java.lang.Exception -> Ld0
                int r0 = r0.incrementAndGet()     // Catch: java.lang.Exception -> Ld0
                int[] r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.AnonymousClass1.$SwitchMap$reactor$kafka$receiver$internals$DefaultKafkaReceiver$AckMode     // Catch: java.lang.Exception -> Ld0
                r1 = r5
                reactor.kafka.receiver.internals.DefaultKafkaReceiver r1 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.this     // Catch: java.lang.Exception -> Ld0
                reactor.kafka.receiver.internals.DefaultKafkaReceiver$AckMode r1 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.access$1400(r1)     // Catch: java.lang.Exception -> Ld0
                int r1 = r1.ordinal()     // Catch: java.lang.Exception -> Ld0
                r0 = r0[r1]     // Catch: java.lang.Exception -> Ld0
                switch(r0) {
                    case 1: goto L54;
                    case 2: goto L90;
                    default: goto L93;
                }     // Catch: java.lang.Exception -> Ld0
            L54:
                r0 = r5
                reactor.kafka.receiver.internals.DefaultKafkaReceiver r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.this     // Catch: java.lang.Exception -> L7e java.lang.Exception -> Ld0
                org.apache.kafka.clients.consumer.Consumer r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.access$300(r0)     // Catch: java.lang.Exception -> L7e java.lang.Exception -> Ld0
                r1 = r6
                java.util.Map r1 = r1.offsets()     // Catch: java.lang.Exception -> L7e java.lang.Exception -> Ld0
                r0.commitSync(r1)     // Catch: java.lang.Exception -> L7e java.lang.Exception -> Ld0
                r0 = r5
                r1 = r6
                r2 = r6
                java.util.Map r2 = r2.offsets()     // Catch: java.lang.Exception -> L7e java.lang.Exception -> Ld0
                r0.handleSuccess(r1, r2)     // Catch: java.lang.Exception -> L7e java.lang.Exception -> Ld0
                r0 = r5
                reactor.kafka.receiver.internals.DefaultKafkaReceiver r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.this     // Catch: java.lang.Exception -> L7e java.lang.Exception -> Ld0
                reactor.kafka.receiver.internals.DefaultKafkaReceiver$AtmostOnceOffsets r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.access$1700(r0)     // Catch: java.lang.Exception -> L7e java.lang.Exception -> Ld0
                r1 = r6
                java.util.Map r1 = r1.offsets()     // Catch: java.lang.Exception -> L7e java.lang.Exception -> Ld0
                r0.onCommit(r1)     // Catch: java.lang.Exception -> L7e java.lang.Exception -> Ld0
                goto L85
            L7e:
                r7 = move-exception
                r0 = r5
                r1 = r6
                r2 = r7
                r0.handleFailure(r1, r2)     // Catch: java.lang.Exception -> Ld0
            L85:
                r0 = r5
                java.util.concurrent.atomic.AtomicInteger r0 = r0.inProgress     // Catch: java.lang.Exception -> Ld0
                int r0 = r0.decrementAndGet()     // Catch: java.lang.Exception -> Ld0
                goto Lb4
            L90:
                goto Lb4
            L93:
                r0 = r5
                reactor.kafka.receiver.internals.DefaultKafkaReceiver r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.this     // Catch: java.lang.Exception -> Ld0
                org.apache.kafka.clients.consumer.Consumer r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.access$300(r0)     // Catch: java.lang.Exception -> Ld0
                r1 = r6
                java.util.Map r1 = r1.offsets()     // Catch: java.lang.Exception -> Ld0
                r2 = r5
                r3 = r6
                void r2 = (v2, v3) -> { // org.apache.kafka.clients.consumer.OffsetCommitCallback.onComplete(java.util.Map, java.lang.Exception):void
                    r2.lambda$run$0(r3, v2, v3);
                }     // Catch: java.lang.Exception -> Ld0
                r0.commitAsync(r1, r2)     // Catch: java.lang.Exception -> Ld0
                r0 = r5
                reactor.kafka.receiver.internals.DefaultKafkaReceiver r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.this     // Catch: java.lang.Exception -> Ld0
                reactor.kafka.receiver.internals.DefaultKafkaReceiver$PollEvent r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.access$1800(r0)     // Catch: java.lang.Exception -> Ld0
                r0.scheduleIfRequired()     // Catch: java.lang.Exception -> Ld0
            Lb4:
                r0 = r5
                reactor.kafka.receiver.internals.DefaultKafkaReceiver r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.this     // Catch: java.lang.Exception -> Ld0
                reactor.kafka.receiver.internals.DefaultKafkaReceiver$AckMode r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.access$1400(r0)     // Catch: java.lang.Exception -> Ld0
                reactor.kafka.receiver.internals.DefaultKafkaReceiver$AckMode r1 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.AckMode.ATMOST_ONCE     // Catch: java.lang.Exception -> Ld0
                if (r0 == r1) goto Lcd
                goto Lcd
            Lc4:
                r0 = r5
                r1 = r6
                r2 = r6
                java.util.Map r2 = r2.offsets()     // Catch: java.lang.Exception -> Ld0
                r0.handleSuccess(r1, r2)     // Catch: java.lang.Exception -> Ld0
            Lcd:
                goto Lea
            Ld0:
                r7 = move-exception
                org.slf4j.Logger r0 = reactor.kafka.receiver.internals.DefaultKafkaReceiver.access$700()
                java.lang.String r1 = "Unexpected exception"
                r2 = r7
                r0.error(r1, r2)
                r0 = r5
                java.util.concurrent.atomic.AtomicInteger r0 = r0.inProgress
                int r0 = r0.decrementAndGet()
                r0 = r5
                r1 = r6
                r2 = r7
                r0.handleFailure(r1, r2)
            Lea:
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: reactor.kafka.receiver.internals.DefaultKafkaReceiver.CommitEvent.run():void");
        }

        /**
         * Runs the commit synchronously on the calling thread when one is pending.
         *
         * @param z when {@code true}, the commit is marked pending first, forcing a run
         */
        void runIfRequired(boolean z) {
            if (z) {
                this.isPending.set(true);
            }
            if (this.isPending.get()) {
                run();
            }
        }

        /**
         * Completes a commit: resets the consecutive-failure counter when offsets were
         * actually committed and signals success to every registered callback emitter.
         *
         * @param commitArgs the batch that was committed, including its callback emitters
         * @param map        the offsets that were committed (may be empty)
         */
        private void handleSuccess(CommittableBatch.CommitArgs commitArgs, Map<TopicPartition, OffsetAndMetadata> map) {
            if (!map.isEmpty()) {
                DefaultKafkaReceiver.this.consecutiveCommitFailures.set(0);
            }
            if (commitArgs.callbackEmitters() != null) {
                Iterator<MonoSink<Void>> it = commitArgs.callbackEmitters().iterator();
                while (it.hasNext()) {
                    it.next().success();
                }
            }
        }

        /**
         * Handles a failed commit.
         *
         * <p>If the failure is retriable, the receiver is not closed and the attempt limit
         * ({@code maxCommitAttempts}) has not been reached, the offsets are restored into the
         * batch and the commit is marked pending again. Otherwise the error is delivered to
         * the registered callback emitters, or, when there are none, the receiver is failed.
         *
         * @param commitArgs the batch whose commit failed
         * @param exc        the failure
         */
        private void handleFailure(CommittableBatch.CommitArgs commitArgs, Exception exc) {
            DefaultKafkaReceiver.log.warn("Commit failed", exc);
            // Note: the failure counter is incremented only when the first two conditions hold.
            if (isRetriableException(exc) && !DefaultKafkaReceiver.this.isClosed.get() && DefaultKafkaReceiver.this.consecutiveCommitFailures.incrementAndGet() < DefaultKafkaReceiver.this.receiverOptions.maxCommitAttempts()) {
                this.commitBatch.restoreOffsets(commitArgs, true);
                DefaultKafkaReceiver.log.warn("Commit failed with exception" + exc + ", retries remaining " + (DefaultKafkaReceiver.this.receiverOptions.maxCommitAttempts() - DefaultKafkaReceiver.this.consecutiveCommitFailures.get()));
                this.isPending.set(true);
                return;
            }
            List<MonoSink<Void>> callbackEmitters = commitArgs.callbackEmitters();
            if (callbackEmitters == null || callbackEmitters.isEmpty()) {
                // Nobody is waiting on this commit, so surface the error on the receiver itself.
                DefaultKafkaReceiver.this.fail(exc, false);
                return;
            }
            this.isPending.set(false);
            this.commitBatch.restoreOffsets(commitArgs, false);
            Iterator<MonoSink<Void>> it = callbackEmitters.iterator();
            while (it.hasNext()) {
                it.next().error(exc);
            }
        }

        /**
         * Marks a commit as pending and returns this event.
         *
         * @return this event
         */
        /* JADX INFO: Access modifiers changed from: private */
        public DefaultKafkaReceiver<K, V>.CommitEvent periodicEvent() {
            this.isPending.set(true);
            return this;
        }

        /**
         * Emits this event if the receiver is active and no commit was already pending
         * (the pending flag is flipped atomically from false to true).
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void scheduleIfRequired() {
            if (DefaultKafkaReceiver.this.isActive.get() && this.isPending.compareAndSet(false, true)) {
                DefaultKafkaReceiver.this.emit(this);
            }
        }

        /**
         * Repeatedly polls the consumer (timeout argument {@code 1}, presumably
         * milliseconds) while commits are in progress and the deadline has not passed.
         *
         * @param j deadline on the {@link System#nanoTime()} clock
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void waitFor(long j) {
            while (this.inProgress.get() > 0 && j - System.nanoTime() > 0) {
                DefaultKafkaReceiver.this.consumer.poll(1L);
            }
        }

        /**
         * @param exc the commit failure
         * @return {@code true} if {@code exc} is a {@link RetriableCommitFailedException}
         */
        protected boolean isRetriableException(Exception exc) {
            return exc instanceof RetriableCommitFailedException;
        }
    }

    /* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver$CommittableOffset.class */
    /**
     * {@link ReceiverOffset} for a single record position. The offset is added to the
     * receiver's commit batch at most once, on the first {@link #commit()} or
     * {@link #acknowledge()}.
     */
    class CommittableOffset implements ReceiverOffset {
        private final TopicPartition topicPartition;
        private final long commitOffset;
        /** Flipped to {@code true} the first time this offset is added to the commit batch. */
        private final AtomicBoolean acknowledged;

        /**
         * Creates an offset for the position of the given record.
         *
         * @param defaultKafkaReceiver unused
         * @param consumerRecord       record whose topic, partition and offset are used
         */
        public CommittableOffset(DefaultKafkaReceiver defaultKafkaReceiver, ConsumerRecord<K, V> consumerRecord) {
            this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
        }

        /**
         * @param topicPartition partition the offset belongs to
         * @param j              offset value handed to the commit batch
         */
        public CommittableOffset(TopicPartition topicPartition, long j) {
            this.topicPartition = topicPartition;
            this.commitOffset = j;
            this.acknowledged = new AtomicBoolean(false);
        }

        /**
         * Adds this offset to the commit batch (first call only).
         *
         * @return a Mono that requests a commit and completes with it when the batch is
         *         non-empty, otherwise an empty Mono
         */
        @Override // reactor.kafka.receiver.ReceiverOffset
        public Mono<Void> commit() {
            if (maybeUpdateOffset() > 0) {
                return scheduleCommit();
            }
            return Mono.empty();
        }

        /**
         * Adds this offset to the commit batch (first call only) and requests a commit
         * once the batch has reached the configured {@code commitBatchSize}, if positive.
         */
        @Override // reactor.kafka.receiver.ReceiverOffset
        public void acknowledge() {
            int threshold = DefaultKafkaReceiver.this.receiverOptions.commitBatchSize();
            long uncommitted = maybeUpdateOffset();
            if (threshold > 0 && uncommitted >= threshold) {
                scheduleIfRequired();
            }
        }

        @Override // reactor.kafka.receiver.ReceiverOffset
        public TopicPartition topicPartition() {
            return topicPartition;
        }

        @Override // reactor.kafka.receiver.ReceiverOffset
        public long offset() {
            return commitOffset;
        }

        /**
         * On the first call, records this offset in the commit batch; on later calls only
         * reads the batch size.
         *
         * @return the value returned by the batch's {@code updateOffset} or {@code batchSize}
         */
        private int maybeUpdateOffset() {
            boolean firstAcknowledgement = acknowledged.compareAndSet(false, true);
            CommittableBatch batch = DefaultKafkaReceiver.this.commitEvent.commitBatch;
            if (firstAcknowledgement) {
                return batch.updateOffset(topicPartition, commitOffset);
            }
            return batch.batchSize();
        }

        /** Builds a Mono that, on subscription, registers its sink with the batch and requests a commit. */
        private Mono<Void> scheduleCommit() {
            return Mono.create(emitter -> {
                DefaultKafkaReceiver.this.commitEvent.commitBatch.addCallbackEmitter(emitter);
                scheduleIfRequired();
            });
        }

        private void scheduleIfRequired() {
            DefaultKafkaReceiver.this.commitEvent.scheduleIfRequired();
        }

        @Override
        public String toString() {
            return topicPartition + "@" + commitOffset;
        }
    }

    /* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver$CustomEvent.class */
    /**
     * Event that applies a user function to a restricted proxy of the Kafka consumer and
     * publishes the result (or the failure) through a {@link MonoSink}.
     *
     * @param <T> result type of the user function
     */
    private class CustomEvent<T> extends DefaultKafkaReceiver<K, V>.Event<Void> {
        private final Function<Consumer<K, V>, ? extends T> function;
        private MonoSink<T> monoSink;

        /**
         * @param function function to apply to the consumer proxy
         * @param monoSink sink that receives the function result or the thrown error
         */
        CustomEvent(Function<Consumer<K, V>, ? extends T> function, MonoSink<T> monoSink) {
            super(EventType.CUSTOM);
            this.function = function;
            this.monoSink = monoSink;
        }

        /** Applies the function if the receiver is active; otherwise does nothing. */
        @Override // java.lang.Runnable
        public void run() {
            if (!DefaultKafkaReceiver.this.isActive.get()) {
                return;
            }
            try {
                T result = function.apply(consumerProxy());
                monoSink.success(result);
            } catch (Throwable failure) {
                monoSink.error(failure);
            }
        }

        /**
         * Returns the receiver's consumer proxy, creating it on first use. The proxy
         * forwards only the methods listed in {@code DELEGATE_METHODS} to the real
         * consumer and unwraps exceptions thrown by the delegate.
         */
        private Consumer<K, V> consumerProxy() {
            if (DefaultKafkaReceiver.this.consumerProxy != null) {
                return DefaultKafkaReceiver.this.consumerProxy;
            }
            InvocationHandler delegatingHandler = (target, method, arguments) -> {
                if (DefaultKafkaReceiver.DELEGATE_METHODS.contains(method.getName())) {
                    try {
                        return method.invoke(DefaultKafkaReceiver.this.consumer, arguments);
                    } catch (InvocationTargetException wrapped) {
                        throw wrapped.getCause();
                    }
                }
                throw new UnsupportedOperationException("Method is not supported: " + method);
            };
            @SuppressWarnings("unchecked")
            Consumer<K, V> created = (Consumer<K, V>) Proxy.newProxyInstance(
                    Consumer.class.getClassLoader(), new Class[]{Consumer.class}, delegatingHandler);
            DefaultKafkaReceiver.this.consumerProxy = created;
            return DefaultKafkaReceiver.this.consumerProxy;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver$Event.class */
    /**
     * Base class of the receiver's runnable events; each event carries the
     * {@link EventType} it was created with.
     *
     * @param <R> type parameter declared by subclasses; not used in this base class
     */
    public abstract class Event<R> implements Runnable {
        /** Category passed to the constructor. */
        protected EventType eventType;

        /**
         * @param eventType category reported by {@link #eventType()}
         */
        Event(EventType eventType) {
            this.eventType = eventType;
        }

        /**
         * @return the category of this event
         */
        public EventType eventType() {
            return eventType;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver$EventType.class */
    /** Categories of events; each concrete event class passes one of these to the {@code Event} constructor. */
    public enum EventType {
        /** Used by {@code InitEvent}. */
        INIT,
        /** Used by {@code PollEvent}. */
        POLL,
        /** Used by {@code CommitEvent}. */
        COMMIT,
        /** Used by {@code CustomEvent}. */
        CUSTOM,
        /** Used by {@code CloseEvent}. */
        CLOSE
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver$InitEvent.class */
    public class InitEvent extends DefaultKafkaReceiver<K, V>.Event<ConsumerRecords<K, V>> {
        private final java.util.function.Consumer<Flux<?>> kafkaSubscribeOrAssign;

        InitEvent(java.util.function.Consumer<Flux<?>> consumer) {
            super(EventType.INIT);
            this.kafkaSubscribeOrAssign = consumer;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                DefaultKafkaReceiver.this.isActive.set(true);
                DefaultKafkaReceiver.this.isClosed.set(false);
                DefaultKafkaReceiver.this.consumer = DefaultKafkaReceiver.this.consumerFactory.createConsumer(DefaultKafkaReceiver.this.receiverOptions);
                this.kafkaSubscribeOrAssign.accept(DefaultKafkaReceiver.this.consumerFlux);
            } catch (Exception e) {
                if (DefaultKafkaReceiver.this.isActive.get()) {
                    DefaultKafkaReceiver.log.error("Unexpected exception", e);
                    DefaultKafkaReceiver.this.fail(e, false);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver$PollEvent.class */
    public class PollEvent extends DefaultKafkaReceiver<K, V>.Event<ConsumerRecords<K, V>> {
        private AtomicInteger pendingCount;
        private final long pollTimeoutMs;
        private final AtomicBoolean partitionsPaused;

        PollEvent() {
            super(EventType.POLL);
            this.pendingCount = new AtomicInteger();
            this.partitionsPaused = new AtomicBoolean();
            this.pollTimeoutMs = DefaultKafkaReceiver.this.receiverOptions.pollTimeout().toMillis();
        }

        @Override // java.lang.Runnable
        public void run() {
            DefaultKafkaReceiver.this.needsHeartbeat.set(false);
            try {
                if (DefaultKafkaReceiver.this.isActive.get()) {
                    DefaultKafkaReceiver.this.commitEvent.runIfRequired(false);
                    this.pendingCount.decrementAndGet();
                    if (DefaultKafkaReceiver.this.requestsPending.get() <= 0 || DefaultKafkaReceiver.this.recordSubmission.requestedFromDownstream() <= 0 || DefaultKafkaReceiver.this.awaitingTransaction.get()) {
                        if (!this.partitionsPaused.getAndSet(true)) {
                            DefaultKafkaReceiver.this.consumer.pause(DefaultKafkaReceiver.this.consumer.assignment());
                        }
                    } else if (this.partitionsPaused.getAndSet(false)) {
                        DefaultKafkaReceiver.this.consumer.resume(DefaultKafkaReceiver.this.consumer.assignment());
                    }
                    ConsumerRecords poll = DefaultKafkaReceiver.this.consumer.poll(this.pollTimeoutMs);
                    if (poll.count() > 0) {
                        DefaultKafkaReceiver.this.recordSubmission.next(poll);
                    }
                    if (DefaultKafkaReceiver.this.isActive.get()) {
                        if (DefaultKafkaReceiver.this.requestsPending.addAndGet(0 - (((DefaultKafkaReceiver.this.ackMode == AckMode.AUTO_ACK || DefaultKafkaReceiver.this.ackMode == AckMode.EXACTLY_ONCE) && poll.count() > 0) ? 1 : poll.count())) > 0 || DefaultKafkaReceiver.this.commitEvent.inProgress.get() > 0) {
                            scheduleIfRequired();
                        }
                    }
                }
            } catch (Exception e) {
                if (DefaultKafkaReceiver.this.isActive.get()) {
                    DefaultKafkaReceiver.log.error("Unexpected exception", e);
                    DefaultKafkaReceiver.this.fail(e, false);
                }
            }
        }

        void scheduleIfRequired() {
            if (this.pendingCount.get() <= 0) {
                DefaultKafkaReceiver.this.emit(this);
                this.pendingCount.incrementAndGet();
            }
        }
    }

    public DefaultKafkaReceiver(ConsumerFactory consumerFactory, ReceiverOptions<K, V> receiverOptions) {
        this.consumerFactory = consumerFactory;
        this.receiverOptions = receiverOptions.toImmutable();
        this.eventScheduler = Schedulers.newSingle("reactive-kafka-" + receiverOptions.groupId());
    }

    /**
     * Switches the receiver to {@link AckMode#MANUAL_ACK} and returns a flux of records,
     * each paired with a {@link CommittableOffset} for its position.
     */
    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<ReceiverRecord<K, V>> receive() {
        this.ackMode = AckMode.MANUAL_ACK;
        return withDoOnRequest(createConsumerFlux().concatMap(batch -> Flux.fromIterable(batch)))
            .map(record -> {
                TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                CommittableOffset committableOffset = new CommittableOffset(partition, record.offset());
                return new ReceiverRecord<>(record, committableOffset);
            });
    }

    /**
     * Switches the receiver to {@link AckMode#AUTO_ACK} and returns one inner flux per
     * polled batch; after an inner flux terminates, every record of that batch is
     * acknowledged.
     */
    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
        this.ackMode = AckMode.AUTO_ACK;
        return withDoOnRequest(createConsumerFlux())
            .map(batch -> Flux.fromIterable(batch)
                .doAfterTerminate(() -> {
                    for (ConsumerRecord<K, V> record : batch) {
                        new CommittableOffset(this, record).acknowledge();
                    }
                }));
    }

    /**
     * Switches the receiver to {@link AckMode#ATMOST_ONCE} and returns a flux in which an
     * offset ahead of each record (by {@code atmostOnceCommitAheadSize}) is committed
     * before the record is passed downstream: blocking when the record is at or beyond the
     * last committed offset, asynchronously when at least half of the commit-ahead window
     * is still available, and not at all otherwise.
     */
    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
        this.ackMode = AckMode.ATMOST_ONCE;
        this.atmostOnceOffsets = new AtmostOnceOffsets();
        return withDoOnRequest(createConsumerFlux().concatMap(batch -> Flux.fromIterable(batch)))
            .doOnNext(record -> {
                long recordOffset = record.offset();
                TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                // Read the committed position before recording the dispatch.
                long lastCommitted = this.atmostOnceOffsets.committedOffset(partition);
                this.atmostOnceOffsets.onDispatch(partition, recordOffset);
                long commitAheadSize = this.receiverOptions.atmostOnceCommitAheadSize();
                CommittableOffset ahead = new CommittableOffset(partition, recordOffset + commitAheadSize);
                if (recordOffset >= lastCommitted) {
                    ahead.commit().block();
                } else if (lastCommitted - recordOffset >= commitAheadSize / 2) {
                    ahead.commit().subscribe();
                }
            });
    }

    /**
     * Switches this receiver to {@code EXACTLY_ONCE} mode and exposes each polled batch as an
     * inner flux tied to a transaction.
     *
     * <p>For every batch the inner publisher first calls {@code transactionManager.begin()},
     * then sets {@code awaitingTransaction} to {@code true}, and finally emits the records
     * produced by {@link #transactionalRecords}. Inner fluxes are published on the transaction
     * manager's scheduler.
     *
     * @param transactionManager transaction manager used to begin transactions and send offsets
     * @return a flux emitting one transactional inner flux per polled batch
     * @throws IllegalStateException if a consumer flux has already been created for this receiver
     */
    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager) {
        this.ackMode = AckMode.EXACTLY_ONCE;
        Flux<ConsumerRecords<K, V>> batches = withDoOnRequest(createConsumerFlux());
        return batches
                .map(batch -> transactionManager.begin()
                        .then(Mono.fromCallable(() -> this.awaitingTransaction.getAndSet(true)))
                        .thenMany(transactionalRecords(transactionManager, batch)))
                .publishOn(transactionManager.scheduler());
    }

    /**
     * Builds the record flux for one polled batch in exactly-once mode.
     *
     * <p>An empty batch yields an empty flux. Otherwise each record's partition and offset are
     * registered in a fresh {@link CommittableBatch}; the returned flux emits the records and is
     * then concatenated with {@code transactionManager.sendOffsets(...)} for the collected
     * offsets and the configured group id. When the flux terminates, {@code awaitingTransaction}
     * is reset to {@code false}.
     *
     * @param transactionManager transaction manager used to send the batch offsets
     * @param consumerRecords the polled batch
     * @return a flux of the batch's records followed by the offset send
     */
    private Flux<ConsumerRecord<K, V>> transactionalRecords(TransactionManager transactionManager, ConsumerRecords<K, V> consumerRecords) {
        if (consumerRecords.isEmpty()) {
            return Flux.empty();
        }
        CommittableBatch offsetBatch = new CommittableBatch();
        // Typed iteration replaces the raw Iterator and unchecked cast.
        for (ConsumerRecord<K, V> consumerRecord : consumerRecords) {
            offsetBatch.updateOffset(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
        }
        return Flux.fromIterable(consumerRecords)
                .concatWith(transactionManager.sendOffsets(offsetBatch.getAndClearOffsets().offsets(), this.receiverOptions.groupId()))
                .doAfterTerminate(() -> this.awaitingTransaction.set(false));
    }

    /**
     * Returns a {@link Mono} that, when subscribed, submits a {@code CustomEvent} wrapping the
     * given function and the mono's sink to this receiver's event queue.
     *
     * @param function function to be handed the Kafka consumer
     * @param <T> type of the function's result
     * @return a mono backed by the sink passed to the custom event
     */
    @Override // reactor.kafka.receiver.KafkaReceiver
    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        return Mono.create(resultSink -> emit(new CustomEvent(function, resultSink)));
    }

    /**
     * Rebalance callback for newly assigned partitions.
     *
     * <p>Logs the assignment and, when it is non-empty, passes a freshly built collection of
     * seekable partitions to each configured assign listener.
     *
     * @param collection the assigned partitions
     */
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        log.debug("onPartitionsAssigned {}", collection);
        if (!collection.isEmpty()) {
            for (java.util.function.Consumer<Collection<ReceiverPartition>> listener : this.receiverOptions.assignListeners()) {
                // Each listener receives its own wrapper collection.
                listener.accept(toSeekable(collection));
            }
        }
    }

    /**
     * Rebalance callback for revoked partitions.
     *
     * <p>Logs the revocation and, when it is non-empty, first runs the commit event
     * ({@code runIfRequired(true)}) unless the receiver is in {@code ATMOST_ONCE} mode, then
     * passes a freshly built collection of seekable partitions to each configured revoke
     * listener.
     *
     * @param collection the revoked partitions
     */
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        log.debug("onPartitionsRevoked {}", collection);
        if (!collection.isEmpty()) {
            if (this.ackMode != AckMode.ATMOST_ONCE) {
                this.commitEvent.runIfRequired(true);
            }
            for (java.util.function.Consumer<Collection<ReceiverPartition>> listener : this.receiverOptions.revokeListeners()) {
                // Each listener receives its own wrapper collection.
                listener.accept(toSeekable(collection));
            }
        }
    }

    /**
     * Creates the single consumer flux backing this receiver together with its init, poll and
     * commit events.
     *
     * <p>The flux is fed from a buffering {@link EmitterProcessor} sink and published on the
     * parallel scheduler. Subscribing calls {@link #start()}, downstream requests schedule a
     * poll when {@code requestsPending} is positive, and cancellation calls
     * {@code cancel(true)}.
     *
     * @return the newly created consumer flux
     * @throws IllegalStateException if a consumer flux already exists
     */
    private Flux<ConsumerRecords<K, V>> createConsumerFlux() {
        if (this.consumerFlux != null) {
            throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
        }
        this.initEvent = new InitEvent(ignoredFlux -> this.receiverOptions.subscriber(this).accept(this.consumer));
        this.pollEvent = new PollEvent();
        this.commitEvent = new CommitEvent();
        this.recordEmitter = EmitterProcessor.create();
        this.recordSubmission = this.recordEmitter.sink(FluxSink.OverflowStrategy.BUFFER);
        this.consumerFlux = this.recordEmitter
                .publishOn(Schedulers.parallel())
                .doOnSubscribe(subscription -> {
                    try {
                        start();
                    } catch (Exception startFailure) {
                        // Log, then propagate to the subscriber unchanged.
                        log.error("Subscription to event flux failed", startFailure);
                        throw startFailure;
                    }
                })
                .doOnRequest(requested -> {
                    if (this.requestsPending.get() > 0) {
                        this.pollEvent.scheduleIfRequired();
                    }
                })
                .doOnCancel(() -> cancel(true));
        return this.consumerFlux;
    }

    /**
     * Decorates a flux so that every downstream request is added to {@code requestsPending}
     * and, when the resulting total is positive, a poll is scheduled if required.
     *
     * @param flux the flux to decorate
     * @param <T> element type of the flux
     * @return the decorated flux
     */
    private <T> Flux<T> withDoOnRequest(Flux<T> flux) {
        return flux.doOnRequest(requested -> {
            long pending = this.requestsPending.addAndGet(requested);
            if (pending > 0) {
                this.pollEvent.scheduleIfRequired();
            }
        });
    }

    /**
     * Package-private accessor for the consumer held by this receiver.
     *
     * @return the current value of the {@code consumer} field
     */
    Consumer<K, V> kafkaConsumer() {
        return consumer;
    }

    /**
     * Package-private accessor for the batch held by the current commit event.
     *
     * @return the {@code commitBatch} of {@code commitEvent}
     */
    CommittableBatch committableBatch() {
        CommitEvent currentCommitEvent = (CommitEvent) this.commitEvent;
        return currentCommitEvent.commitBatch;
    }

    /**
     * Closes this receiver by delegating to {@code cancel(true)}.
     */
    void close() {
        this.cancel(true);
    }

    /**
     * Wraps each topic partition in a {@code SeekablePartition} bound to this receiver's
     * consumer.
     *
     * @param collection the partitions to wrap
     * @return a new list with one seekable partition per input partition, in iteration order
     */
    private Collection<ReceiverPartition> toSeekable(Collection<TopicPartition> collection) {
        // Typed list replaces the raw ArrayList and its unchecked conversion on return.
        List<ReceiverPartition> seekablePartitions = new ArrayList<>(collection.size());
        for (TopicPartition partition : collection) {
            seekablePartitions.add(new SeekablePartition(this.consumer, partition));
        }
        return seekablePartitions;
    }

    /**
     * Activates the receiver and wires up its event loop.
     *
     * <p>Resets the per-subscription counters and flags, creates a buffering event emitter,
     * starts the event scheduler, and merges the emitter, the init event and (for
     * {@code AUTO_ACK}/{@code MANUAL_ACK} with a non-zero commit interval) a periodic commit
     * event into a single event flux. That flux is published on the event scheduler and each
     * event is dispatched to {@link #doEvent}.
     *
     * @throws IllegalStateException if the receiver is already active
     */
    private void start() {
        log.debug("start");
        boolean activated = this.isActive.compareAndSet(false, true);
        if (!activated) {
            throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
        }

        // Reset state left over from any previous subscription.
        this.fluxList.clear();
        this.requestsPending.set(0L);
        this.consecutiveCommitFailures.set(0);
        this.awaitingTransaction.set(false);

        this.eventEmitter = EmitterProcessor.create();
        this.eventSubmission = this.eventEmitter.sink(FluxSink.OverflowStrategy.BUFFER);
        this.eventScheduler.start();

        Flux<? extends DefaultKafkaReceiver<K, V>.Event<?>> initFlux = Flux.just(this.initEvent);
        this.fluxList.add(this.eventEmitter);
        this.fluxList.add(initFlux);

        Duration commitInterval = this.receiverOptions.commitInterval();
        boolean intervalCommitMode = this.ackMode == AckMode.AUTO_ACK || this.ackMode == AckMode.MANUAL_ACK;
        if (intervalCommitMode && !commitInterval.isZero()) {
            this.fluxList.add(Flux.interval(this.receiverOptions.commitInterval()).map(tick -> this.commitEvent.periodicEvent()));
        }

        this.eventFlux = Flux.merge(this.fluxList).publishOn(this.eventScheduler);
        this.subscribeDisposables.add(this.eventFlux.subscribe(event -> doEvent(event)));
    }

    /**
     * Reports a failure of the consumer flux: logs it, signals the error on the record sink and
     * then cancels the receiver.
     *
     * <p>Decompiler note: the access modifier was changed from {@code private}.
     *
     * @param th the failure to log and propagate to the record sink
     * @param z flag forwarded unchanged to {@code cancel(boolean)}
     */
    public void fail(Throwable th, boolean z) {
        log.error("Consumer flux exception", th);
        // Signal downstream before tearing down.
        this.recordSubmission.error(th);
        this.cancel(z);
    }

    /**
     * Decompiler placeholder for the receiver's cancellation routine.
     *
     * <p>The original bytecode (870 instructions, per the dump note below) could not be
     * reconstructed, so as written this method unconditionally throws
     * {@link UnsupportedOperationException}. Within this file it is invoked from the consumer
     * flux's cancel hook, from {@code close()} and from {@code fail(Throwable, boolean)}.
     *
     * <p>NOTE(review): presumably the real implementation tears down the event loop and the
     * consumer — confirm against the original source or bytecode before relying on it.
     *
     * @param r6 boolean flag supplied by callers; the name is a decompiler register name and
     *           its meaning is not recoverable from this listing
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    /* JADX WARN: Incorrect condition in loop: B:114:0x020f */
    /* JADX WARN: Incorrect condition in loop: B:48:0x00fb */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void cancel(boolean r6) {
        /*
            Method dump skipped, instructions count: 870
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: reactor.kafka.receiver.internals.DefaultKafkaReceiver.cancel(boolean):void");
    }

    /**
     * Runs a single event, routing any exception it throws to {@code fail(e, false)}.
     *
     * @param event the event to execute
     */
    private void doEvent(DefaultKafkaReceiver<K, V>.Event<?> event) {
        log.trace("doEvent {}", event.eventType);
        try {
            event.run();
        } catch (Exception eventFailure) {
            this.fail(eventFailure, false);
        }
    }

    /**
     * Submits an event to the event sink ({@code eventSubmission}).
     *
     * <p>Decompiler note: the access modifier was changed from {@code private}.
     *
     * @param event the event to enqueue
     */
    public void emit(DefaultKafkaReceiver<K, V>.Event<?> event) {
        eventSubmission.next(event);
    }
}
