package org.springframework.kafka.retrytopic;

import java.math.BigInteger;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.core.NestedRuntimeException;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.KafkaBackoffException;
import org.springframework.kafka.retrytopic.destinationtopic.DestinationTopic;
import org.springframework.kafka.retrytopic.destinationtopic.DestinationTopicResolver;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.class */
/**
 * Creates {@link DeadLetterPublishingRecoverer} instances that forward failed records to the
 * destination chosen by a {@link DestinationTopicResolver}, stamping each forwarded record with
 * the original-timestamp, attempts and back-off-timestamp headers named in
 * {@link RetryTopicHeaders}.
 */
public class DeadLetterPublishingRecovererFactory {
    /** Sentinel topic name used to signal that a record must not be forwarded any further. */
    private static final String NO_OPS_RETRY_TOPIC = "internal-kafka-noOpsRetry";

    private final DestinationTopicResolver destinationTopicResolver;

    /** Hook applied to every recoverer built by {@link #create(Configuration)}; no-op by default. */
    private Consumer<DeadLetterPublishingRecoverer> recovererCustomizer = deadLetterPublishingRecoverer -> {
    };

    /** Holds the {@link KafkaOperations} handed to the recoverer's constructor. */
    public static class Configuration {
        private final KafkaOperations<?, ?> template;

        /**
         * @param kafkaOperations the template passed to the recoverer; must not be {@code null}
         * @throws IllegalArgumentException if {@code kafkaOperations} is {@code null}
         */
        public Configuration(KafkaOperations<?, ?> kafkaOperations) {
            Assert.notNull(kafkaOperations, () -> "You need to provide a KafkaOperations instance.");
            this.template = kafkaOperations;
        }
    }

    /**
     * @param destinationTopicResolver resolver consulted for the next destination, the back-off
     *                                 timestamp and the {@link KafkaOperations} used to publish
     */
    public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
        this.destinationTopicResolver = destinationTopicResolver;
    }

    /**
     * Builds a recoverer whose destination and headers are computed by this factory, then passes
     * it through the configured customizer.
     *
     * @param configuration supplies the template given to the recoverer's constructor
     * @return the configured (and customized) recoverer
     */
    public DeadLetterPublishingRecoverer create(Configuration configuration) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(configuration.template,
                (consumerRecord, exc) -> resolveDestination(consumerRecord, exc)) {
            @Override
            public void publish(ProducerRecord<Object, Object> producerRecord, KafkaOperations<Object, Object> kafkaOperations) {
                if (DeadLetterPublishingRecovererFactory.NO_OPS_RETRY_TOPIC.equals(producerRecord.topic())) {
                    // The sentinel topic means there is nowhere left to send the record: log and drop.
                    this.logger.warn(() -> "Processing failed for last topic, giving up.");
                } else {
                    // The kafkaOperations argument is ignored on purpose: the template registered
                    // with the resolver for the record's topic is used instead.
                    super.publish(producerRecord, DeadLetterPublishingRecovererFactory.this.destinationTopicResolver
                            .getCurrentTopic(producerRecord.topic()).getKafkaOperations());
                }
            }
        };
        recoverer.setHeadersFunction((consumerRecord, exc) -> addHeaders(consumerRecord, exc, getAttempts(consumerRecord)));
        this.recovererCustomizer.accept(recoverer);
        return recoverer;
    }

    /**
     * Replaces the customizer applied to each recoverer built by {@link #create(Configuration)}.
     *
     * @param consumer the customizer; it is not null-checked here, and {@code create} invokes it
     *                 unconditionally
     */
    public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> consumer) {
        this.recovererCustomizer = consumer;
    }

    /**
     * Chooses the topic/partition a failed record is forwarded to.
     *
     * @param consumerRecord the failed record
     * @param exc            the failure
     * @return the sentinel no-ops topic (partition 0) when the resolver reports a no-ops
     *         destination; otherwise the resolved topic with the source partition folded into the
     *         destination's partition count
     * @throws NestedRuntimeException the given exception itself, when it contains a
     *                                {@link KafkaBackoffException}
     */
    private TopicPartition resolveDestination(ConsumerRecord<?, ?> consumerRecord, Exception exc) {
        if (isBackoffException(exc)) {
            // Back-off failures are rethrown unchanged rather than routed to another topic.
            throw ((NestedRuntimeException) exc);
        }
        long originalTimestamp = new BigInteger(getOriginalTimestampHeader(consumerRecord)).longValue();
        DestinationTopic nextDestination = this.destinationTopicResolver.resolveNextDestination(
                consumerRecord.topic(), Integer.valueOf(getAttempts(consumerRecord)), exc, originalTimestamp);
        if (nextDestination.isNoOpsTopic()) {
            return new TopicPartition(NO_OPS_RETRY_TOPIC, 0);
        }
        return new TopicPartition(nextDestination.getDestinationName(),
                consumerRecord.partition() % nextDestination.getDestinationPartitions().intValue());
    }

    /**
     * @return {@code true} when {@code exc} is a {@link NestedRuntimeException} that contains a
     *         {@link KafkaBackoffException}
     */
    private boolean isBackoffException(Exception exc) {
        return NestedRuntimeException.class.isAssignableFrom(exc.getClass())
                && ((NestedRuntimeException) exc).contains(KafkaBackoffException.class);
    }

    /**
     * Reads the attempts counter from the record's headers.
     *
     * <p>The counter is written by {@link #addHeaders} as a {@link BigInteger} byte array, so the
     * whole array is decoded here. Reading only the first byte would yield 0 as soon as the
     * encoding needs more than one byte (128 is encoded as {@code {0x00, 0x80}}). For one-byte
     * values the result is the same as reading that byte directly.
     *
     * @param consumerRecord the record whose headers are inspected
     * @return the decoded counter, or {@code 1} when the header is absent or carries no bytes
     */
    private int getAttempts(ConsumerRecord<?, ?> consumerRecord) {
        Header attemptsHeader = consumerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
        if (attemptsHeader == null) {
            return 1;
        }
        byte[] encoded = attemptsHeader.value();
        if (encoded == null || encoded.length == 0) {
            // A header without a payload carries no counter; treat it like a missing header.
            return 1;
        }
        return new BigInteger(encoded).intValue();
    }

    /**
     * Builds the headers attached to the forwarded record.
     *
     * @param consumerRecord the failed record
     * @param exc            the failure, passed to the resolver when computing the back-off timestamp
     * @param i              the attempts value read from the failed record; {@code i + 1} is written out
     * @return a new header set holding the original timestamp, the incremented attempts counter
     *         and the resolver-supplied next execution timestamp, each as {@link BigInteger} bytes
     */
    private Headers addHeaders(ConsumerRecord<?, ?> consumerRecord, Exception exc, int i) {
        RecordHeaders recordHeaders = new RecordHeaders();
        byte[] originalTimestampHeader = getOriginalTimestampHeader(consumerRecord);
        recordHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, originalTimestampHeader);
        recordHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, BigInteger.valueOf(i + 1).toByteArray());
        long nextExecutionTimestamp = this.destinationTopicResolver.resolveDestinationNextExecutionTimestamp(
                consumerRecord.topic(), Integer.valueOf(i), exc, new BigInteger(originalTimestampHeader).longValue());
        recordHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP,
                BigInteger.valueOf(nextExecutionTimestamp).toByteArray());
        return recordHeaders;
    }

    /**
     * @return the bytes of the original-timestamp header when present; otherwise the record's own
     *         timestamp encoded as {@link BigInteger} bytes
     */
    private byte[] getOriginalTimestampHeader(ConsumerRecord<?, ?> consumerRecord) {
        Header lastHeader = consumerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP);
        return lastHeader != null ? lastHeader.value() : BigInteger.valueOf(consumerRecord.timestamp()).toByteArray();
    }
}
