package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.core.FluxedFunction;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.GlobalKTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binding.BindableProxyFactory;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.class */
public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
    /** Logger shared by all instances of this processor. */
    private static final Log LOG = LogFactory.getLog(KafkaStreamsFunctionProcessor.class);
    /** Queried for per-binding properties, binding destinations and consumer properties. */
    private final BindingServiceProperties bindingServiceProperties;
    /**
     * StreamsBuilderFactoryBean instances created by this processor, keyed by the name handed to
     * buildStreamsBuilderAndRetrieveConfig (the same name is used as the "stream-builder-" bean suffix).
     */
    private final Map<String, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap();
    /** Queried for the Kafka Streams specific consumer properties of each inbound binding. */
    private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
    /** Supplies the key and value Serde used when consuming an inbound binding. */
    private final KeyValueSerdeResolver keyValueSerdeResolver;
    /** Receives every StreamsBuilderFactoryBean in use and answers whether a bound KStream uses native decoding. */
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    /** Converts inbound KStream values when native decoding is not in effect. */
    private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
    /** Optional cleanup settings; when null the StreamsBuilderFactoryBean is created without them. */
    private final CleanupConfig cleanupConfig;
    /** Used to look up the user's Function or Consumer by name. */
    private final FunctionCatalog functionCatalog;
    /** Provides the names of the declared input and output bindings. */
    private final BindableProxyFactory bindableProxyFactory;
    /** Assigned in setApplicationContext; used for bean lookups and for registering bean definitions. */
    private ConfigurableApplicationContext applicationContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor$2, reason: invalid class name */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor$2.class */
    /**
     * Compiler-generated lookup table for switching over {@code KafkaConsumerProperties.StartOffset}.
     * The table is indexed by enum ordinal: {@code earliest} maps to 1, {@code latest} maps to 2 and
     * every other slot keeps the default value 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$springframework$cloud$stream$binder$kafka$properties$KafkaConsumerProperties$StartOffset;

        static {
            // Fill a local table first, then publish it through the final field.
            int[] caseLabels = new int[KafkaConsumerProperties.StartOffset.values().length];
            try {
                caseLabels[KafkaConsumerProperties.StartOffset.earliest.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // The constant is absent from the enum seen at runtime; its slot stays 0.
            }
            try {
                caseLabels[KafkaConsumerProperties.StartOffset.latest.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // The constant is absent from the enum seen at runtime; its slot stays 0.
            }
            $SwitchMap$org$springframework$cloud$stream$binder$kafka$properties$KafkaConsumerProperties$StartOffset = caseLabels;
        }
    }

    /**
     * Creates the processor and stores its collaborators; no other work is done here.
     *
     * @param bindingServiceProperties source of per-binding properties
     * @param kafkaStreamsExtendedBindingProperties source of Kafka Streams specific binding properties
     * @param keyValueSerdeResolver resolver for inbound key and value Serdes
     * @param kafkaStreamsBindingInformationCatalogue catalogue that records the factory beans in use
     * @param kafkaStreamsMessageConversionDelegate delegate used for inbound value conversion
     * @param cleanupConfig cleanup settings for the factory bean; may be null
     * @param functionCatalog catalog used to look up the user function
     * @param bindableProxyFactory provider of the declared input and output names
     */
    public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate, CleanupConfig cleanupConfig, FunctionCatalog functionCatalog, BindableProxyFactory bindableProxyFactory) {
        // Binding metadata sources.
        this.bindingServiceProperties = bindingServiceProperties;
        this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
        this.bindableProxyFactory = bindableProxyFactory;

        // Function lookup.
        this.functionCatalog = functionCatalog;

        // Topology construction helpers.
        this.keyValueSerdeResolver = keyValueSerdeResolver;
        this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.cleanupConfig = cleanupConfig;
    }

    /**
     * Pairs each declared input binding name with the argument type found at the matching level
     * of a (possibly curried) functional signature.
     *
     * <p>Input names are visited in sorted order. The first name receives generic 0 of
     * {@code resolvableType}; the remaining names are matched, in order, against generic 0 of each
     * {@link Function} or {@link Consumer} encountered while following generic 1 repeatedly.
     *
     * @param resolvableType the functional type whose generics describe the inbound arguments
     * @return insertion-ordered map from input name to argument type; it may hold fewer entries
     *         than there are inputs when the signature has fewer levels than declared inputs
     */
    private Map<String, ResolvableType> buildTypeMap(ResolvableType resolvableType) {
        // Sorted so the name-to-argument pairing is deterministic.
        Iterator<String> inputNames = new TreeSet<String>(this.bindableProxyFactory.getInputs()).iterator();
        Map<String, ResolvableType> typesByInput = new LinkedHashMap<>();
        if (!inputNames.hasNext()) {
            return typesByInput;
        }
        typesByInput.put(inputNames.next(), resolvableType.getGeneric(0));

        ResolvableType current = resolvableType.getGeneric(1);
        // getGeneric never returns null once the generics run out: it returns ResolvableType.NONE,
        // whose own generics are NONE again. Without the NONE check the loop would spin forever
        // whenever more inputs are declared than the signature has levels.
        while (inputNames.hasNext() && current != null && current != ResolvableType.NONE) {
            Class<?> rawType = current.getRawClass();
            if (Function.class.equals(rawType) || Consumer.class.equals(rawType)) {
                typesByInput.put(inputNames.next(), current.getGeneric(0));
            }
            current = current.getGeneric(1);
        }
        return typesByInput;
    }

    /**
     * Binds the inbound arguments, invokes the named function or consumer with them and wraps
     * any resulting stream(s) into the declared outbound bindings.
     *
     * <p>A {@link Consumer} receives the first inbound argument only. A function is applied to the
     * first inbound argument; while its result is itself a {@link Function} or {@link Consumer},
     * that result is fed the next inbound argument. A final non-null result is either a single
     * stream or an array of streams, assigned to the outbound names in sorted order.
     *
     * @param resolvableType the functional type of the bean registered as {@code str}
     * @param str the name under which the function or consumer is looked up in the catalog
     * @throws BeanInitializationException if the function cannot be looked up or invoked, or if
     *         the number of results does not match the number of declared outbounds
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void orchestrateStreamListenerSetupMethod(ResolvableType resolvableType, String str) {
        // Sorted so results are assigned to outbound names deterministically.
        String[] outboundNames = new TreeSet<String>(this.bindableProxyFactory.getOutputs()).toArray(new String[0]);
        // NOTE(review): "foobar" is a fixed placeholder used as the builder key and bean-name suffix.
        // It is left untouched because changing it alters a registered bean name; confirm nothing
        // else depends on it before switching to the function name.
        Object[] inboundArguments = adaptAndRetrieveInboundArguments(buildTypeMap(resolvableType), "foobar");
        try {
            Assert.state(inboundArguments.length > 0, "No inbound binding is available for function '" + str + "'");
            if (Consumer.class.equals(resolvableType.getRawClass())) {
                ((Consumer) this.functionCatalog.lookup(Consumer.class, str)).accept(inboundArguments[0]);
                return;
            }

            Object lookedUp = this.functionCatalog.lookup(Function.class, str);
            if (!(lookedUp instanceof FluxedFunction)) {
                // Previously this case produced a null target and a bare NullPointerException.
                throw new IllegalStateException("Function '" + str + "' could not be resolved to a FluxedFunction; found "
                        + (lookedUp == null ? "nothing" : lookedUp.getClass().getName()));
            }
            Object target = ((FluxedFunction) lookedUp).getTarget();
            Object result = ((Function) target).apply(inboundArguments[0]);

            // Unroll curried functions, consuming one inbound argument per level.
            int nextArgument = 1;
            while (result instanceof Function || result instanceof Consumer) {
                Assert.state(nextArgument < inboundArguments.length,
                        "Function '" + str + "' expects more inbound arguments than there are inbound bindings");
                if (result instanceof Function) {
                    result = ((Function) result).apply(inboundArguments[nextArgument]);
                } else {
                    ((Consumer) result).accept(inboundArguments[nextArgument]);
                    result = null;
                }
                nextArgument++;
            }
            if (result == null) {
                return;
            }

            if (result.getClass().isArray()) {
                Object[] outboundStreams = (Object[]) result;
                Assert.isTrue(outboundNames.length == outboundStreams.length, "Result does not match with the number of declared outbounds");
                for (int position = 0; position < outboundStreams.length; position++) {
                    ((KStreamBoundElementFactory.KStreamWrapper) this.applicationContext.getBean(outboundNames[position])).wrap((KStream) outboundStreams[position]);
                }
            } else {
                Assert.isTrue(outboundNames.length == 1, "Result does not match with the number of declared outbounds");
                ((KStreamBoundElementFactory.KStreamWrapper) this.applicationContext.getBean(outboundNames[0])).wrap((KStream) result);
            }
        } catch (Exception e) {
            // Report the actual function name rather than a fixed placeholder.
            throw new BeanInitializationException("Cannot setup StreamListener for " + str, e);
        }
    }

    /**
     * Builds the Kafka Streams source for every inbound binding and returns the resulting
     * objects in the iteration order of {@code map}.
     *
     * <p>For each binding the matching proxy bean is fetched from the application context and
     * wrapped around a freshly built {@link KStream}, {@link KTable} or {@link GlobalKTable},
     * depending on the declared parameter type. The StreamsBuilderFactoryBean registered under
     * {@code str} is created on first use and reported to the binding information catalogue.
     * A binding whose declared type matches none of the three kinds leaves a null slot.
     *
     * @param map inbound binding names mapped to their declared parameter types
     * @param str key under which the StreamsBuilderFactoryBean is created and cached
     * @return one entry per binding, in the iteration order of {@code map}
     * @throws IllegalStateException if a binding name is null, a parameter type cannot be
     *         resolved to a class, or building the source fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Object[] adaptAndRetrieveInboundArguments(Map<String, ResolvableType> map, String str) {
        Object[] arguments = new Object[map.size()];
        int index = 0;
        for (Map.Entry<String, ResolvableType> entry : map.entrySet()) {
            String inboundName = entry.getKey();
            // Validate the name before it is used for any lookup.
            if (inboundName == null) {
                throw new IllegalStateException("A method annotated with @StreamListener may use @Input or @Output annotations only in declarative mode and for parameters that are binding targets or convertible from binding targets.");
            }
            ResolvableType declaredType = entry.getValue();
            Class<?> parameterType = declaredType.getRawClass();
            Object targetBean = this.applicationContext.getBean(inboundName);
            BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(inboundName);
            if (parameterType == null) {
                // Fail with a descriptive message instead of a NullPointerException further down.
                throw new IllegalStateException("Cannot resolve the parameter type declared for inbound binding '" + inboundName + "'");
            }
            enableNativeDecodingForKTableAlways(parameterType, bindingProperties);
            if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(str)) {
                buildStreamsBuilderAndRetrieveConfig(str, this.applicationContext, inboundName);
            }
            try {
                StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.methodStreamsBuilderFactoryBeanMap.get(str);
                StreamsBuilder streamsBuilder = (StreamsBuilder) streamsBuilderFactoryBean.getObject();
                KafkaStreamsConsumerProperties consumerProperties = (KafkaStreamsConsumerProperties) this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(inboundName);
                Serde<?> keySerde = this.keyValueSerdeResolver.getInboundKeySerde(consumerProperties);
                Serde<?> valueSerde = this.keyValueSerdeResolver.getInboundValueSerde(bindingProperties.getConsumer(), consumerProperties);
                Topology.AutoOffsetReset autoOffsetReset = toAutoOffsetReset(consumerProperties.getStartOffset());
                if (consumerProperties.isResetOffsets()) {
                    LOG.warn("Detected resetOffsets configured on binding " + inboundName + ". Setting resetOffsets in Kafka Streams binder does not have any effect.");
                }

                if (parameterType.isAssignableFrom(KStream.class)) {
                    KStream<?, ?> stream = getkStream(inboundName, bindingProperties, streamsBuilder, keySerde, valueSerde, autoOffsetReset);
                    // The bean is a proxy usable both as a wrapper and as a KStream.
                    KStreamBoundElementFactory.KStreamWrapper wrapper = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
                    wrapper.wrap((KStream) stream);
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
                    if (KStream.class.isAssignableFrom(parameterType)) {
                        Class<?> valueClass = declaredType.getGeneric(1).getRawClass();
                        if (valueClass == null) {
                            valueClass = Object.class;
                        }
                        if (this.kafkaStreamsBindingInformationCatalogue.isUseNativeDecoding((KStream<?, ?>) wrapper)) {
                            arguments[index] = stream;
                        } else {
                            arguments[index] = this.kafkaStreamsMessageConversionDelegate.deserializeOnInbound(valueClass, stream);
                        }
                    }
                    if (arguments[index] == null) {
                        arguments[index] = stream;
                    }
                    Assert.notNull(arguments[index], "problems..");
                } else if (parameterType.isAssignableFrom(KTable.class)) {
                    KTable<?, ?> table = getKTable(streamsBuilder, keySerde, valueSerde, consumerProperties.getMaterializedAs(), this.bindingServiceProperties.getBindingDestination(inboundName), autoOffsetReset);
                    ((KTableBoundElementFactory.KTableWrapper) targetBean).wrap((KTable) table);
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
                    arguments[index] = table;
                } else if (parameterType.isAssignableFrom(GlobalKTable.class)) {
                    GlobalKTable<?, ?> globalTable = getGlobalKTable(streamsBuilder, keySerde, valueSerde, consumerProperties.getMaterializedAs(), this.bindingServiceProperties.getBindingDestination(inboundName), autoOffsetReset);
                    ((GlobalKTableBoundElementFactory.GlobalKTableWrapper) targetBean).wrap((GlobalKTable) globalTable);
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
                    arguments[index] = globalTable;
                }
                index++;
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
        return arguments;
    }

    /**
     * Translates the binding's start offset into the Kafka Streams reset policy.
     *
     * @param startOffset the configured start offset; may be null
     * @return EARLIEST or LATEST for the matching constants, otherwise null
     */
    private static Topology.AutoOffsetReset toAutoOffsetReset(KafkaConsumerProperties.StartOffset startOffset) {
        if (startOffset == null) {
            return null;
        }
        switch (startOffset) {
            case earliest:
                return Topology.AutoOffsetReset.EARLIEST;
            case latest:
                return Topology.AutoOffsetReset.LATEST;
            default:
                return null;
        }
    }

    /**
     * Creates the GlobalKTable for a destination, materialized under a named store when one is given.
     *
     * @param streamsBuilder builder the table is added to
     * @param serde key Serde
     * @param serde2 value Serde
     * @param str state store name to materialize as; null for no explicit store
     * @param str2 destination to read from
     * @param autoOffsetReset offset reset policy applied to the source
     * @return the GlobalKTable created on {@code streamsBuilder}
     */
    private GlobalKTable<?, ?> getGlobalKTable(StreamsBuilder streamsBuilder, Serde<?> serde, Serde<?> serde2, String str, String str2, Topology.AutoOffsetReset autoOffsetReset) {
        if (str == null) {
            return streamsBuilder.globalTable(str2, Consumed.with(serde, serde2).withOffsetResetPolicy(autoOffsetReset));
        }
        return materializedAsGlobalKTable(streamsBuilder, str2, str, serde, serde2, autoOffsetReset);
    }

    /**
     * Creates the KTable for a destination, materialized under a named store when one is given.
     *
     * @param streamsBuilder builder the table is added to
     * @param serde key Serde
     * @param serde2 value Serde
     * @param str state store name to materialize as; null for no explicit store
     * @param str2 destination to read from
     * @param autoOffsetReset offset reset policy applied to the source
     * @return the KTable created on {@code streamsBuilder}
     */
    private KTable<?, ?> getKTable(StreamsBuilder streamsBuilder, Serde<?> serde, Serde<?> serde2, String str, String str2, Topology.AutoOffsetReset autoOffsetReset) {
        if (str == null) {
            return streamsBuilder.table(str2, Consumed.with(serde, serde2).withOffsetResetPolicy(autoOffsetReset));
        }
        return materializedAs(streamsBuilder, str2, str, serde, serde2, autoOffsetReset);
    }

    /**
     * Creates a KTable over {@code str}, materialized in the state store named {@code str2}.
     *
     * <p>{@code str} is used as given. The caller in this class passes a destination that was
     * already resolved from the binding name, so resolving it again through the binding service
     * properties would treat a destination as if it were a binding name.
     *
     * @param streamsBuilder builder the table is added to
     * @param str destination to read from
     * @param str2 state store name
     * @param serde key Serde
     * @param serde2 value Serde
     * @param autoOffsetReset offset reset policy applied to the source
     * @param <K> key type
     * @param <V> value type
     * @return the materialized KTable
     */
    private <K, V> KTable<K, V> materializedAs(StreamsBuilder streamsBuilder, String str, String str2, Serde<K> serde, Serde<V> serde2, Topology.AutoOffsetReset autoOffsetReset) {
        Consumed<K, V> consumed = Consumed.with(serde, serde2).withOffsetResetPolicy(autoOffsetReset);
        return streamsBuilder.table(str, consumed, getMaterialized(str2, serde, serde2));
    }

    /**
     * Creates a GlobalKTable over {@code str}, materialized in the state store named {@code str2}.
     *
     * <p>{@code str} is used as given. The caller in this class passes a destination that was
     * already resolved from the binding name, so resolving it again through the binding service
     * properties would treat a destination as if it were a binding name.
     *
     * @param streamsBuilder builder the table is added to
     * @param str destination to read from
     * @param str2 state store name
     * @param serde key Serde
     * @param serde2 value Serde
     * @param autoOffsetReset offset reset policy applied to the source
     * @param <K> key type
     * @param <V> value type
     * @return the materialized GlobalKTable
     */
    private <K, V> GlobalKTable<K, V> materializedAsGlobalKTable(StreamsBuilder streamsBuilder, String str, String str2, Serde<K> serde, Serde<V> serde2, Topology.AutoOffsetReset autoOffsetReset) {
        Consumed<K, V> consumed = Consumed.with(serde, serde2).withOffsetResetPolicy(autoOffsetReset);
        return streamsBuilder.globalTable(str, consumed, getMaterialized(str2, serde, serde2));
    }

    /**
     * Describes a key-value state store with the given name and Serdes.
     *
     * @param str state store name
     * @param serde key Serde
     * @param serde2 value Serde
     * @param <K> key type
     * @param <V> value type
     * @return the Materialized description of the store
     */
    private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(String str, Serde<K> serde, Serde<V> serde2) {
        Materialized<K, V, KeyValueStore<Bytes, byte[]>> store = Materialized.as(str);
        return store.withKeySerde(serde).withValueSerde(serde2);
    }

    /**
     * Creates the KStream for an inbound binding.
     *
     * <p>The binding destination is split on commas and every resulting topic is consumed. Each
     * non-null value is wrapped in a message carrying a "contentType" header, unless native
     * decoding is enabled or the binding has no content type, in which case it passes through.
     *
     * @param str inbound binding name
     * @param bindingProperties properties of the binding; its content type is read per record
     * @param streamsBuilder builder the stream is added to
     * @param serde key Serde
     * @param serde2 value Serde
     * @param autoOffsetReset offset reset policy applied to the source
     * @return the stream with the value mapping applied
     */
    private KStream<?, ?> getkStream(String str, BindingProperties bindingProperties, StreamsBuilder streamsBuilder, Serde<?> serde, Serde<?> serde2, Topology.AutoOffsetReset autoOffsetReset) {
        String[] topics = StringUtils.commaDelimitedListToStringArray(this.bindingServiceProperties.getBindingDestination(str));
        KStream source = streamsBuilder.stream(Arrays.asList(topics), Consumed.with(serde, serde2).withOffsetResetPolicy(autoOffsetReset));

        final boolean nativeDecoding = this.bindingServiceProperties.getConsumerProperties(str).isUseNativeDecoding();
        String decodingNote = nativeDecoding
                ? "Native decoding is enabled for " + str + ". Inbound deserialization done at the broker."
                : "Native decoding is disabled for " + str + ". Inbound message conversion done by Spring Cloud Stream.";
        LOG.info(decodingNote);

        return source.mapValues(value -> {
            String contentType = bindingProperties.getContentType();
            boolean passThrough = value == null || StringUtils.isEmpty(contentType) || nativeDecoding;
            if (passThrough) {
                return value;
            }
            return MessageBuilder.withPayload(value).setHeader("contentType", contentType).build();
        });
    }

    /**
     * Forces native decoding on when the declared parameter type accepts a KTable or GlobalKTable,
     * creating the binding's consumer properties first if they are missing.
     *
     * @param cls declared parameter type of the inbound binding
     * @param bindingProperties properties of the binding; modified in place
     */
    private void enableNativeDecodingForKTableAlways(Class<?> cls, BindingProperties bindingProperties) {
        boolean tableLike = cls.isAssignableFrom(KTable.class) || cls.isAssignableFrom(GlobalKTable.class);
        if (!tableLike) {
            return;
        }
        if (bindingProperties.getConsumer() == null) {
            bindingProperties.setConsumer(new ConsumerProperties());
        }
        bindingProperties.getConsumer().setUseNativeDecoding(true);
    }

    /**
     * Creates a StreamsBuilderFactoryBean, registers it as bean "stream-builder-" + {@code str}
     * and caches the instance obtained back from the context under {@code str}.
     *
     * <p>The configuration starts from the "streamConfigGlobalProperties" bean and is extended with
     * the binding's own configuration, its application id when it has text, and the stream thread
     * count when the binding's concurrency exceeds one. The "kafkaStreamsDlqDispatchers" bean is
     * added to the properties the configuration exposes. Auto-startup is disabled on the factory.
     *
     * @param str key for the cache and suffix of the registered bean name
     * @param applicationContext context the configuration beans and the factory bean are read from
     * @param str2 inbound binding whose consumer properties contribute to the configuration
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void buildStreamsBuilderAndRetrieveConfig(String str, ApplicationContext applicationContext, String str2) {
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) this.applicationContext.getBeanFactory();

        // NOTE(review): this is the bean instance itself, so the additions below change the
        // shared global properties rather than a private copy. Confirm that is intended.
        Map streamsProperties = (Map) applicationContext.getBean("streamConfigGlobalProperties", Map.class);
        KafkaStreamsConsumerProperties consumerProperties = (KafkaStreamsConsumerProperties) this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(str2);
        streamsProperties.putAll(consumerProperties.getConfiguration());

        String applicationId = consumerProperties.getApplicationId();
        if (StringUtils.hasText(applicationId)) {
            streamsProperties.put("application.id", applicationId);
        }

        int concurrency = this.bindingServiceProperties.getConsumerProperties(str2).getConcurrency();
        if (concurrency > 1) {
            streamsProperties.put("num.stream.threads", Integer.valueOf(concurrency));
        }

        final Map dlqDispatchers = (Map) applicationContext.getBean("kafkaStreamsDlqDispatchers", Map.class);
        KafkaStreamsConfiguration streamsConfiguration = new KafkaStreamsConfiguration(streamsProperties) { // from class: org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor.1
            @Override
            public Properties asProperties() {
                Properties properties = super.asProperties();
                properties.put(SendToDlqAndContinue.KAFKA_STREAMS_DLQ_DISPATCHERS, dlqDispatchers);
                return properties;
            }
        };

        final StreamsBuilderFactoryBean factoryBean;
        if (this.cleanupConfig == null) {
            factoryBean = new StreamsBuilderFactoryBean(streamsConfiguration);
        } else {
            factoryBean = new StreamsBuilderFactoryBean(streamsConfiguration, this.cleanupConfig);
        }
        factoryBean.setAutoStartup(false);

        String beanName = "stream-builder-" + str;
        registry.registerBeanDefinition(beanName,
                BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) factoryBean.getClass(), () -> factoryBean).getRawBeanDefinition());
        // The "&" prefix asks the context for the factory bean itself, not the object it produces.
        StreamsBuilderFactoryBean registered = (StreamsBuilderFactoryBean) applicationContext.getBean("&" + beanName, StreamsBuilderFactoryBean.class);
        this.methodStreamsBuilderFactoryBeanMap.put(str, registered);
    }

    /**
     * Stores the application context for later bean lookups and bean definition registration.
     *
     * @param applicationContext the context this processor runs in; it is cast to
     *        ConfigurableApplicationContext
     * @throws BeansException declared by the ApplicationContextAware contract
     */
    @Override
    public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ConfigurableApplicationContext configurableContext = (ConfigurableApplicationContext) applicationContext;
        this.applicationContext = configurableContext;
    }
}
