package org.springframework.cloud.stream.binder.redis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint;
import org.springframework.integration.redis.outbound.RedisQueueOutboundChannelAdapter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:lib/spring-cloud-stream-binder-redis-1.0.0.RC2.jar:org/springframework/cloud/stream/binder/redis/RedisMessageChannelBinder.class */
public class RedisMessageChannelBinder extends AbstractBinder<MessageChannel, ConsumerProperties, ProducerProperties> {
    private static final String ERROR_HEADER = "errorKey";
    static final String CONSUMER_GROUPS_KEY_PREFIX = "groups.";
    private static final SpelExpressionParser parser = new SpelExpressionParser();
    private final String[] headersToMap;
    private final RedisOperations<String, String> redisOperations;
    private final RedisConnectionFactory connectionFactory;
    private final RedisQueueOutboundChannelAdapter errorAdapter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/spring-cloud-stream-binder-redis-1.0.0.RC2.jar:org/springframework/cloud/stream/binder/redis/RedisMessageChannelBinder$CompositeRedisQueueMessageDrivenEndpoint.class */
    public class CompositeRedisQueueMessageDrivenEndpoint extends MessageProducerSupport {
        private final List<RedisQueueMessageDrivenEndpoint> consumers = new ArrayList();

        public CompositeRedisQueueMessageDrivenEndpoint(String str, int i) {
            for (int i2 = 0; i2 < i; i2++) {
                RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint = new RedisQueueMessageDrivenEndpoint(str, RedisMessageChannelBinder.this.connectionFactory);
                redisQueueMessageDrivenEndpoint.setBeanFactory(RedisMessageChannelBinder.this.getBeanFactory());
                redisQueueMessageDrivenEndpoint.setSerializer(null);
                redisQueueMessageDrivenEndpoint.setBeanName("inbound." + str + "." + i2);
                this.consumers.add(redisQueueMessageDrivenEndpoint);
            }
            setBeanFactory(RedisMessageChannelBinder.this.getBeanFactory());
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.context.IntegrationObjectSupport
        public void onInit() {
            Iterator<RedisQueueMessageDrivenEndpoint> it = this.consumers.iterator();
            while (it.hasNext()) {
                it.next().afterPropertiesSet();
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
        public void doStart() {
            Iterator<RedisQueueMessageDrivenEndpoint> it = this.consumers.iterator();
            while (it.hasNext()) {
                it.next().start();
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
        public void doStop() {
            Iterator<RedisQueueMessageDrivenEndpoint> it = this.consumers.iterator();
            while (it.hasNext()) {
                it.next().stop();
            }
        }

        @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.core.MessageProducer
        public void setOutputChannel(MessageChannel messageChannel) {
            Iterator<RedisQueueMessageDrivenEndpoint> it = this.consumers.iterator();
            while (it.hasNext()) {
                it.next().setOutputChannel(messageChannel);
            }
        }

        @Override // org.springframework.integration.endpoint.MessageProducerSupport
        public void setErrorChannel(MessageChannel messageChannel) {
            Iterator<RedisQueueMessageDrivenEndpoint> it = this.consumers.iterator();
            while (it.hasNext()) {
                it.next().setErrorChannel(messageChannel);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/spring-cloud-stream-binder-redis-1.0.0.RC2.jar:org/springframework/cloud/stream/binder/redis/RedisMessageChannelBinder$ReceivingHandler.class */
    public class ReceivingHandler extends AbstractReplyProducingMessageHandler {
        private final ConsumerProperties consumerProperties;

        public ReceivingHandler(ConsumerProperties consumerProperties) {
            this.consumerProperties = consumerProperties;
            setBeanFactory(RedisMessageChannelBinder.this.getBeanFactory());
        }

        @Override // org.springframework.integration.handler.AbstractReplyProducingMessageHandler
        protected Object handleRequestMessage(Message<?> message) {
            return HeaderMode.embeddedHeaders.equals(this.consumerProperties.getHeaderMode()) ? RedisMessageChannelBinder.this.extractMessageValues(message).toMessage(getMessageBuilderFactory()) : message;
        }

        @Override // org.springframework.integration.handler.AbstractMessageProducingHandler
        protected boolean shouldCopyRequestHeaders() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/spring-cloud-stream-binder-redis-1.0.0.RC2.jar:org/springframework/cloud/stream/binder/redis/RedisMessageChannelBinder$SendingHandler.class */
    public class SendingHandler extends AbstractMessageHandler {
        private final String bindingName;
        private final ProducerProperties producerProperties;
        private final Map<String, RedisQueueOutboundChannelAdapter> adapters;
        private final PartitionHandler partitionHandler;

        private SendingHandler(String str, ProducerProperties producerProperties) {
            this.adapters = new HashMap();
            this.bindingName = str;
            this.producerProperties = producerProperties;
            ConfigurableListableBeanFactory beanFactory = RedisMessageChannelBinder.this.getBeanFactory();
            setBeanFactory(beanFactory);
            this.partitionHandler = new PartitionHandler(beanFactory, RedisMessageChannelBinder.this.evaluationContext, RedisMessageChannelBinder.this.partitionSelector, producerProperties);
            refreshChannelAdapters();
        }

        @Override // org.springframework.integration.handler.AbstractMessageHandler
        protected void handleMessageInternal(Message<?> message) throws Exception {
            MessageValues serializePayloadIfNecessary = RedisMessageChannelBinder.this.serializePayloadIfNecessary(message);
            if (this.producerProperties.isPartitioned()) {
                serializePayloadIfNecessary.put("partition", (Object) Integer.valueOf(this.partitionHandler.determinePartition(message)));
            }
            byte[] bArr = null;
            if (HeaderMode.embeddedHeaders.equals(this.producerProperties.getHeaderMode())) {
                bArr = RedisMessageChannelBinder.this.embeddedHeadersMessageConverter.embedHeaders(serializePayloadIfNecessary, RedisMessageChannelBinder.this.headersToMap);
            } else {
                Object obj = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
                if (obj != null && !obj.equals("application/octet-stream")) {
                    this.logger.error("Raw mode supports only application/octet-stream content type" + message.getPayload().getClass());
                }
                if (message.getPayload() instanceof byte[]) {
                    bArr = (byte[]) message.getPayload();
                } else {
                    this.logger.error("Raw mode supports only byte[] payloads but value sent was of type " + message.getPayload().getClass());
                }
            }
            if (bArr != null) {
                refreshChannelAdapters();
                Iterator<RedisQueueOutboundChannelAdapter> it = this.adapters.values().iterator();
                while (it.hasNext()) {
                    it.next().handleMessage(MessageBuilder.withPayload(bArr).copyHeaders(serializePayloadIfNecessary).build());
                }
            }
        }

        private void refreshChannelAdapters() {
            for (String str : RedisMessageChannelBinder.this.redisOperations.boundZSetOps(RedisMessageChannelBinder.CONSUMER_GROUPS_KEY_PREFIX + this.bindingName).rangeByScore(1.0d, Double.MAX_VALUE)) {
                if (!this.adapters.containsKey(str)) {
                    this.adapters.put(str, RedisMessageChannelBinder.this.createProducerEndpoint(String.format("%s.%s", this.bindingName, str), this.producerProperties));
                }
            }
        }
    }

    public RedisMessageChannelBinder(RedisConnectionFactory redisConnectionFactory) {
        this(redisConnectionFactory, new String[0]);
    }

    public RedisMessageChannelBinder(RedisConnectionFactory redisConnectionFactory, String... strArr) {
        Assert.notNull(redisConnectionFactory, "connectionFactory must not be null");
        this.connectionFactory = redisConnectionFactory;
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
        stringRedisTemplate.afterPropertiesSet();
        this.redisOperations = stringRedisTemplate;
        if (strArr == null || strArr.length <= 0) {
            this.headersToMap = BinderHeaders.STANDARD_HEADERS;
        } else {
            String[] strArr2 = (String[]) Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, BinderHeaders.STANDARD_HEADERS.length + strArr.length);
            System.arraycopy(strArr, 0, strArr2, BinderHeaders.STANDARD_HEADERS.length, strArr.length);
            this.headersToMap = strArr2;
        }
        this.errorAdapter = new RedisQueueOutboundChannelAdapter(parser.parseExpression("headers['errorKey']"), redisConnectionFactory);
    }

    @Override // org.springframework.cloud.stream.binder.AbstractBinder
    public void onInit() {
        this.errorAdapter.setIntegrationEvaluationContext(this.evaluationContext);
        this.errorAdapter.setBeanFactory(getBeanFactory());
        this.errorAdapter.afterPropertiesSet();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.cloud.stream.binder.AbstractBinder
    public Binding<MessageChannel> doBindConsumer(String str, String str2, MessageChannel messageChannel, ConsumerProperties consumerProperties) {
        if (!StringUtils.hasText(str2)) {
            str2 = "anonymous." + UUID.randomUUID().toString();
        }
        String groupedName = groupedName(str, str2);
        if (consumerProperties.isPartitioned()) {
            groupedName = groupedName + RuleBasedTransactionAttribute.PREFIX_ROLLBACK_RULE + consumerProperties.getInstanceIndex();
        }
        return doRegisterConsumer(str, str2, groupedName, messageChannel, createInboundAdapter(consumerProperties, groupedName), consumerProperties);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private MessageProducerSupport createInboundAdapter(ConsumerProperties consumerProperties, String str) {
        CompositeRedisQueueMessageDrivenEndpoint compositeRedisQueueMessageDrivenEndpoint;
        int concurrency = consumerProperties.getConcurrency();
        int i = concurrency > 0 ? concurrency : 1;
        if (i == 1) {
            RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint = new RedisQueueMessageDrivenEndpoint(str, this.connectionFactory);
            redisQueueMessageDrivenEndpoint.setBeanFactory(getBeanFactory());
            redisQueueMessageDrivenEndpoint.setSerializer(null);
            compositeRedisQueueMessageDrivenEndpoint = redisQueueMessageDrivenEndpoint;
        } else {
            compositeRedisQueueMessageDrivenEndpoint = new CompositeRedisQueueMessageDrivenEndpoint(str, i);
        }
        return compositeRedisQueueMessageDrivenEndpoint;
    }

    private Binding<MessageChannel> doRegisterConsumer(String str, String str2, String str3, MessageChannel messageChannel, MessageProducerSupport messageProducerSupport, ConsumerProperties consumerProperties) {
        DirectChannel directChannel = new DirectChannel();
        directChannel.setBeanFactory(getBeanFactory());
        directChannel.setBeanName(str3 + ".bridge");
        messageProducerSupport.setOutputChannel(addRetryIfNeeded(str3, directChannel, consumerProperties));
        messageProducerSupport.setBeanName("inbound." + str3);
        messageProducerSupport.afterPropertiesSet();
        DefaultBinding<MessageChannel> defaultBinding = new DefaultBinding<MessageChannel>(str, str2, messageChannel, messageProducerSupport) { // from class: org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder.1
            @Override // org.springframework.cloud.stream.binder.DefaultBinding
            protected void afterUnbind() {
                RedisMessageChannelBinder.this.redisOperations.boundZSetOps(RedisMessageChannelBinder.CONSUMER_GROUPS_KEY_PREFIX + getName()).incrementScore(getGroup(), -1.0d);
            }
        };
        ReceivingHandler receivingHandler = new ReceivingHandler(consumerProperties);
        receivingHandler.setOutputChannel(messageChannel);
        receivingHandler.setBeanName(str3 + ".bridge.handler");
        receivingHandler.afterPropertiesSet();
        directChannel.subscribe(receivingHandler);
        this.redisOperations.boundZSetOps(CONSUMER_GROUPS_KEY_PREFIX + str).incrementScore(str2, 1.0d);
        messageProducerSupport.start();
        return defaultBinding;
    }

    private MessageChannel addRetryIfNeeded(final String str, final DirectChannel directChannel, ConsumerProperties consumerProperties) {
        final RetryTemplate buildRetryTemplateIfRetryEnabled = buildRetryTemplateIfRetryEnabled(consumerProperties);
        if (buildRetryTemplateIfRetryEnabled == null) {
            return directChannel;
        }
        DirectChannel directChannel2 = new DirectChannel() { // from class: org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder.2
            @Override // org.springframework.integration.channel.AbstractSubscribableChannel, org.springframework.integration.channel.AbstractMessageChannel
            protected boolean doSend(final Message<?> message, final long j) {
                try {
                    return ((Boolean) buildRetryTemplateIfRetryEnabled.execute(new RetryCallback<Boolean, Exception>() { // from class: org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder.2.1
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // org.springframework.retry.RetryCallback
                        public Boolean doWithRetry(RetryContext retryContext) throws Exception {
                            return Boolean.valueOf(directChannel.send(message, j));
                        }
                    }, new RecoveryCallback<Boolean>() { // from class: org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder.2.2
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // org.springframework.retry.RecoveryCallback
                        public Boolean recover(RetryContext retryContext) throws Exception {
                            AnonymousClass2.this.logger.error("Failed to deliver message; retries exhausted; message sent to queue 'ERRORS:" + str + "' ", retryContext.getLastThrowable());
                            RedisMessageChannelBinder.this.errorAdapter.handleMessage(getMessageBuilderFactory().fromMessage(message).setHeader(RedisMessageChannelBinder.ERROR_HEADER, "ERRORS:" + str).build());
                            return true;
                        }
                    })).booleanValue();
                } catch (Exception e) {
                    this.logger.error("Failed to deliver message", e);
                    return false;
                }
            }
        };
        directChannel2.setBeanName(str + ".bridge");
        return directChannel2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.cloud.stream.binder.AbstractBinder
    public Binding<MessageChannel> doBindProducer(String str, MessageChannel messageChannel, ProducerProperties producerProperties) {
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        return doRegisterProducer(str, messageChannel, producerProperties);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public RedisQueueOutboundChannelAdapter createProducerEndpoint(String str, ProducerProperties producerProperties) {
        RedisQueueOutboundChannelAdapter redisQueueOutboundChannelAdapter = !producerProperties.isPartitioned() ? new RedisQueueOutboundChannelAdapter(str, this.connectionFactory) : new RedisQueueOutboundChannelAdapter(parser.parseExpression(buildPartitionRoutingExpression(str)), this.connectionFactory);
        redisQueueOutboundChannelAdapter.setIntegrationEvaluationContext(this.evaluationContext);
        redisQueueOutboundChannelAdapter.setBeanFactory(getBeanFactory());
        redisQueueOutboundChannelAdapter.afterPropertiesSet();
        return redisQueueOutboundChannelAdapter;
    }

    private Binding<MessageChannel> doRegisterProducer(String str, MessageChannel messageChannel, ProducerProperties producerProperties) {
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        EventDrivenConsumer eventDrivenConsumer = new EventDrivenConsumer((SubscribableChannel) messageChannel, new SendingHandler(str, producerProperties));
        eventDrivenConsumer.setBeanFactory(getBeanFactory());
        eventDrivenConsumer.setBeanName("outbound." + str);
        eventDrivenConsumer.afterPropertiesSet();
        DefaultBinding defaultBinding = new DefaultBinding(str, null, messageChannel, eventDrivenConsumer);
        String[] requiredGroups = producerProperties.getRequiredGroups();
        if (!ObjectUtils.isEmpty((Object[]) requiredGroups)) {
            for (String str2 : requiredGroups) {
                this.redisOperations.boundZSetOps(CONSUMER_GROUPS_KEY_PREFIX + str).incrementScore(str2, 1.0d);
            }
        }
        eventDrivenConsumer.start();
        return defaultBinding;
    }
}
