package org.springframework.integration.redis.inbound;

import java.time.Duration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.stream.StreamReceiver;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.redis.support.RedisHeaders;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducer.class */
public class ReactiveRedisStreamMessageProducer extends MessageProducerSupport {
    private final ReactiveRedisConnectionFactory reactiveConnectionFactory;
    private final String streamKey;
    private ReactiveStreamOperations<String, ?, ?> reactiveStreamOperations;
    private StreamReceiver<String, ?> streamReceiver;

    @Nullable
    private String consumerGroup;

    @Nullable
    private String consumerName;
    private boolean createConsumerGroup;
    private StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions = StreamReceiver.StreamReceiverOptions.builder().pollTimeout(Duration.ZERO).build();
    private ReadOffset readOffset = ReadOffset.latest();
    private boolean extractPayload = true;
    private boolean autoAck = true;

    public ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, String str) {
        Assert.notNull(reactiveRedisConnectionFactory, "'connectionFactory' must not be null");
        Assert.hasText(str, "'streamKey' must be set");
        this.reactiveConnectionFactory = reactiveRedisConnectionFactory;
        this.streamKey = str;
    }

    public void setReadOffset(ReadOffset readOffset) {
        this.readOffset = readOffset;
    }

    public void setExtractPayload(boolean z) {
        this.extractPayload = z;
    }

    public void setAutoAck(boolean z) {
        this.autoAck = z;
    }

    public void setConsumerGroup(@Nullable String str) {
        this.consumerGroup = str;
    }

    public void setConsumerName(@Nullable String str) {
        this.consumerName = str;
    }

    public void setCreateConsumerGroup(boolean z) {
        this.createConsumerGroup = z;
    }

    public void setStreamReceiverOptions(@Nullable StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions) {
        this.streamReceiverOptions = streamReceiverOptions;
    }

    public String getComponentType() {
        return "redis:stream-inbound-channel-adapter";
    }

    protected void onInit() {
        super.onInit();
        this.streamReceiver = StreamReceiver.create(this.reactiveConnectionFactory, this.streamReceiverOptions);
        if (StringUtils.hasText(this.consumerName) && StringUtils.isEmpty(this.consumerGroup)) {
            this.consumerGroup = getBeanName();
        }
        this.reactiveStreamOperations = new ReactiveRedisTemplate(this.reactiveConnectionFactory, RedisSerializationContext.string()).opsForStream();
    }

    protected void doStart() {
        Flux thenMany;
        super.doStart();
        StreamOffset create = StreamOffset.create(this.streamKey, this.readOffset);
        if (StringUtils.isEmpty(this.consumerName)) {
            thenMany = this.streamReceiver.receive(create);
        } else {
            Mono empty = Mono.empty();
            if (this.createConsumerGroup) {
                empty = this.reactiveStreamOperations.createGroup(this.streamKey, this.consumerGroup).onErrorReturn(this.consumerGroup);
            }
            Consumer from = Consumer.from(this.consumerGroup, this.consumerName);
            if (create.getOffset().equals(ReadOffset.latest())) {
                create = StreamOffset.create(this.streamKey, ReadOffset.lastConsumed());
            }
            thenMany = empty.thenMany(this.autoAck ? this.streamReceiver.receiveAutoAck(from, create) : this.streamReceiver.receive(from, create));
        }
        subscribeToPublisher(thenMany.map(record -> {
            return buildMessageFromRecord(record, this.extractPayload);
        }).onErrorContinue((th, obj) -> {
            MessageConversionException messageConversionException = new MessageConversionException(buildMessageFromRecord((Record) obj, false), "Cannot deserialize Redis Stream Record", th);
            if (sendErrorMessageIfNecessary(null, messageConversionException)) {
                return;
            }
            this.logger.getLog().error(messageConversionException);
        }));
    }

    private Message<?> buildMessageFromRecord(Record<String, ?> record, boolean z) {
        AbstractIntegrationMessageBuilder header = getMessageBuilderFactory().withPayload(z ? record.getValue() : record).setHeader(RedisHeaders.STREAM_KEY, record.getStream()).setHeader(RedisHeaders.STREAM_MESSAGE_ID, record.getId()).setHeader(RedisHeaders.CONSUMER_GROUP, this.consumerGroup).setHeader(RedisHeaders.CONSUMER, this.consumerName);
        if (!this.autoAck && this.consumerGroup != null) {
            header.setHeader("acknowledgmentCallback", () -> {
                this.reactiveStreamOperations.acknowledge(this.consumerGroup, record).subscribe();
            });
        }
        return header.build();
    }
}
