package org.springframework.kafka.listener;

import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.event.ListenerContainerPartitionIdleEvent;

/* loaded from: input_file:org/springframework/kafka/listener/KafkaConsumerBackoffManager.class */
public class KafkaConsumerBackoffManager implements ApplicationListener<ListenerContainerPartitionIdleEvent> {
    public static final String INTERNAL_BACKOFF_CLOCK_BEAN_NAME = "internalBackOffClock";
    private final KafkaListenerEndpointRegistry registry;
    private final Map<TopicPartition, Context> backOffTimes = new HashMap();
    private final Clock clock;

    /* loaded from: input_file:org/springframework/kafka/listener/KafkaConsumerBackoffManager$Context.class */
    public static class Context {
        final long dueTimestamp;
        final String listenerId;
        final TopicPartition topicPartition;

        Context(long j, String str, TopicPartition topicPartition) {
            this.dueTimestamp = j;
            this.listenerId = str;
            this.topicPartition = topicPartition;
        }
    }

    public KafkaConsumerBackoffManager(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry, @Qualifier("internalBackOffClock") Clock clock) {
        this.registry = kafkaListenerEndpointRegistry;
        this.clock = clock;
    }

    public void maybeBackoff(Context context) {
        long between = ChronoUnit.MILLIS.between(Instant.now(this.clock), Instant.ofEpochMilli(context.dueTimestamp));
        if (between > 0) {
            pauseConsumptionAndThrow(context, Long.valueOf(between));
        }
    }

    private void pauseConsumptionAndThrow(Context context, Long l) throws KafkaBackoffException {
        TopicPartition topicPartition = context.topicPartition;
        getListenerContainerFromContext(context).pausePartition(topicPartition);
        addBackoff(context, topicPartition);
        throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, backing off for approx. %s millis.", Integer.valueOf(context.topicPartition.partition()), context.topicPartition.topic(), l), topicPartition, context.listenerId, context.dueTimestamp);
    }

    private MessageListenerContainer getListenerContainerFromContext(Context context) {
        return this.registry.getListenerContainer(context.listenerId);
    }

    public void onApplicationEvent(ListenerContainerPartitionIdleEvent listenerContainerPartitionIdleEvent) {
        Context backoff = getBackoff(listenerContainerPartitionIdleEvent.getTopicPartition());
        if (backoff == null || Instant.now(this.clock).isBefore(Instant.ofEpochMilli(backoff.dueTimestamp))) {
            return;
        }
        getListenerContainerFromContext(backoff).resumePartition(backoff.topicPartition);
        removeBackoff(backoff.topicPartition);
    }

    protected void addBackoff(Context context, TopicPartition topicPartition) {
        synchronized (this.backOffTimes) {
            this.backOffTimes.put(topicPartition, context);
        }
    }

    protected Context getBackoff(TopicPartition topicPartition) {
        Context context;
        synchronized (this.backOffTimes) {
            context = this.backOffTimes.get(topicPartition);
        }
        return context;
    }

    protected void removeBackoff(TopicPartition topicPartition) {
        synchronized (this.backOffTimes) {
            this.backOffTimes.remove(topicPartition);
        }
    }

    public Context createContext(long j, String str, TopicPartition topicPartition) {
        return new Context(j, str, topicPartition);
    }
}
