package org.springframework.xd.dirt.integration.redis;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Properties;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.http.MediaType;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.expression.IntegrationEvaluationContextAware;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.redis.inbound.RedisInboundChannelAdapter;
import org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint;
import org.springframework.integration.redis.outbound.RedisPublishingMessageHandler;
import org.springframework.integration.redis.outbound.RedisQueueOutboundChannelAdapter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.integration.bus.Binding;
import org.springframework.xd.dirt.integration.bus.MessageBusSupport;
import org.springframework.xd.dirt.integration.bus.serializer.MultiTypeCodec;

/* loaded from: input_file:org/springframework/xd/dirt/integration/redis/RedisMessageBus.class */
public class RedisMessageBus extends MessageBusSupport implements DisposableBean, IntegrationEvaluationContextAware {
    /** Header name under which SendingHandler stores the reply destination; bindReplier reads it back through the "headers['replyTo']" expression. */
    private static final String REPLY_TO = "replyTo";
    /** Shared parser used by bindReplier to build the reply-destination expression. */
    private static final SpelExpressionParser parser = new SpelExpressionParser();
    /** Redis connection factory handed to every inbound and outbound Redis adapter created by this bus. */
    private RedisConnectionFactory connectionFactory;
    /** Embeds selected headers into, and extracts them from, the byte[] payload sent over Redis. */
    private final EmbeddedHeadersMessageConverter embeddedHeadersMessageConverter = new EmbeddedHeadersMessageConverter();
    /** Evaluation context supplied through setIntegrationEvaluationContext and passed to the reply adapter in bindReplier. */
    private volatile EvaluationContext evaluationContext;

    /* loaded from: input_file:org/springframework/xd/dirt/integration/redis/RedisMessageBus$EmbeddedHeadersMessageConverter.class */
    /**
     * Packs selected message headers into the front of a {@code byte[]} payload and unpacks them again.
     *
     * <p>Wire format, as written by {@link #embedHeaders} and read by {@link #extractHeaders}:
     * <pre>
     * [header count : 1 byte]
     * repeated "header count" times:
     *     [name length : 1 byte][name : UTF-8 bytes][value length : 1 byte][value : UTF-8 bytes]
     * [original payload bytes]
     * </pre>
     * The count and both lengths are single <em>unsigned</em> bytes, so each is limited to 255.
     */
    static class EmbeddedHeadersMessageConverter {
        /** Largest value that fits in the single unsigned byte used for the count and the lengths. */
        private static final int MAX_UNSIGNED_BYTE = 0xFF;

        /** Charset name used for header names and values on the wire. */
        private static final String CHARSET = "UTF-8";

        EmbeddedHeadersMessageConverter() {
        }

        /**
         * Returns a copy of {@code message} whose payload is prefixed with the requested headers.
         *
         * <p>A header is skipped when it is absent from the message or when its {@code toString()}
         * yields {@code null}. Values are embedded in their {@code toString()} form. The returned
         * message keeps all headers of the original message.
         *
         * @param message the message whose payload is to be prefixed
         * @param strArr the names of the headers to embed
         * @return a new message carrying the framed payload
         * @throws UnsupportedEncodingException if the UTF-8 charset is unavailable
         * @throws IllegalArgumentException if a header name or value needs more than 255 UTF-8 bytes,
         *         or more than 255 headers would be embedded
         */
        Message<byte[]> embedHeaders(Message<byte[]> message, String... strArr) throws UnsupportedEncodingException {
            byte[][] nameBytes = new byte[strArr.length][];
            byte[][] valueBytes = new byte[strArr.length][];
            int headerCount = 0;
            int headersLength = 0;
            for (int i = 0; i < strArr.length; i++) {
                Object raw = message.getHeaders().get(strArr[i]);
                String text = raw == null ? null : raw.toString();
                if (text == null) {
                    continue;
                }
                // Sizes must be measured in encoded bytes, not chars: non-ASCII text is longer in UTF-8.
                nameBytes[i] = encode("header name '" + strArr[i] + "'", strArr[i]);
                valueBytes[i] = encode("value of header '" + strArr[i] + "'", text);
                headerCount++;
                headersLength += nameBytes[i].length + valueBytes[i].length;
            }
            if (headerCount > MAX_UNSIGNED_BYTE) {
                throw new IllegalArgumentException("Cannot embed " + headerCount + " headers; the limit is " + MAX_UNSIGNED_BYTE);
            }
            byte[] payload = message.getPayload();
            // 1 count byte + 2 length bytes per embedded header + header bytes + payload.
            byte[] framed = new byte[1 + (headerCount * 2) + headersLength + payload.length];
            ByteBuffer buffer = ByteBuffer.wrap(framed);
            buffer.put((byte) headerCount);
            for (int i = 0; i < strArr.length; i++) {
                if (valueBytes[i] != null) {
                    buffer.put((byte) nameBytes[i].length);
                    buffer.put(nameBytes[i]);
                    buffer.put((byte) valueBytes[i].length);
                    buffer.put(valueBytes[i]);
                }
            }
            buffer.put(payload);
            return MessageBuilder.withPayload(framed).copyHeaders(message.getHeaders()).build();
        }

        /**
         * Reverses {@link #embedHeaders}: strips the header frame from the payload and rebuilds a message.
         *
         * <p>The returned message carries only the embedded headers (as {@code String} values);
         * headers of the incoming {@code message} are not copied.
         *
         * @param message a message whose payload was produced by {@link #embedHeaders}
         * @return a new message with the remaining payload bytes and the extracted headers
         * @throws UnsupportedEncodingException if the UTF-8 charset is unavailable
         */
        Message<byte[]> extractHeaders(Message<byte[]> message) throws UnsupportedEncodingException {
            byte[] framed = message.getPayload();
            ByteBuffer buffer = ByteBuffer.wrap(framed);
            // Mask to read the count as unsigned; a plain byte would turn 128..255 negative.
            int headerCount = buffer.get() & MAX_UNSIGNED_BYTE;
            HashMap<String, Object> headers = new HashMap<String, Object>();
            for (int i = 0; i < headerCount; i++) {
                String name = readString(framed, buffer);
                String value = readString(framed, buffer);
                headers.put(name, value);
            }
            byte[] payload = new byte[buffer.remaining()];
            buffer.get(payload);
            return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
        }

        /** Encodes {@code text} as UTF-8 and rejects results too long for a one-byte length prefix. */
        private static byte[] encode(String description, String text) throws UnsupportedEncodingException {
            byte[] bytes = text.getBytes(CHARSET);
            if (bytes.length > MAX_UNSIGNED_BYTE) {
                throw new IllegalArgumentException("Cannot embed " + description + ": " + bytes.length
                        + " UTF-8 bytes exceed the limit of " + MAX_UNSIGNED_BYTE);
            }
            return bytes;
        }

        /** Reads one length-prefixed UTF-8 string at the buffer's position and advances past it. */
        private static String readString(byte[] source, ByteBuffer buffer) throws UnsupportedEncodingException {
            int length = buffer.get() & MAX_UNSIGNED_BYTE;
            // The buffer wraps "source" from index 0, so its position is also the array index.
            String text = new String(source, buffer.position(), length, CHARSET);
            buffer.position(buffer.position() + length);
            return text;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/xd/dirt/integration/redis/RedisMessageBus$ReceivingHandler.class */
    public class ReceivingHandler extends AbstractReplyProducingMessageHandler {
        public ReceivingHandler() {
            setBeanFactory(RedisMessageBus.this.getBeanFactory());
        }

        /* JADX WARN: Multi-variable type inference failed */
        protected Object handleRequestMessage(Message<?> message) {
            Message message2 = message;
            try {
                message2 = RedisMessageBus.this.embeddedHeadersMessageConverter.extractHeaders(message);
            } catch (UnsupportedEncodingException e) {
                this.logger.error("Could not convert message", e);
            }
            return RedisMessageBus.this.deserializePayloadIfNecessary(message2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/xd/dirt/integration/redis/RedisMessageBus$SendingHandler.class */
    public class SendingHandler extends AbstractMessageHandler {
        private final MessageHandler delegate;
        private final String replyTo;

        private SendingHandler(MessageHandler messageHandler, String str) {
            this.delegate = messageHandler;
            this.replyTo = str;
            setBeanFactory(RedisMessageBus.this.getBeanFactory());
        }

        protected void handleMessageInternal(Message<?> message) throws Exception {
            Message<byte[]> serializePayloadIfNecessary = RedisMessageBus.this.serializePayloadIfNecessary(message, MediaType.APPLICATION_OCTET_STREAM);
            if (this.replyTo != null) {
                serializePayloadIfNecessary = MessageBuilder.fromMessage(serializePayloadIfNecessary).setHeader(RedisMessageBus.REPLY_TO, this.replyTo).build();
            }
            Message<byte[]> embedHeaders = RedisMessageBus.this.embeddedHeadersMessageConverter.embedHeaders(serializePayloadIfNecessary, "contentType", "originalContentType", RedisMessageBus.REPLY_TO);
            Assert.isInstanceOf(byte[].class, embedHeaders.getPayload());
            this.delegate.handleMessage(embedHeaders);
        }
    }

    /**
     * Creates a bus that uses the given Redis connection factory and payload codec.
     *
     * @param redisConnectionFactory factory used by every Redis adapter this bus creates; must not be null
     * @param multiTypeCodec codec registered through {@code setCodec}; must not be null
     */
    public RedisMessageBus(RedisConnectionFactory redisConnectionFactory, MultiTypeCodec<Object> multiTypeCodec) {
        // Validate both arguments up front, connection factory first.
        Assert.notNull(redisConnectionFactory, "connectionFactory must not be null");
        Assert.notNull(multiTypeCodec, "codec must not be null");
        connectionFactory = redisConnectionFactory;
        setCodec(multiTypeCodec);
    }

    /**
     * Stores the evaluation context; bindReplier later passes it to the reply outbound adapter.
     *
     * @param evaluationContext the context to keep
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBusSupport
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Binds {@code messageChannel} as a consumer of the Redis queue {@code "queue." + str}.
     *
     * @param str the logical name of the binding
     * @param messageChannel the channel that receives the converted messages
     * @param properties not used by this implementation
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindConsumer(String str, MessageChannel messageChannel, Properties properties) {
        String queueName = "queue." + str;
        RedisQueueMessageDrivenEndpoint queueEndpoint = new RedisQueueMessageDrivenEndpoint(queueName, this.connectionFactory);
        queueEndpoint.setBeanFactory(getBeanFactory());
        // NOTE(review): null serializer presumably keeps the payload as raw bytes for header extraction - confirm.
        queueEndpoint.setSerializer((RedisSerializer) null);
        registerNamedChannelForConsumerIfNecessary(str, false);
        doRegisterConsumer(str, messageChannel, queueEndpoint);
    }

    /**
     * Binds {@code messageChannel} as a subscriber of the Redis topic {@code "topic." + str}.
     *
     * @param str the logical name of the binding
     * @param messageChannel the channel that receives the converted messages
     * @param properties not used by this implementation
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindPubSubConsumer(String str, MessageChannel messageChannel, Properties properties) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("declaring pubsub for inbound: " + str);
        }
        registerNamedChannelForConsumerIfNecessary(str, true);

        String[] topics = new String[]{"topic." + str};
        RedisInboundChannelAdapter topicAdapter = new RedisInboundChannelAdapter(this.connectionFactory);
        topicAdapter.setBeanFactory(getBeanFactory());
        // NOTE(review): null serializer presumably keeps the payload as raw bytes for header extraction - confirm.
        topicAdapter.setSerializer((RedisSerializer) null);
        topicAdapter.setTopics(topics);
        doRegisterConsumer(str, messageChannel, topicAdapter);
    }

    /**
     * Wires an inbound adapter to the target channel through an intermediate bridge channel
     * and a {@link ReceivingHandler}, registers the binding and starts it.
     *
     * <p>Flow: adapter -> "{name}.bridge" channel -> ReceivingHandler -> {@code messageChannel}.
     *
     * @param str the logical name, used to derive bean names
     * @param messageChannel the channel that finally receives the converted messages
     * @param messageProducerSupport the inbound adapter to initialize and bind
     */
    private void doRegisterConsumer(String str, MessageChannel messageChannel, MessageProducerSupport messageProducerSupport) {
        // Intermediate channel between the Redis adapter and the converting handler.
        DirectChannel bridge = new DirectChannel();
        bridge.setBeanFactory(getBeanFactory());
        bridge.setBeanName(str + ".bridge");

        messageProducerSupport.setOutputChannel(bridge);
        messageProducerSupport.setBeanName("inbound." + str);
        messageProducerSupport.afterPropertiesSet();

        Binding consumerBinding = Binding.forConsumer(messageProducerSupport, messageChannel);
        addBinding(consumerBinding);

        ReceivingHandler converter = new ReceivingHandler();
        converter.setOutputChannel(messageChannel);
        converter.setBeanName(str + ".convert.bridge");
        converter.afterPropertiesSet();
        bridge.subscribe(converter);

        // Start only after the handler is subscribed to the bridge.
        consumerBinding.start();
    }

    /**
     * Binds {@code messageChannel} as a producer for the Redis queue {@code "queue." + str}.
     *
     * @param str the logical name of the binding
     * @param messageChannel the channel to consume from; must be a {@link SubscribableChannel}
     * @param properties not used by this implementation
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindProducer(String str, MessageChannel messageChannel, Properties properties) {
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        String queueName = "queue." + str;
        RedisQueueOutboundChannelAdapter queueAdapter = new RedisQueueOutboundChannelAdapter(queueName, this.connectionFactory);
        queueAdapter.setBeanFactory(getBeanFactory());
        queueAdapter.afterPropertiesSet();
        doRegisterProducer(str, messageChannel, queueAdapter);
    }

    /**
     * Binds {@code messageChannel} as a publisher to the Redis topic {@code "topic." + str}.
     *
     * @param str the logical name of the binding
     * @param messageChannel the channel to consume from
     * @param properties not used by this implementation
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindPubSubProducer(String str, MessageChannel messageChannel, Properties properties) {
        String topicName = "topic." + str;
        RedisPublishingMessageHandler publisher = new RedisPublishingMessageHandler(this.connectionFactory);
        publisher.setBeanFactory(getBeanFactory());
        publisher.setTopic(topicName);
        publisher.afterPropertiesSet();
        doRegisterProducer(str, messageChannel, publisher);
    }

    /**
     * Registers a producer binding that sets no reply destination.
     *
     * @param str the logical name, used to derive the bean name
     * @param messageChannel the channel to consume from
     * @param messageHandler the Redis handler that finally sends the message
     */
    private void doRegisterProducer(String str, MessageChannel messageChannel, MessageHandler messageHandler) {
        String noReplyDestination = null;
        doRegisterProducer(str, messageChannel, messageHandler, noReplyDestination);
    }

    /**
     * Subscribes a {@link SendingHandler} wrapping {@code messageHandler} to the channel,
     * registers the resulting binding and starts it.
     *
     * @param str the logical name, used to derive the bean name
     * @param messageChannel the channel to consume from; must be a {@link SubscribableChannel}
     * @param messageHandler the Redis handler that finally sends the message
     * @param str2 reply destination to stamp on outgoing messages, or null for none
     */
    private void doRegisterProducer(String str, MessageChannel messageChannel, MessageHandler messageHandler, String str2) {
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        SubscribableChannel source = (SubscribableChannel) messageChannel;
        SendingHandler sender = new SendingHandler(messageHandler, str2);

        EventDrivenConsumer outboundConsumer = new EventDrivenConsumer(source, sender);
        outboundConsumer.setBeanFactory(getBeanFactory());
        outboundConsumer.setBeanName("outbound." + str);
        outboundConsumer.afterPropertiesSet();

        Binding producerBinding = Binding.forProducer(messageChannel, outboundConsumer);
        addBinding(producerBinding);
        producerBinding.start();
    }

    /**
     * Binds the requesting side of a request/reply exchange.
     *
     * <p>Requests from {@code messageChannel} are sent to the queue
     * {@code "queue." + str + ".requests"} with a "replyTo" header naming a reply queue
     * generated for this binding. Replies arriving on that queue are delivered to
     * {@code messageChannel2}.
     *
     * @param str the logical name of the binding
     * @param messageChannel the channel carrying outgoing requests; must be a {@link SubscribableChannel}
     * @param messageChannel2 the channel that receives the replies
     * @param properties not used by this implementation
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindRequestor(String str, MessageChannel messageChannel, MessageChannel messageChannel2, Properties properties) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("binding requestor: " + str);
        }
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        RedisQueueOutboundChannelAdapter requestAdapter = new RedisQueueOutboundChannelAdapter("queue." + str + ".requests", this.connectionFactory);
        requestAdapter.setBeanFactory(getBeanFactory());
        requestAdapter.afterPropertiesSet();
        // The reply queue name is built from a generated id.
        String replyQueueName = str + ".replies." + getIdGenerator().generateId();
        doRegisterProducer(str, messageChannel, requestAdapter, replyQueueName);
        RedisQueueMessageDrivenEndpoint replyEndpoint = new RedisQueueMessageDrivenEndpoint(replyQueueName, this.connectionFactory);
        // Give the reply endpoint the bean factory before initialization, as every other adapter in this class gets.
        replyEndpoint.setBeanFactory(getBeanFactory());
        replyEndpoint.setSerializer((RedisSerializer) null);
        doRegisterConsumer(str, messageChannel2, replyEndpoint);
    }

    /**
     * Binds the replying side of a request/reply exchange.
     *
     * <p>Requests are consumed from the queue {@code "queue." + str + ".requests"} and delivered
     * to {@code messageChannel}. Replies from {@code messageChannel2} are sent to the queue
     * named by each message's "replyTo" header.
     *
     * @param str the logical name of the binding
     * @param messageChannel the channel that receives incoming requests
     * @param messageChannel2 the channel carrying outgoing replies
     * @param properties not used by this implementation
     */
    @Override // org.springframework.xd.dirt.integration.bus.MessageBus
    public void bindReplier(String str, MessageChannel messageChannel, MessageChannel messageChannel2, Properties properties) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("binding replier: " + str);
        }

        // Inbound side: requests.
        String requestQueueName = "queue." + str + ".requests";
        RedisQueueMessageDrivenEndpoint requestEndpoint = new RedisQueueMessageDrivenEndpoint(requestQueueName, this.connectionFactory);
        requestEndpoint.setBeanFactory(getBeanFactory());
        requestEndpoint.setSerializer((RedisSerializer) null);
        doRegisterConsumer(str, messageChannel, requestEndpoint);

        // Outbound side: the destination queue is resolved per message from its replyTo header.
        RedisQueueOutboundChannelAdapter replyAdapter = new RedisQueueOutboundChannelAdapter(
                parser.parseExpression("headers['replyTo']"), this.connectionFactory);
        replyAdapter.setBeanFactory(getBeanFactory());
        replyAdapter.setIntegrationEvaluationContext(this.evaluationContext);
        replyAdapter.afterPropertiesSet();
        doRegisterProducer(str, messageChannel2, replyAdapter);
    }

    /**
     * {@link DisposableBean} callback; delegates to the inherited {@code stopBindings()}.
     */
    @Override
    public void destroy() {
        stopBindings();
    }
}
