package org.springframework.cloud.stream.binder.kstream;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.cloud.stream.binding.AbstractBindingTargetFactory;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.integration.support.MutableMessageHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kstream/KStreamBoundElementFactory.class */
/**
 * Binding target factory that produces {@link KStream} instances for input and output bindings.
 *
 * <p>Inputs are created from the {@link KStreamBuilder} for the binding's destination and, when
 * the binding declares a content type, each record value is wrapped in a {@link Message}
 * carrying that content type as a header.
 *
 * <p>Outputs are Spring AOP proxies implementing both {@link KStream} and {@link KStreamWrapper}.
 * The proxy has no backing stream until {@link KStreamWrapper#wrap(KStream)} is called on it;
 * after that, {@code KStream} calls are forwarded to the wrapped delegate.
 */
public class KStreamBoundElementFactory extends AbstractBindingTargetFactory<KStream> {
    /** Name of the message header that carries the binding's content type. */
    private static final String CONTENT_TYPE_HEADER = "contentType";

    private final KStreamBuilder kStreamBuilder;
    private final BindingServiceProperties bindingServiceProperties;
    private final CompositeMessageConverterFactory compositeMessageConverterFactory;

    /**
     * Callback implemented by output proxies so that the actual stream can be supplied after the
     * proxy has been created.
     */
    public interface KStreamWrapper {
        /**
         * Supplies the stream that subsequent {@code KStream} calls are delegated to.
         *
         * @param kStream the stream to delegate to
         */
        void wrap(KStream<Object, Object> kStream);
    }

    /**
     * Advice behind an output proxy: stores the delegate handed over through
     * {@link #wrap(KStream)} and routes intercepted calls either to that delegate
     * ({@link KStream} methods) or to itself ({@link KStreamWrapper} methods).
     *
     * <p>The decompiler reports this class was originally {@code private}; it is kept
     * {@code public} here so that any existing references continue to compile.
     */
    public static class KStreamWrapperHandler implements KStreamWrapper, MethodInterceptor {
        // Set once by wrap(); null until then.
        private KStream<Object, Object> delegate;
        // May be null, in which case wrap() stores the stream unchanged.
        private final MessageConverter messageConverter;
        private final BindingServiceProperties bindingServiceProperties;
        // Binding name used to look up the binding's properties.
        private final String name;

        KStreamWrapperHandler(MessageConverter messageConverter, BindingServiceProperties bindingServiceProperties, String str) {
            this.messageConverter = messageConverter;
            this.bindingServiceProperties = bindingServiceProperties;
            this.name = str;
        }

        /**
         * Stores the delegate stream. When a message converter was configured, the stream is
         * first mapped so that every value is re-converted with that converter.
         *
         * @param kStream the stream to delegate to; must not be {@code null}
         * @throws IllegalArgumentException if {@code kStream} is {@code null} or a delegate has
         *         already been set
         */
        @Override // org.springframework.cloud.stream.binder.kstream.KStreamBoundElementFactory.KStreamWrapper
        public void wrap(KStream<Object, Object> kStream) {
            Assert.notNull(kStream, "delegate cannot be null");
            Assert.isNull(this.delegate, "delegate already set to " + this.delegate);
            if (this.messageConverter != null) {
                this.delegate = kStream.map(this::convert);
            } else {
                this.delegate = kStream;
            }
        }

        /**
         * Re-converts one record value with the configured converter, adding the binding's
         * content type header when one is configured.
         *
         * @param key the record key, passed through unchanged
         * @param value the record value; cast to {@link Message}
         * @return the key paired with the converted message
         */
        private KeyValue<Object, Object> convert(Object key, Object value) {
            Message<?> message = (Message<?>) value;
            // Looked up per record, exactly as before, so the current binding properties are used.
            String contentType = this.bindingServiceProperties.getBindingProperties(this.name).getContentType();
            MutableMessageHeaders headers = new MutableMessageHeaders(message.getHeaders());
            if (!StringUtils.isEmpty(contentType)) {
                headers.put(CONTENT_TYPE_HEADER, contentType);
            }
            return new KeyValue<>(key, this.messageConverter.toMessage(message.getPayload(), headers));
        }

        /**
         * Dispatches an intercepted call: {@link KStream} methods go to the wrapped delegate,
         * {@link KStreamWrapper} methods go to this handler, anything else is rejected.
         *
         * @param methodInvocation the intercepted invocation
         * @return whatever the invoked method returns
         * @throws IllegalArgumentException if a {@code KStream} method is called before a
         *         delegate has been set
         * @throws IllegalStateException if the method is declared by neither interface
         * @throws Throwable the exception thrown by the invoked method itself
         */
        public Object invoke(MethodInvocation methodInvocation) throws Throwable {
            Method method = methodInvocation.getMethod();
            Class<?> declaringClass = method.getDeclaringClass();
            Object target;
            if (KStream.class.equals(declaringClass)) {
                Assert.notNull(this.delegate, "Trying to invoke " + method + "  but no delegate has been set.");
                target = this.delegate;
            } else if (KStreamWrapper.class.equals(declaringClass)) {
                target = this;
            } else {
                throw new IllegalStateException("Only KStream method invocations are permitted");
            }
            try {
                return method.invoke(target, methodInvocation.getArguments());
            } catch (InvocationTargetException ex) {
                // Reflection wraps whatever the target threw; rethrow the real exception so
                // callers of the proxy see it instead of a reflection wrapper.
                Throwable cause = ex.getCause();
                throw (cause != null) ? cause : ex;
            }
        }
    }

    /**
     * Creates the factory.
     *
     * @param kStreamBuilder builder used to create input streams
     * @param bindingServiceProperties source of per-binding destination and content type
     * @param compositeMessageConverterFactory source of message converters for output bindings
     */
    public KStreamBoundElementFactory(KStreamBuilder kStreamBuilder, BindingServiceProperties bindingServiceProperties, CompositeMessageConverterFactory compositeMessageConverterFactory) {
        super(KStream.class);
        this.bindingServiceProperties = bindingServiceProperties;
        this.kStreamBuilder = kStreamBuilder;
        this.compositeMessageConverterFactory = compositeMessageConverterFactory;
    }

    /**
     * Creates the input stream for a binding. The stream reads from the binding's destination;
     * if the binding has a non-empty content type, each value is wrapped in a {@link Message}
     * with that content type header, otherwise records pass through unchanged.
     *
     * <p>This is the method's original name according to the decompiler's rename note; it is
     * presumably the implementation of the superclass's abstract {@code createInput}.
     *
     * @param name the binding name
     * @return the input stream
     */
    public KStream createInput(String name) {
        String destination = this.bindingServiceProperties.getBindingDestination(name);
        KStream<Object, Object> stream = this.kStreamBuilder.stream(destination);
        return stream.map((key, value) -> {
            String contentType = this.bindingServiceProperties.getBindingProperties(name).getContentType();
            if (StringUtils.isEmpty(contentType)) {
                return new KeyValue<>(key, value);
            }
            return new KeyValue<>(key, MessageBuilder.withPayload(value).setHeader(CONTENT_TYPE_HEADER, contentType).build());
        });
    }

    /**
     * Decompiler-generated alias of {@link #createInput(String)}, retained so existing callers
     * of the renamed method keep working.
     *
     * @param str the binding name
     * @return the input stream
     * @deprecated use {@link #createInput(String)}
     */
    @Deprecated
    public KStream m3createInput(String str) {
        return createInput(str);
    }

    /**
     * Creates the output proxy for a binding. The proxy implements {@link KStreamWrapper} and
     * {@link KStream}; a message converter is attached only when the binding's content type
     * contains text.
     *
     * <p>This is the method's original name according to the decompiler's rename note; it is
     * presumably the implementation of the superclass's abstract {@code createOutput}.
     *
     * @param name the binding name
     * @return a proxy that delegates to the stream later passed to
     *         {@link KStreamWrapper#wrap(KStream)}
     */
    public KStream createOutput(String name) {
        String contentType = this.bindingServiceProperties.getBindingProperties(name).getContentType();
        MessageConverter converter = null;
        if (StringUtils.hasText(contentType)) {
            converter = this.compositeMessageConverterFactory.getMessageConverterForType(MimeType.valueOf(contentType));
        }
        KStreamWrapperHandler handler = new KStreamWrapperHandler(converter, this.bindingServiceProperties, name);
        ProxyFactory proxyFactory = new ProxyFactory(KStreamWrapper.class, KStream.class);
        proxyFactory.addAdvice(handler);
        return (KStream) proxyFactory.getProxy();
    }

    /**
     * Decompiler-generated alias of {@link #createOutput(String)}, retained so existing callers
     * of the renamed method keep working.
     *
     * @param str the binding name
     * @return the output proxy
     * @deprecated use {@link #createOutput(String)}
     */
    @Deprecated
    public KStream m2createOutput(String str) {
        return createOutput(str);
    }
}
