package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.context.ApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderUtils.class */
final class KafkaStreamsBinderUtils {
    private static final Log LOGGER = LogFactory.getLog(KafkaStreamsBinderUtils.class);

    private KafkaStreamsBinderUtils() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void prepareConsumerBinding(String str, String str2, ApplicationContext applicationContext, KafkaTopicProvisioner kafkaTopicProvisioner, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, ExtendedConsumerProperties<KafkaStreamsConsumerProperties> extendedConsumerProperties, RetryTemplate retryTemplate, ConfigurableListableBeanFactory configurableListableBeanFactory, String str3, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        if (kafkaStreamsBinderConfigurationProperties.getDeserializationExceptionHandler() == DeserializationExceptionHandler.sendToDlq) {
            ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).setEnableDlq(true);
        }
        if (((KafkaStreamsConsumerProperties) extendedConsumerProperties.getExtension()).getDeserializationExceptionHandler() == DeserializationExceptionHandler.sendToDlq) {
            ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).setEnableDlq(true);
        }
        String[] commaDelimitedListToStringArray = StringUtils.commaDelimitedListToStringArray(str);
        for (String str4 : commaDelimitedListToStringArray) {
            kafkaTopicProvisioner.provisionConsumerDestination(str4, str2, extendedConsumerProperties);
        }
        if (((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq()) {
            Map beansOfType = applicationContext.getBeansOfType(DlqPartitionFunction.class, false, false);
            DlqPartitionFunction determineFallbackFunction = beansOfType.size() == 1 ? (DlqPartitionFunction) beansOfType.values().iterator().next() : DlqPartitionFunction.determineFallbackFunction(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getDlqPartitions(), LOGGER);
            DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(new ExtendedProducerProperties(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getDlqProducerProperties()), kafkaStreamsBinderConfigurationProperties);
            kafkaStreamsBindingInformationCatalogue.addDlqProducerFactory(streamsBuilderFactoryBean, producerFactory);
            KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory);
            Map beansOfType2 = applicationContext.getBeansOfType(DlqDestinationResolver.class, false, false);
            DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = (beansOfType2.isEmpty() && StringUtils.isEmpty(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getDlqName())) ? null : new DeadLetterPublishingRecoverer(kafkaTemplate, beansOfType2.isEmpty() ? (consumerRecord, exc) -> {
                return new TopicPartition(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getDlqName(), determineFallbackFunction.apply(str2, consumerRecord, exc).intValue());
            } : (consumerRecord2, exc2) -> {
                return new TopicPartition((String) ((DlqDestinationResolver) beansOfType2.values().iterator().next()).apply(consumerRecord2, exc2), determineFallbackFunction.apply(str2, consumerRecord2, exc2).intValue());
            });
            for (String str5 : commaDelimitedListToStringArray) {
                if (StringUtils.isEmpty(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getDlqName()) && beansOfType2.isEmpty()) {
                    deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(kafkaTemplate, (consumerRecord3, exc3) -> {
                        return new TopicPartition("error." + str5 + "." + str2, determineFallbackFunction.apply(str2, consumerRecord3, exc3).intValue());
                    });
                }
                ((SendToDlqAndContinue) applicationContext.getBean(SendToDlqAndContinue.class)).addKStreamDlqDispatch(str5, deadLetterPublishingRecoverer);
            }
        }
        if (StringUtils.hasText(extendedConsumerProperties.getRetryTemplateName())) {
            return;
        }
        ((BeanDefinitionRegistry) configurableListableBeanFactory).registerBeanDefinition(str3 + "-RetryTemplate", BeanDefinitionBuilder.genericBeanDefinition(retryTemplate.getClass(), () -> {
            return retryTemplate;
        }).getRawBeanDefinition());
    }

    private static DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties, KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        HashMap hashMap = new HashMap();
        hashMap.put("retries", 0);
        hashMap.put("buffer.memory", 33554432);
        hashMap.put("acks", kafkaBinderConfigurationProperties.getRequiredAcks());
        Map mergedProducerConfiguration = kafkaBinderConfigurationProperties.mergedProducerConfiguration();
        if (!ObjectUtils.isEmpty(mergedProducerConfiguration)) {
            hashMap.putAll(mergedProducerConfiguration);
        }
        if (ObjectUtils.isEmpty(hashMap.get("bootstrap.servers"))) {
            hashMap.put("bootstrap.servers", kafkaBinderConfigurationProperties.getKafkaConnectionString());
        }
        if (ObjectUtils.isEmpty(hashMap.get("batch.size"))) {
            hashMap.put("batch.size", String.valueOf(((KafkaProducerProperties) extendedProducerProperties.getExtension()).getBufferSize()));
        }
        if (ObjectUtils.isEmpty(hashMap.get("linger.ms"))) {
            hashMap.put("linger.ms", String.valueOf(((KafkaProducerProperties) extendedProducerProperties.getExtension()).getBatchTimeout()));
        }
        if (ObjectUtils.isEmpty(hashMap.get("compression.type"))) {
            hashMap.put("compression.type", ((KafkaProducerProperties) extendedProducerProperties.getExtension()).getCompressionType().toString());
        }
        Map configuration = ((KafkaProducerProperties) extendedProducerProperties.getExtension()).getConfiguration();
        Assert.state(!configuration.containsKey("bootstrap.servers"), "bootstrap.servers cannot be overridden at the binding level; use multiple binders instead");
        if (!ObjectUtils.isEmpty(configuration)) {
            hashMap.putAll(configuration);
        }
        hashMap.put("key.serializer", ByteArraySerializer.class);
        hashMap.put("value.serializer", ByteArraySerializer.class);
        return new DefaultKafkaProducerFactory<>(hashMap);
    }

    static boolean supportsKStream(MethodParameter methodParameter, Class<?> cls) {
        return KStream.class.isAssignableFrom(cls) && KStream.class.isAssignableFrom(methodParameter.getParameterType());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void closeDlqProducerFactories(KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        List<ProducerFactory<byte[], byte[]>> dlqProducerFactory = kafkaStreamsBindingInformationCatalogue.getDlqProducerFactory(streamsBuilderFactoryBean);
        if (CollectionUtils.isEmpty(dlqProducerFactory)) {
            return;
        }
        Iterator<ProducerFactory<byte[], byte[]>> it = dlqProducerFactory.iterator();
        while (it.hasNext()) {
            try {
                ((ProducerFactory) it.next()).destroy();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }
}
