package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsMessageConversionDelegate.class */
public class KafkaStreamsMessageConversionDelegate {
    private static final ThreadLocal<KeyValue<Object, Object>> keyValueThreadLocal = new ThreadLocal<>();
    private final CompositeMessageConverterFactory compositeMessageConverterFactory;
    private final SendToDlqAndContinue sendToDlqAndContinue;
    private final KafkaStreamsBindingInformationCatalogue kstreamBindingInformationCatalogue;
    private final KafkaStreamsBinderConfigurationProperties kstreamBinderConfigurationProperties;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaStreamsMessageConversionDelegate(CompositeMessageConverterFactory compositeMessageConverterFactory, SendToDlqAndContinue sendToDlqAndContinue, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties) {
        this.compositeMessageConverterFactory = compositeMessageConverterFactory;
        this.sendToDlqAndContinue = sendToDlqAndContinue;
        this.kstreamBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.kstreamBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;
    }

    public KStream serializeOnOutbound(KStream<?, ?> kStream) {
        String contentType = this.kstreamBindingInformationCatalogue.getContentType(kStream);
        CompositeMessageConverter messageConverterForAllRegistered = this.compositeMessageConverterFactory.getMessageConverterForAllRegistered();
        return kStream.mapValues(obj -> {
            Message build = obj instanceof Message ? (Message) obj : MessageBuilder.withPayload(obj).build();
            HashMap hashMap = new HashMap((Map) build.getHeaders());
            if (!StringUtils.isEmpty(contentType)) {
                hashMap.put("contentType", contentType);
            }
            return messageConverterForAllRegistered.toMessage(build.getPayload(), new MessageHeaders(hashMap)).getPayload();
        });
    }

    public KStream deserializeOnInbound(Class<?> cls, KStream<?, ?> kStream) {
        CompositeMessageConverter messageConverterForAllRegistered = this.compositeMessageConverterFactory.getMessageConverterForAllRegistered();
        KStream<?, ?>[] branch = kStream.branch(new Predicate[]{(obj, obj2) -> {
            boolean z = false;
            try {
                if (cls.isAssignableFrom(obj2.getClass())) {
                    keyValueThreadLocal.set(new KeyValue<>(obj, obj2));
                } else if (obj2 instanceof Message) {
                    if (cls.isAssignableFrom(((Message) obj2).getPayload().getClass())) {
                        keyValueThreadLocal.set(new KeyValue<>(obj, ((Message) obj2).getPayload()));
                    } else {
                        convertAndSetMessage(obj, cls, messageConverterForAllRegistered, (Message) obj2);
                    }
                } else if ((obj2 instanceof String) || (obj2 instanceof byte[])) {
                    convertAndSetMessage(obj, cls, messageConverterForAllRegistered, MessageBuilder.withPayload(obj2).build());
                } else {
                    keyValueThreadLocal.set(new KeyValue<>(obj, obj2));
                }
                z = true;
            } catch (Exception e) {
            }
            return z;
        }, (obj3, obj4) -> {
            return true;
        }});
        processErrorFromDeserialization(kStream, branch[1]);
        return branch[0].mapValues(obj5 -> {
            Object obj5 = keyValueThreadLocal.get().value;
            keyValueThreadLocal.remove();
            return obj5;
        });
    }

    private void convertAndSetMessage(Object obj, Class<?> cls, MessageConverter messageConverter, Message<?> message) {
        Object fromMessage = messageConverter.fromMessage(message, cls);
        if (fromMessage == null) {
            throw new IllegalStateException("Inbound data conversion failed.");
        }
        keyValueThreadLocal.set(new KeyValue<>(obj, fromMessage));
    }

    private void processErrorFromDeserialization(KStream<?, ?> kStream, KStream<?, ?> kStream2) {
        kStream2.process(() -> {
            return new Processor() { // from class: org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.1
                ProcessorContext context;

                public void init(ProcessorContext processorContext) {
                    this.context = processorContext;
                }

                public void process(Object obj, Object obj2) {
                    if (!KafkaStreamsMessageConversionDelegate.this.kstreamBindingInformationCatalogue.isDlqEnabled(kStream)) {
                        if (KafkaStreamsMessageConversionDelegate.this.kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndFail) {
                            throw new IllegalStateException("Inbound deserialization failed.");
                        }
                        if (KafkaStreamsMessageConversionDelegate.this.kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndContinue) {
                        }
                    } else {
                        String str = this.context.topic();
                        if (obj2 instanceof Message) {
                            KafkaStreamsMessageConversionDelegate.this.sendToDlqAndContinue.sendToDlq(str, (byte[]) obj, (byte[]) ((Message) obj2).getPayload(), this.context.partition());
                        } else {
                            KafkaStreamsMessageConversionDelegate.this.sendToDlqAndContinue.sendToDlq(str, (byte[]) obj, (byte[]) obj2, this.context.partition());
                        }
                    }
                }

                public void close() {
                }
            };
        }, new String[0]);
    }
}
