package org.springframework.cloud.stream.binder.kafka.streams;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsStateStore;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsStateStoreProperties;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.cloud.stream.binding.StreamListenerSetupMethodOrchestrator;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsStreamListenerSetupMethodOrchestrator.class */
class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStreamsBinderProcessor implements StreamListenerSetupMethodOrchestrator {
    private static final Log LOG = LogFactory.getLog(KafkaStreamsStreamListenerSetupMethodOrchestrator.class);
    private final StreamListenerParameterAdapter streamListenerParameterAdapter;
    private final Collection<StreamListenerResultAdapter> streamListenerResultAdapters;
    private final BindingServiceProperties bindingServiceProperties;
    private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
    private final KeyValueSerdeResolver keyValueSerdeResolver;
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    private final Map<Method, List<String>> registeredStoresPerMethod;
    private final Map<Method, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap;
    StreamsBuilderFactoryBeanCustomizer customizer;
    private final ConfigurableEnvironment environment;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaStreamsStreamListenerSetupMethodOrchestrator(BindingServiceProperties bindingServiceProperties, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, StreamListenerParameterAdapter streamListenerParameterAdapter, Collection<StreamListenerResultAdapter> collection, CleanupConfig cleanupConfig, StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer, ConfigurableEnvironment configurableEnvironment) {
        super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, cleanupConfig);
        this.registeredStoresPerMethod = new HashMap();
        this.methodStreamsBuilderFactoryBeanMap = new HashMap();
        this.bindingServiceProperties = bindingServiceProperties;
        this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
        this.keyValueSerdeResolver = keyValueSerdeResolver;
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.streamListenerParameterAdapter = streamListenerParameterAdapter;
        this.streamListenerResultAdapters = collection;
        this.customizer = streamsBuilderFactoryBeanCustomizer;
        this.environment = configurableEnvironment;
    }

    public boolean supports(Method method) {
        return methodParameterSupports(method) && (methodReturnTypeSuppports(method) || Void.TYPE.equals(method.getReturnType()));
    }

    private boolean methodReturnTypeSuppports(Method method) {
        Class<?> returnType = method.getReturnType();
        if (returnType.equals(KStream.class)) {
            return true;
        }
        return returnType.isArray() && returnType.getComponentType().equals(KStream.class);
    }

    private boolean methodParameterSupports(Method method) {
        boolean z = false;
        for (int i = 0; i < method.getParameterCount(); i++) {
            Class parameterType = MethodParameter.forExecutable(method, i).getParameterType();
            if (parameterType.equals(KStream.class) || parameterType.equals(KTable.class) || parameterType.equals(GlobalKTable.class)) {
                z = true;
            }
        }
        return z;
    }

    public void orchestrateStreamListenerSetupMethod(StreamListener streamListener, Method method, Object obj) {
        String[] outboundBindingTargetNames = getOutboundBindingTargetNames(method);
        validateStreamListenerMethod(streamListener, method, outboundBindingTargetNames);
        Object[] adaptAndRetrieveInboundArguments = adaptAndRetrieveInboundArguments(method, streamListener.value(), this.applicationContext, this.streamListenerParameterAdapter);
        try {
            ReflectionUtils.makeAccessible(method);
            if (Void.TYPE.equals(method.getReturnType())) {
                method.invoke(obj, adaptAndRetrieveInboundArguments);
            } else {
                Object invoke = method.invoke(obj, adaptAndRetrieveInboundArguments);
                if (outboundBindingTargetNames != null && outboundBindingTargetNames.length > 0) {
                    if (invoke.getClass().isArray()) {
                        Assert.isTrue(outboundBindingTargetNames.length == ((Object[]) invoke).length, "Result does not match with the number of declared outbounds");
                    } else {
                        Assert.isTrue(outboundBindingTargetNames.length == 1, "Result does not match with the number of declared outbounds");
                    }
                }
                this.kafkaStreamsBindingInformationCatalogue.setOutboundKStreamResolvable(ResolvableType.forMethodReturnType(method));
                if (outboundBindingTargetNames != null && outboundBindingTargetNames.length > 0) {
                    if (invoke.getClass().isArray()) {
                        int i = 0;
                        for (Object obj2 : (Object[]) invoke) {
                            int i2 = i;
                            i++;
                            adaptStreamListenerResult(obj2, this.applicationContext.getBean(outboundBindingTargetNames[i2]));
                        }
                    } else {
                        adaptStreamListenerResult(invoke, this.applicationContext.getBean(outboundBindingTargetNames[0]));
                    }
                }
            }
        } catch (Exception e) {
            throw new BeanInitializationException("Cannot setup StreamListener for " + method, e);
        }
    }

    private void adaptStreamListenerResult(Object obj, Object obj2) {
        for (StreamListenerResultAdapter streamListenerResultAdapter : this.streamListenerResultAdapters) {
            if (streamListenerResultAdapter.supports(obj.getClass(), obj2.getClass())) {
                streamListenerResultAdapter.adapt(obj, obj2);
                return;
            }
        }
    }

    public Object[] adaptAndRetrieveInboundArguments(Method method, String str, ApplicationContext applicationContext, StreamListenerParameterAdapter... streamListenerParameterAdapterArr) {
        Object[] objArr = new Object[method.getParameterTypes().length];
        int i = 0;
        while (i < objArr.length) {
            MethodParameter forExecutable = MethodParameter.forExecutable(method, i);
            Class<?> parameterType = forExecutable.getParameterType();
            Object obj = null;
            if (forExecutable.hasParameterAnnotation(Input.class)) {
                obj = AnnotationUtils.getValue(forExecutable.getParameterAnnotation(Input.class));
                str = forExecutable.getParameterAnnotation(Input.class).value();
            } else if (objArr.length == 1 && StringUtils.hasText(str)) {
                obj = str;
            }
            if (obj == null) {
                throw new IllegalStateException("A method annotated with @StreamListener may use @Input or @Output annotations only in declarative mode and for parameters that are binding targets or convertible from binding targets.");
            }
            Assert.isInstanceOf(String.class, obj, "Annotation value must be a String");
            Object bean = applicationContext.getBean((String) obj);
            BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(str);
            if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(method)) {
                this.methodStreamsBuilderFactoryBeanMap.put(method, buildStreamsBuilderAndRetrieveConfig(method.getDeclaringClass().getSimpleName() + "-" + method.getName(), applicationContext, str, null, this.customizer, this.environment, bindingProperties));
            }
            try {
                StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.methodStreamsBuilderFactoryBeanMap.get(method);
                StreamsBuilder streamsBuilder = (StreamsBuilder) streamsBuilderFactoryBean.getObject();
                String property = streamsBuilderFactoryBean.getStreamsConfiguration().getProperty("application.id");
                KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties = (KafkaStreamsConsumerProperties) this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(str);
                kafkaStreamsConsumerProperties.setApplicationId(property);
                KafkaStreamsStateStoreProperties buildStateStoreSpec = buildStateStoreSpec(method);
                Serde<?> inboundKeySerde = this.keyValueSerdeResolver.getInboundKeySerde(kafkaStreamsConsumerProperties, ResolvableType.forMethodParameter(forExecutable));
                LOG.info("Key Serde used for " + obj + ": " + inboundKeySerde.getClass().getName());
                Serde<?> valueSerde = this.bindingServiceProperties.getConsumerProperties(str).isUseNativeDecoding() ? getValueSerde(str, kafkaStreamsConsumerProperties, ResolvableType.forMethodParameter(forExecutable)) : Serdes.ByteArray();
                LOG.info("Value Serde used for " + obj + ": " + valueSerde.getClass().getName());
                Topology.AutoOffsetReset autoOffsetReset = getAutoOffsetReset(str, kafkaStreamsConsumerProperties);
                if (parameterType.isAssignableFrom(KStream.class)) {
                    KStream<?, ?> kStream = getkStream(str, buildStateStoreSpec, bindingProperties, kafkaStreamsConsumerProperties, streamsBuilder, inboundKeySerde, valueSerde, autoOffsetReset, i == 0);
                    KStreamBoundElementFactory.KStreamWrapper kStreamWrapper = (KStreamBoundElementFactory.KStreamWrapper) bean;
                    kStreamWrapper.wrap(kStream);
                    this.kafkaStreamsBindingInformationCatalogue.addKeySerde(kStream, inboundKeySerde);
                    this.kafkaStreamsBindingInformationCatalogue.registerBindingProperties(kStream, this.kafkaStreamsBindingInformationCatalogue.getBindingProperties().get(kStreamWrapper));
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
                    int length = streamListenerParameterAdapterArr.length;
                    int i2 = 0;
                    while (true) {
                        if (i2 >= length) {
                            break;
                        }
                        StreamListenerParameterAdapter streamListenerParameterAdapter = streamListenerParameterAdapterArr[i2];
                        if (streamListenerParameterAdapter.supports(kStream.getClass(), forExecutable)) {
                            objArr[i] = streamListenerParameterAdapter.adapt(kStream, forExecutable);
                            break;
                        }
                        i2++;
                    }
                    if (objArr[i] == null && parameterType.isAssignableFrom(kStream.getClass())) {
                        objArr[i] = kStream;
                    }
                    Assert.notNull(objArr[i], "Cannot convert argument " + i + " of " + method + "from " + kStream.getClass() + " to " + parameterType);
                } else {
                    handleKTableGlobalKTableInputs(objArr, i, str, parameterType, bean, streamsBuilderFactoryBean, streamsBuilder, kafkaStreamsConsumerProperties, inboundKeySerde, valueSerde, autoOffsetReset, i == 0);
                }
                i++;
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
        return objArr;
    }

    private StoreBuilder buildStateStore(KafkaStreamsStateStoreProperties kafkaStreamsStateStoreProperties) {
        StoreBuilder sessionStoreBuilder;
        try {
            Serde<?> stateStoreKeySerde = this.keyValueSerdeResolver.getStateStoreKeySerde(kafkaStreamsStateStoreProperties.getKeySerdeString());
            Serde<?> stateStoreValueSerde = this.keyValueSerdeResolver.getStateStoreValueSerde(kafkaStreamsStateStoreProperties.getValueSerdeString());
            switch (kafkaStreamsStateStoreProperties.getType()) {
                case KEYVALUE:
                    sessionStoreBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(kafkaStreamsStateStoreProperties.getName()), stateStoreKeySerde, stateStoreValueSerde);
                    break;
                case WINDOW:
                    sessionStoreBuilder = Stores.windowStoreBuilder(Stores.persistentWindowStore(kafkaStreamsStateStoreProperties.getName(), kafkaStreamsStateStoreProperties.getRetention(), 3, kafkaStreamsStateStoreProperties.getLength(), false), stateStoreKeySerde, stateStoreValueSerde);
                    break;
                case SESSION:
                    sessionStoreBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(kafkaStreamsStateStoreProperties.getName(), kafkaStreamsStateStoreProperties.getRetention()), stateStoreKeySerde, stateStoreValueSerde);
                    break;
                default:
                    throw new UnsupportedOperationException("state store type (" + kafkaStreamsStateStoreProperties.getType() + ") is not supported!");
            }
            if (kafkaStreamsStateStoreProperties.isCacheEnabled()) {
                sessionStoreBuilder = sessionStoreBuilder.withCachingEnabled();
            }
            if (kafkaStreamsStateStoreProperties.isLoggingDisabled()) {
                sessionStoreBuilder = sessionStoreBuilder.withLoggingDisabled();
            }
            return sessionStoreBuilder;
        } catch (Exception e) {
            LOG.error("failed to build state store exception : " + e);
            throw e;
        }
    }

    private KStream<?, ?> getkStream(String str, KafkaStreamsStateStoreProperties kafkaStreamsStateStoreProperties, BindingProperties bindingProperties, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, StreamsBuilder streamsBuilder, Serde<?> serde, Serde<?> serde2, Topology.AutoOffsetReset autoOffsetReset, boolean z) {
        if (kafkaStreamsStateStoreProperties != null) {
            StoreBuilder buildStateStore = buildStateStore(kafkaStreamsStateStoreProperties);
            streamsBuilder.addStateStore(buildStateStore);
            if (LOG.isInfoEnabled()) {
                LOG.info("state store " + buildStateStore.name() + " added to topology");
            }
        }
        return getKStream(str, bindingProperties, kafkaStreamsConsumerProperties, streamsBuilder, serde, serde2, autoOffsetReset, z);
    }

    private void validateStreamListenerMethod(StreamListener streamListener, Method method, String[] strArr) {
        String value = streamListener.value();
        if (strArr != null) {
            for (String str : strArr) {
                if (StringUtils.hasText(str)) {
                    Assert.isTrue(isDeclarativeOutput(method, str), "Method must be declarative");
                }
            }
        }
        if (StringUtils.hasText(value)) {
            int length = method.getParameterTypes().length;
            for (int i = 0; i < length; i++) {
                Assert.isTrue(isDeclarativeInput(value, MethodParameter.forExecutable(method, i)), "Method must be declarative");
            }
        }
    }

    private boolean isDeclarativeOutput(Method method, String str) {
        Class<?> returnType = method.getReturnType();
        if (returnType.isArray()) {
            Class type = this.applicationContext.getType(str);
            return this.streamListenerResultAdapters.stream().anyMatch(streamListenerResultAdapter -> {
                return streamListenerResultAdapter.supports(returnType.getComponentType(), type);
            });
        }
        Class type2 = this.applicationContext.getType(str);
        return this.streamListenerResultAdapters.stream().anyMatch(streamListenerResultAdapter2 -> {
            return streamListenerResultAdapter2.supports(returnType, type2);
        });
    }

    private boolean isDeclarativeInput(String str, MethodParameter methodParameter) {
        Class type;
        if (methodParameter.getParameterType().isAssignableFrom(Object.class) || !this.applicationContext.containsBean(str) || (type = this.applicationContext.getType(str)) == null) {
            return false;
        }
        boolean supportsKStream = KafkaStreamsBinderUtils.supportsKStream(methodParameter, type);
        if (!supportsKStream) {
            supportsKStream = KTable.class.isAssignableFrom(type) && KTable.class.isAssignableFrom(methodParameter.getParameterType());
            if (!supportsKStream) {
                supportsKStream = GlobalKTable.class.isAssignableFrom(type) && GlobalKTable.class.isAssignableFrom(methodParameter.getParameterType());
            }
        }
        return supportsKStream;
    }

    private static String[] getOutboundBindingTargetNames(Method method) {
        SendTo findAnnotation = AnnotationUtils.findAnnotation(method, SendTo.class);
        if (findAnnotation == null) {
            return null;
        }
        Assert.isTrue(!ObjectUtils.isEmpty(findAnnotation.value()), "At least one output must be specified");
        Assert.isTrue(findAnnotation.value().length >= 1, "At least one outbound destination need to be provided.");
        return findAnnotation.value();
    }

    private KafkaStreamsStateStoreProperties buildStateStoreSpec(Method method) {
        KafkaStreamsStateStore kafkaStreamsStateStore;
        if (this.registeredStoresPerMethod.containsKey(method) || (kafkaStreamsStateStore = (KafkaStreamsStateStore) AnnotationUtils.findAnnotation(method, KafkaStreamsStateStore.class)) == null) {
            return null;
        }
        Assert.isTrue(!ObjectUtils.isEmpty(kafkaStreamsStateStore.name()), "name cannot be empty");
        Assert.isTrue(kafkaStreamsStateStore.name().length() >= 1, "name cannot be empty.");
        this.registeredStoresPerMethod.put(method, new ArrayList());
        this.registeredStoresPerMethod.get(method).add(kafkaStreamsStateStore.name());
        KafkaStreamsStateStoreProperties kafkaStreamsStateStoreProperties = new KafkaStreamsStateStoreProperties();
        kafkaStreamsStateStoreProperties.setName(kafkaStreamsStateStore.name());
        kafkaStreamsStateStoreProperties.setType(kafkaStreamsStateStore.type());
        kafkaStreamsStateStoreProperties.setLength(kafkaStreamsStateStore.lengthMs());
        kafkaStreamsStateStoreProperties.setKeySerdeString(kafkaStreamsStateStore.keySerde());
        kafkaStreamsStateStoreProperties.setRetention(kafkaStreamsStateStore.retentionMs());
        kafkaStreamsStateStoreProperties.setValueSerdeString(kafkaStreamsStateStore.valueSerde());
        kafkaStreamsStateStoreProperties.setCacheEnabled(kafkaStreamsStateStore.cache());
        kafkaStreamsStateStoreProperties.setLoggingDisabled(!kafkaStreamsStateStore.logging());
        return kafkaStreamsStateStoreProperties;
    }
}
