package reactor.kafka.receiver.internals;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerHandler.class */
public class ConsumerHandler<K, V> {
    private static final Set<String> DELEGATE_METHODS = new HashSet(Arrays.asList("assignment", "subscription", "seek", "seekToBeginning", "seekToEnd", "position", "committed", "metrics", "partitionsFor", "listTopics", "paused", "pause", "resume", "offsetsForTimes", "beginningOffsets", "endOffsets"));
    private final ReceiverOptions<K, V> receiverOptions;
    private final Consumer<K, V> consumer;
    private final Scheduler eventScheduler;
    private final ConsumerEventLoop<K, V> consumerEventLoop;
    private Consumer<K, V> consumerProxy;
    final AtomicBoolean awaitingTransaction = new AtomicBoolean();
    private final AtmostOnceOffsets atmostOnceOffsets = new AtmostOnceOffsets();
    private final EmitterProcessor<ConsumerRecords<K, V>> processor = EmitterProcessor.create(1);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerHandler$CommittableOffset.class */
    public static class CommittableOffset<K, V> implements ReceiverOffset {
        private final TopicPartition topicPartition;
        private final long commitOffset;
        private final ConsumerEventLoop<K, V>.CommitEvent commitEvent;
        private final int commitBatchSize;
        private final AtomicBoolean acknowledged = new AtomicBoolean(false);

        public CommittableOffset(TopicPartition topicPartition, long j, ConsumerEventLoop<K, V>.CommitEvent commitEvent, int i) {
            this.topicPartition = topicPartition;
            this.commitOffset = j;
            this.commitEvent = commitEvent;
            this.commitBatchSize = i;
        }

        @Override // reactor.kafka.receiver.ReceiverOffset
        public Mono<Void> commit() {
            return maybeUpdateOffset() > 0 ? scheduleCommit() : Mono.empty();
        }

        @Override // reactor.kafka.receiver.ReceiverOffset
        public void acknowledge() {
            long maybeUpdateOffset = maybeUpdateOffset();
            if (this.commitBatchSize <= 0 || maybeUpdateOffset < this.commitBatchSize) {
                return;
            }
            this.commitEvent.scheduleIfRequired();
        }

        @Override // reactor.kafka.receiver.ReceiverOffset
        public TopicPartition topicPartition() {
            return this.topicPartition;
        }

        @Override // reactor.kafka.receiver.ReceiverOffset
        public long offset() {
            return this.commitOffset;
        }

        private int maybeUpdateOffset() {
            return this.acknowledged.compareAndSet(false, true) ? this.commitEvent.commitBatch.updateOffset(this.topicPartition, this.commitOffset) : this.commitEvent.commitBatch.batchSize();
        }

        private Mono<Void> scheduleCommit() {
            return Mono.create(monoSink -> {
                this.commitEvent.commitBatch.addCallbackEmitter(monoSink);
                this.commitEvent.scheduleIfRequired();
            });
        }

        public String toString() {
            return this.topicPartition + "@" + this.commitOffset;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerHandler(ReceiverOptions<K, V> receiverOptions, Consumer<K, V> consumer, Predicate<Throwable> predicate, AckMode ackMode) {
        this.receiverOptions = receiverOptions;
        this.consumer = consumer;
        this.eventScheduler = KafkaSchedulers.newEvent(receiverOptions.groupId());
        this.consumerEventLoop = new ConsumerEventLoop<>(ackMode, this.atmostOnceOffsets, receiverOptions, this.eventScheduler, consumer, predicate, this.processor.sink(FluxSink.OverflowStrategy.ERROR), this.awaitingTransaction);
        this.eventScheduler.start();
    }

    public Flux<ConsumerRecords<K, V>> receive() {
        return this.processor;
    }

    public Mono<Void> close() {
        return this.consumerEventLoop.stop().doFinally(signalType -> {
            this.eventScheduler.dispose();
        });
    }

    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        return Mono.create(monoSink -> {
            monoSink.onCancel(this.eventScheduler.schedule(() -> {
                try {
                    monoSink.success(function.apply(consumerProxy()));
                } catch (Exception e) {
                    monoSink.error(e);
                }
            }));
        });
    }

    public Mono<Void> commit(ConsumerRecord<K, V> consumerRecord) {
        long offset = consumerRecord.offset();
        TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        long committedOffset = this.atmostOnceOffsets.committedOffset(topicPartition);
        this.atmostOnceOffsets.onDispatch(topicPartition, offset);
        long atmostOnceCommitAheadSize = this.receiverOptions.atmostOnceCommitAheadSize();
        CommittableOffset committableOffset = new CommittableOffset(topicPartition, offset + atmostOnceCommitAheadSize, this.consumerEventLoop.commitEvent, this.receiverOptions.commitBatchSize());
        if (offset >= committedOffset) {
            return committableOffset.commit();
        }
        if (committedOffset - offset >= atmostOnceCommitAheadSize / 2) {
            committableOffset.commit().subscribe();
        }
        return Mono.empty();
    }

    public void acknowledge(ConsumerRecord<K, V> consumerRecord) {
        toCommittableOffset(consumerRecord).acknowledge();
    }

    public CommittableOffset<K, V> toCommittableOffset(ConsumerRecord<K, V> consumerRecord) {
        return new CommittableOffset<>(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), this.consumerEventLoop.commitEvent, this.receiverOptions.commitBatchSize());
    }

    private Consumer<K, V> consumerProxy() {
        if (this.consumerProxy != null) {
            return this.consumerProxy;
        }
        this.consumerProxy = (Consumer) Proxy.newProxyInstance(Consumer.class.getClassLoader(), new Class[]{Consumer.class}, (obj, method, objArr) -> {
            if (!DELEGATE_METHODS.contains(method.getName())) {
                throw new UnsupportedOperationException("Method is not supported: " + method);
            }
            try {
                return method.invoke(this.consumer, objArr);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        });
        return this.consumerProxy;
    }
}
