package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/* loaded from: input_file:org/springframework/kafka/listener/FallbackBatchErrorHandler.class */
class FallbackBatchErrorHandler extends ExceptionClassifier implements CommonErrorHandler {
    private final LogAccessor logger;
    private final BackOff backOff;
    private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
    private final CommonErrorHandler seeker;
    private final ThreadLocal<Boolean> retrying;
    private final List<RetryListener> retryListeners;
    private boolean ackAfterHandle;

    /* loaded from: input_file:org/springframework/kafka/listener/FallbackBatchErrorHandler$SeekAfterRecoverFailsOrInterrupted.class */
    private final class SeekAfterRecoverFailsOrInterrupted implements CommonErrorHandler {
        SeekAfterRecoverFailsOrInterrupted() {
        }

        @Override // org.springframework.kafka.listener.CommonErrorHandler
        public void handleBatch(Exception exc, ConsumerRecords<?, ?> consumerRecords, Consumer<?, ?> consumer, MessageListenerContainer messageListenerContainer, Runnable runnable) {
            LinkedHashMap linkedHashMap = (LinkedHashMap) consumerRecords.partitions().stream().collect(Collectors.toMap(topicPartition -> {
                return topicPartition;
            }, topicPartition2 -> {
                return Long.valueOf(((ConsumerRecord) consumerRecords.records(topicPartition2).get(0)).offset());
            }, (l, l2) -> {
                return Long.valueOf(l2.longValue());
            }, LinkedHashMap::new));
            Objects.requireNonNull(consumer);
            linkedHashMap.forEach((v1, v2) -> {
                r1.seek(v1, v2);
            });
            throw new KafkaException("Seek to current after exception", FallbackBatchErrorHandler.this.getLogLevel(), exc);
        }
    }

    FallbackBatchErrorHandler() {
        this(new FixedBackOff(), null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FallbackBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecoverer consumerRecordRecoverer) {
        this.logger = new LogAccessor(LogFactory.getLog(getClass()));
        this.seeker = new SeekAfterRecoverFailsOrInterrupted();
        this.retrying = ThreadLocal.withInitial(() -> {
            return false;
        });
        this.retryListeners = new ArrayList();
        this.ackAfterHandle = true;
        this.backOff = backOff;
        this.recoverer = (consumerRecords, exc) -> {
            if (consumerRecordRecoverer == null) {
                this.logger.error(exc, () -> {
                    return "Records discarded: " + ErrorHandlingUtils.recordsToString(consumerRecords);
                });
            } else {
                consumerRecords.spliterator().forEachRemaining(consumerRecord -> {
                    consumerRecordRecoverer.accept(consumerRecord, exc);
                });
            }
        };
    }

    public void setRetryListeners(RetryListener... retryListenerArr) {
        Assert.noNullElements(retryListenerArr, "'listeners' cannot have null elements");
        this.retryListeners.clear();
        this.retryListeners.addAll(Arrays.asList(retryListenerArr));
    }

    @Override // org.springframework.kafka.listener.CommonErrorHandler
    public boolean isAckAfterHandle() {
        return this.ackAfterHandle;
    }

    @Override // org.springframework.kafka.listener.CommonErrorHandler
    public void setAckAfterHandle(boolean z) {
        this.ackAfterHandle = z;
    }

    @Override // org.springframework.kafka.listener.CommonErrorHandler
    public void handleBatch(Exception exc, @Nullable ConsumerRecords<?, ?> consumerRecords, Consumer<?, ?> consumer, MessageListenerContainer messageListenerContainer, Runnable runnable) {
        if (consumerRecords == null || consumerRecords.count() == 0) {
            this.logger.error(exc, "Called with no records; consumer exception");
            return;
        }
        this.retrying.set(true);
        try {
            ErrorHandlingUtils.retryBatch(exc, consumerRecords, consumer, messageListenerContainer, runnable, this.backOff, this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners, getClassifier());
            this.retrying.set(false);
        } catch (Throwable th) {
            this.retrying.set(false);
            throw th;
        }
    }

    @Override // org.springframework.kafka.listener.CommonErrorHandler
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> collection, Runnable runnable) {
        if (this.retrying.get().booleanValue()) {
            consumer.pause(consumer.assignment());
            runnable.run();
        }
    }
}
