package org.springframework.integration.redis.outbound;

import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractReactiveMessageHandler;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.class */
public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHandler {
    private final Expression streamKeyExpression;
    private final ReactiveRedisConnectionFactory connectionFactory;
    private EvaluationContext evaluationContext;
    private boolean extractPayload;
    private ReactiveStreamOperations<String, ?, ?> reactiveStreamOperations;
    private RedisSerializationContext<String, ?> serializationContext;

    @Nullable
    private HashMapper<String, ?, ?> hashMapper;

    public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, String str) {
        this(reactiveRedisConnectionFactory, (Expression) new LiteralExpression(str));
    }

    public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, Expression expression) {
        this.extractPayload = true;
        this.serializationContext = RedisSerializationContext.string();
        Assert.notNull(expression, "'streamKeyExpression' must not be null");
        Assert.notNull(reactiveRedisConnectionFactory, "'connectionFactory' must not be null");
        this.streamKeyExpression = expression;
        this.connectionFactory = reactiveRedisConnectionFactory;
    }

    public void setSerializationContext(RedisSerializationContext<String, ?> redisSerializationContext) {
        Assert.notNull(redisSerializationContext, "'serializationContext' must not be null");
        this.serializationContext = redisSerializationContext;
    }

    public void setHashMapper(@Nullable HashMapper<String, ?, ?> hashMapper) {
        this.hashMapper = hashMapper;
    }

    public void setExtractPayload(boolean z) {
        this.extractPayload = z;
    }

    public String getComponentType() {
        return "redis:stream-outbound-channel-adapter";
    }

    protected void onInit() {
        super.onInit();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
        ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(this.connectionFactory, this.serializationContext);
        this.reactiveStreamOperations = this.hashMapper == null ? reactiveRedisTemplate.opsForStream() : reactiveRedisTemplate.opsForStream(this.hashMapper);
    }

    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> {
            String str = (String) this.streamKeyExpression.getValue(this.evaluationContext, message, String.class);
            Assert.notNull(str, "'streamKey' must not be null");
            return str;
        }).flatMap(str -> {
            Object obj = message;
            if (this.extractPayload) {
                obj = message.getPayload();
            }
            return this.reactiveStreamOperations.add(StreamRecords.objectBacked(obj).withStreamKey(str));
        }).then();
    }
}
