package org.springframework.kafka.retrytopic;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.destinationtopic.DestinationTopic;
import org.springframework.kafka.retrytopic.destinationtopic.DestinationTopicProcessor;
import org.springframework.kafka.support.Suffixer;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

/* loaded from: input_file:org/springframework/kafka/retrytopic/RetryTopicConfigurer.class */
public class RetryTopicConfigurer {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryTopicConfigurer.class));
    public static final EndpointHandlerMethod DEFAULT_DLT_HANDLER = createHandlerMethodWith((Class<?>) LoggingDltListenerHandlerMethod.class, LoggingDltListenerHandlerMethod.DEFAULT_DLT_METHOD_NAME);
    private final DestinationTopicProcessor destinationTopicProcessor;
    private final ListenerContainerFactoryResolver containerFactoryResolver;
    private final ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer;
    private final BeanFactory beanFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/kafka/retrytopic/RetryTopicConfigurer$EndpointCustomizerFunctionFactory.class */
    public static final class EndpointCustomizerFunctionFactory {
        private final DestinationTopic.Properties destinationProperties;
        private final EndpointHandlerMethod beanMethod;
        private final BeanFactory beanFactory;
        private final Function<EndpointCustomizingContext, EndpointCustomizingContext> endpointCustomizingContextListener;

        EndpointCustomizerFunctionFactory(DestinationTopic.Properties properties, EndpointHandlerMethod endpointHandlerMethod, BeanFactory beanFactory, Function<EndpointCustomizingContext, EndpointCustomizingContext> function) {
            this.destinationProperties = properties;
            this.beanMethod = endpointHandlerMethod;
            this.beanFactory = beanFactory;
            this.endpointCustomizingContextListener = function;
        }

        public Function<MethodKafkaListenerEndpoint<?, ?>, MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer() {
            return addSuffixesAndRegisterDestination(this.destinationProperties.suffix()).andThen(setBeanAndMethod(this.beanMethod.resolveBean(this.beanFactory), this.beanMethod.getMethod()));
        }

        private Function<MethodKafkaListenerEndpoint<?, ?>, MethodKafkaListenerEndpoint<?, ?>> setBeanAndMethod(Object obj, Method method) {
            return methodKafkaListenerEndpoint -> {
                methodKafkaListenerEndpoint.setBean(obj);
                methodKafkaListenerEndpoint.setMethod(method);
                return methodKafkaListenerEndpoint;
            };
        }

        private Function<MethodKafkaListenerEndpoint<?, ?>, MethodKafkaListenerEndpoint<?, ?>> addSuffixesAndRegisterDestination(String str) {
            Suffixer suffixer = new Suffixer(str);
            return methodKafkaListenerEndpoint -> {
                methodKafkaListenerEndpoint.setId(suffixer.maybeAddTo(methodKafkaListenerEndpoint.getId()));
                methodKafkaListenerEndpoint.setGroupId(suffixer.maybeAddTo(methodKafkaListenerEndpoint.getGroupId()));
                methodKafkaListenerEndpoint.setTopics((String[]) customizeAndRegisterTopics(suffixer, methodKafkaListenerEndpoint).toArray(new String[0]));
                methodKafkaListenerEndpoint.setClientIdPrefix(suffixer.maybeAddTo(methodKafkaListenerEndpoint.getClientIdPrefix()));
                methodKafkaListenerEndpoint.setGroup(suffixer.maybeAddTo(methodKafkaListenerEndpoint.getGroup()));
                return methodKafkaListenerEndpoint;
            };
        }

        private Collection<String> customizeAndRegisterTopics(Suffixer suffixer, MethodKafkaListenerEndpoint<?, ?> methodKafkaListenerEndpoint) {
            return (Collection) getTopics(methodKafkaListenerEndpoint).stream().map(str -> {
                return new EndpointCustomizingContext(str, suffixer.maybeAddTo(str));
            }).map(this.endpointCustomizingContextListener).map((v0) -> {
                return v0.getProcessedTopic();
            }).collect(Collectors.toList());
        }

        private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> methodKafkaListenerEndpoint) {
            Collection<String> topics = methodKafkaListenerEndpoint.getTopics();
            if (topics == null || topics.isEmpty()) {
                topics = (Collection) Arrays.stream(methodKafkaListenerEndpoint.getTopicPartitionsToAssign()).map((v0) -> {
                    return v0.getTopic();
                }).collect(Collectors.toList());
            }
            if (topics == null || topics.isEmpty()) {
                throw new IllegalStateException("No topics where provided for RetryTopicConfiguration.");
            }
            return topics;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/kafka/retrytopic/RetryTopicConfigurer$EndpointCustomizingContext.class */
    public static final class EndpointCustomizingContext {
        private final String topic;
        private final String processedTopic;

        EndpointCustomizingContext(String str, String str2) {
            this.topic = str;
            this.processedTopic = str2;
        }

        String getTopic() {
            return this.topic;
        }

        String getProcessedTopic() {
            return this.processedTopic;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/kafka/retrytopic/RetryTopicConfigurer$EndpointHandlerMethod.class */
    public static class EndpointHandlerMethod {
        private final Class<?> beanClass;
        private final Method method;
        private Object bean;

        EndpointHandlerMethod(Class<?> cls, String str) {
            Assert.notNull(cls, () -> {
                return "No destination bean class provided!";
            });
            Assert.notNull(str, () -> {
                return "No method name for destination bean class provided!";
            });
            this.method = (Method) Arrays.stream(ReflectionUtils.getDeclaredMethods(cls)).filter(method -> {
                return method.getName().equals(str);
            }).findFirst().orElseThrow(() -> {
                return new IllegalArgumentException(String.format("No method %s in class %s", str, cls));
            });
            this.beanClass = cls;
        }

        EndpointHandlerMethod(Object obj, Method method) {
            Assert.notNull(obj, () -> {
                return "No bean for destination provided!";
            });
            Assert.notNull(method, () -> {
                return "No method for destination bean class provided!";
            });
            this.method = method;
            this.bean = obj;
            this.beanClass = obj.getClass();
        }

        public Object resolveBean(BeanFactory beanFactory) {
            if (this.bean == null) {
                try {
                    this.bean = beanFactory.getBean(this.beanClass);
                } catch (NoSuchBeanDefinitionException e) {
                    String str = this.beanClass.getSimpleName() + "-handlerMethod";
                    ((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(str, new RootBeanDefinition(this.beanClass));
                    this.bean = beanFactory.getBean(str);
                }
            }
            return this.bean;
        }

        public Method getMethod() {
            return this.method;
        }
    }

    /* loaded from: input_file:org/springframework/kafka/retrytopic/RetryTopicConfigurer$EndpointProcessingCustomizerHolder.class */
    public static final class EndpointProcessingCustomizerHolder {
        private final Function<MethodKafkaListenerEndpoint<?, ?>, MethodKafkaListenerEndpoint<?, ?>> endpointCustomizer;
        private final Function<KafkaListenerContainerFactory<?>, KafkaListenerContainerFactory<?>> factoryCustomizer;

        public EndpointProcessingCustomizerHolder(Function<MethodKafkaListenerEndpoint<?, ?>, MethodKafkaListenerEndpoint<?, ?>> function, Function<KafkaListenerContainerFactory<?>, KafkaListenerContainerFactory<?>> function2) {
            this.endpointCustomizer = function;
            this.factoryCustomizer = function2;
        }

        public Function<MethodKafkaListenerEndpoint<?, ?>, MethodKafkaListenerEndpoint<?, ?>> getEndpointCustomizer() {
            return this.endpointCustomizer;
        }

        public Function<KafkaListenerContainerFactory<?>, KafkaListenerContainerFactory<?>> getFactoryCustomizer() {
            return this.factoryCustomizer;
        }
    }

    /* loaded from: input_file:org/springframework/kafka/retrytopic/RetryTopicConfigurer$EndpointProcessor.class */
    public interface EndpointProcessor extends BiConsumer<MethodKafkaListenerEndpoint<?, ?>, EndpointProcessingCustomizerHolder> {
    }

    /* loaded from: input_file:org/springframework/kafka/retrytopic/RetryTopicConfigurer$LoggingDltListenerHandlerMethod.class */
    static class LoggingDltListenerHandlerMethod {
        public static final String DEFAULT_DLT_METHOD_NAME = "logMessage";

        LoggingDltListenerHandlerMethod() {
        }

        public void logMessage(Object obj) {
            if (obj instanceof ConsumerRecord) {
                RetryTopicConfigurer.LOGGER.info(() -> {
                    return "Received message in dlt listener: " + ListenerUtils.recordToString((ConsumerRecord) obj);
                });
            } else {
                RetryTopicConfigurer.LOGGER.info(() -> {
                    return "Received message in dlt listener.";
                });
            }
        }
    }

    RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver listenerContainerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, BeanFactory beanFactory) {
        this.destinationTopicProcessor = destinationTopicProcessor;
        this.containerFactoryResolver = listenerContainerFactoryResolver;
        this.listenerContainerFactoryConfigurer = listenerContainerFactoryConfigurer;
        this.beanFactory = beanFactory;
    }

    public void processMainAndRetryListeners(EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?, ?> methodKafkaListenerEndpoint, RetryTopicConfiguration retryTopicConfiguration) {
        throwIfMultiMethodEndpoint(methodKafkaListenerEndpoint);
        DestinationTopicProcessor.Context context = new DestinationTopicProcessor.Context(retryTopicConfiguration.getDestinationTopicProperties());
        configureMainEndpoint(methodKafkaListenerEndpoint, endpointProcessor, retryTopicConfiguration);
        configureRetryAndDltEndpoints(methodKafkaListenerEndpoint, endpointProcessor, context, retryTopicConfiguration);
        this.destinationTopicProcessor.processRegisteredDestinations(collection -> {
            createNewTopicBeans(collection, retryTopicConfiguration.forKafkaTopicAutoCreation());
        }, context);
    }

    private void configureMainEndpoint(MethodKafkaListenerEndpoint<?, ?> methodKafkaListenerEndpoint, EndpointProcessor endpointProcessor, RetryTopicConfiguration retryTopicConfiguration) {
        endpointProcessor.accept(methodKafkaListenerEndpoint, new EndpointProcessingCustomizerHolder(methodKafkaListenerEndpoint2 -> {
            return methodKafkaListenerEndpoint2;
        }, kafkaListenerContainerFactory -> {
            return resolveAndConfigureFactoryForMainEndpoint(kafkaListenerContainerFactory, retryTopicConfiguration);
        }));
    }

    private void configureRetryAndDltEndpoints(MethodKafkaListenerEndpoint<?, ?> methodKafkaListenerEndpoint, EndpointProcessor endpointProcessor, DestinationTopicProcessor.Context context, RetryTopicConfiguration retryTopicConfiguration) {
        EndpointHandlerMethod endpointHandlerMethod = new EndpointHandlerMethod(methodKafkaListenerEndpoint.getBean(), methodKafkaListenerEndpoint.getMethod());
        EndpointHandlerMethod dltHandlerMethod = retryTopicConfiguration.getDltHandlerMethod();
        this.destinationTopicProcessor.processDestinationProperties(properties -> {
            endpointProcessor.accept(new MethodKafkaListenerEndpoint(), new EndpointProcessingCustomizerHolder(getEndpointCustomizerFunction(endpointHandlerMethod, dltHandlerMethod, properties, context), kafkaListenerContainerFactory -> {
                return resolveAndConfigureFactoryForRetryEndpoint(kafkaListenerContainerFactory, retryTopicConfiguration);
            }));
        }, context);
    }

    private void createNewTopicBeans(Collection<String> collection, RetryTopicConfiguration.TopicCreation topicCreation) {
        if (topicCreation.shouldCreateTopics()) {
            collection.forEach(str -> {
                this.beanFactory.registerSingleton(str + "-topicRegistrationBean", new NewTopic(str, topicCreation.getNumPartitions(), topicCreation.getReplicationFactor()));
            });
        }
    }

    private Function<MethodKafkaListenerEndpoint<?, ?>, MethodKafkaListenerEndpoint<?, ?>> getEndpointCustomizerFunction(EndpointHandlerMethod endpointHandlerMethod, EndpointHandlerMethod endpointHandlerMethod2, DestinationTopic.Properties properties, DestinationTopicProcessor.Context context) {
        return new EndpointCustomizerFunctionFactory(properties, getEndpointBeanMethod(properties, endpointHandlerMethod, endpointHandlerMethod2), this.beanFactory, endpointCustomizingContext -> {
            return createAndRegisterDestinationTopic(properties, endpointCustomizingContext, context);
        }).createEndpointCustomizer();
    }

    private EndpointCustomizingContext createAndRegisterDestinationTopic(DestinationTopic.Properties properties, EndpointCustomizingContext endpointCustomizingContext, DestinationTopicProcessor.Context context) {
        this.destinationTopicProcessor.registerDestinationTopic(endpointCustomizingContext.getTopic(), endpointCustomizingContext.getProcessedTopic(), properties, context);
        return endpointCustomizingContext;
    }

    private EndpointHandlerMethod getEndpointBeanMethod(DestinationTopic.Properties properties, EndpointHandlerMethod endpointHandlerMethod, EndpointHandlerMethod endpointHandlerMethod2) {
        return properties.isDltTopic() ? getDltEndpointHandlerMethod(endpointHandlerMethod2) : endpointHandlerMethod;
    }

    private EndpointHandlerMethod getDltEndpointHandlerMethod(EndpointHandlerMethod endpointHandlerMethod) {
        return endpointHandlerMethod != null ? endpointHandlerMethod : DEFAULT_DLT_HANDLER;
    }

    private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForMainEndpoint(KafkaListenerContainerFactory<?> kafkaListenerContainerFactory, RetryTopicConfiguration retryTopicConfiguration) {
        return this.listenerContainerFactoryConfigurer.configure(this.containerFactoryResolver.resolveFactoryForMainEndpoint(kafkaListenerContainerFactory, retryTopicConfiguration.forContainerFactoryResolver()), retryTopicConfiguration.forDeadLetterFactory());
    }

    private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForRetryEndpoint(KafkaListenerContainerFactory<?> kafkaListenerContainerFactory, RetryTopicConfiguration retryTopicConfiguration) {
        return this.listenerContainerFactoryConfigurer.configure(this.containerFactoryResolver.resolveFactoryForRetryEndpoint(kafkaListenerContainerFactory, retryTopicConfiguration.forContainerFactoryResolver()), retryTopicConfiguration.forDeadLetterFactory());
    }

    private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint<?, ?> methodKafkaListenerEndpoint) {
        if (methodKafkaListenerEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
            throw new IllegalArgumentException("Retry Topic is not compatible with " + MultiMethodKafkaListenerEndpoint.class);
        }
    }

    public static EndpointHandlerMethod createHandlerMethodWith(Class<?> cls, String str) {
        return new EndpointHandlerMethod(cls, str);
    }

    public static EndpointHandlerMethod createHandlerMethodWith(Object obj, Method method) {
        return new EndpointHandlerMethod(obj, method);
    }
}
