Class ReactiveRedisStreamMessageProducer
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer
- All Implemented Interfaces:
org.springframework.beans.factory.Aware,org.springframework.beans.factory.BeanFactoryAware,org.springframework.beans.factory.BeanNameAware,org.springframework.beans.factory.DisposableBean,org.springframework.beans.factory.InitializingBean,org.springframework.beans.factory.SmartInitializingSingleton,org.springframework.context.ApplicationContextAware,org.springframework.context.Lifecycle,org.springframework.context.Phased,org.springframework.context.SmartLifecycle,org.springframework.integration.context.ExpressionCapable,org.springframework.integration.core.MessageProducer,org.springframework.integration.IntegrationPattern,org.springframework.integration.support.context.NamedComponent,org.springframework.integration.support.management.ManageableLifecycle,org.springframework.integration.support.management.ManageableSmartLifecycle,org.springframework.integration.support.management.TrackableComponent
public class ReactiveRedisStreamMessageProducer
extends org.springframework.integration.endpoint.MessageProducerSupport
A
MessageProducerSupport for reading messages from a Redis Stream and publishing them into the provided
output channel.
By default this adapter reads message as a standalone client XREAD (Redis command) but can be switched to a
Consumer Group feature XREADGROUP by setting consumerName field.
By default the Consumer Group name is the id of this bean IntegrationObjectSupport.getBeanName().- Since:
- 5.4
-
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleCondition, lifecycleLock -
Constructor Summary
Constructors Constructor Description ReactiveRedisStreamMessageProducer(org.springframework.data.redis.connection.ReactiveRedisConnectionFactory reactiveConnectionFactory, java.lang.String streamKey) -
Method Summary
Modifier and Type Method Description protected voiddoStart()java.lang.StringgetComponentType()protected voidonInit()voidsetAutoAck(boolean autoAck)Set whether or not acknowledge message read in the Consumer Group.voidsetBatchSize(int recordsPerPoll)Configure a batch size for the COUNT option during reading.voidsetConsumerGroup(java.lang.String consumerGroup)Set the name of the Consumer Group.voidsetConsumerName(java.lang.String consumerName)Set the name of the consumer.voidsetCreateConsumerGroup(boolean createConsumerGroup)Create the Consumer Group if and only if it does not exist.voidsetExtractPayload(boolean extractPayload)Configure this channel adapter to extract or not value from theRecord.voidsetObjectMapper(org.springframework.data.redis.hash.HashMapper<?,?,?> hashMapper)Configure a hash mapper.voidsetOnErrorResume(java.util.function.Function<? super java.lang.Throwable,? extends org.reactivestreams.Publisher<java.lang.Void>> resumeFunction)Configure a resume Function to resume the main sequence when polling the stream fails.voidsetPollTimeout(java.time.Duration pollTimeout)Configure a poll timeout for the BLOCK option during reading.voidsetReadOffset(org.springframework.data.redis.connection.stream.ReadOffset readOffset)Define the offset from which we want to read message.voidsetSerializer(org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair<?> pair)Configure a key, hash key and hash value serializer.voidsetStreamReceiverOptions(org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptions<java.lang.String,?> streamReceiverOptions)SetReactiveStreamOperationsused to customize theStreamReceiver.voidsetTargetType(java.lang.Class<?> targetType)Configure a hash target type.Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, doStop, getErrorChannel, getErrorMessageAttributes, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisherMethods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
destroy, doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stopMethods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
-
Constructor Details
-
ReactiveRedisStreamMessageProducer
public ReactiveRedisStreamMessageProducer(org.springframework.data.redis.connection.ReactiveRedisConnectionFactory reactiveConnectionFactory, java.lang.String streamKey)
-
-
Method Details
-
setReadOffset
public void setReadOffset(org.springframework.data.redis.connection.stream.ReadOffset readOffset)Define the offset from which we want to read message. By default theReadOffset.latest()is used.ReadOffset.latest()is equal to '$', which is the Id used withXREADto get new data added to the stream. Note that when switching to the Consumer Group feature, we set it toReadOffset.lastConsumed()if it is still equal toReadOffset.latest().- Parameters:
readOffset- the desired offset
-
setExtractPayload
public void setExtractPayload(boolean extractPayload)Configure this channel adapter to extract or not value from theRecord.- Parameters:
extractPayload- default true
-
setAutoAck
public void setAutoAck(boolean autoAck)Set whether or not acknowledge message read in the Consumer Group.trueby default.- Parameters:
autoAck- the acknowledge option.
-
setConsumerGroup
public void setConsumerGroup(@Nullable java.lang.String consumerGroup)Set the name of the Consumer Group. It is possible to create that Consumer Group if desired, see:createConsumerGroup. If not set, the defined bean nameIntegrationObjectSupport.getBeanName()is used.- Parameters:
consumerGroup- the Consumer Group on which this adapter should register to listen messages.
-
setConsumerName
public void setConsumerName(@Nullable java.lang.String consumerName)Set the name of the consumer. When a consumer name is provided, this adapter is switched to the Consumer Group feature. Note that this value should be unique in the group.- Parameters:
consumerName- the consumer name in the Consumer Group
-
setCreateConsumerGroup
public void setCreateConsumerGroup(boolean createConsumerGroup)Create the Consumer Group if and only if it does not exist. During the creation we also create the stream, seeMKSTREAM.- Parameters:
createConsumerGroup- specify if we should create the Consumer Group,falseby default
-
setStreamReceiverOptions
public void setStreamReceiverOptions(@Nullable org.springframework.data.redis.stream.StreamReceiver.StreamReceiverOptions<java.lang.String,?> streamReceiverOptions)SetReactiveStreamOperationsused to customize theStreamReceiver. It provides a way to set the polling timeout and the serialization context. By default the polling timeout is set to infinite andStringRedisSerializeris used. Mutually exclusive with 'pollTimeout', 'batchSize', 'onErrorResume', 'serializer', 'targetType', 'objectMapper'.- Parameters:
streamReceiverOptions- the desired receiver options
-
setPollTimeout
public void setPollTimeout(java.time.Duration pollTimeout)Configure a poll timeout for the BLOCK option during reading. Mutually exclusive withsetStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).- Parameters:
pollTimeout- the timeout for polling.- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.pollTimeout(Duration)
-
setBatchSize
public void setBatchSize(int recordsPerPoll)Configure a batch size for the COUNT option during reading. Mutually exclusive withsetStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).- Parameters:
recordsPerPoll- must be greater zero.- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.batchSize(int)
-
setOnErrorResume
public void setOnErrorResume(java.util.function.Function<? super java.lang.Throwable,? extends org.reactivestreams.Publisher<java.lang.Void>> resumeFunction)Configure a resume Function to resume the main sequence when polling the stream fails. Mutually exclusive withsetStreamReceiverOptions(StreamReceiver.StreamReceiverOptions). By default this function extract the failedRecordand sends anErrorMessageto the providedMessageProducerSupport.setErrorChannel(MessageChannel). The failed message for this record may have aIntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKheader when manual acknowledgment is configured for this message producer.- Parameters:
resumeFunction- must not be null.- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.onErrorResume(Function)
-
setSerializer
public void setSerializer(org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair<?> pair)Configure a key, hash key and hash value serializer. Mutually exclusive withsetStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).- Parameters:
pair- must not be null.- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.serializer(RedisSerializationContext)
-
setTargetType
public void setTargetType(java.lang.Class<?> targetType)Configure a hash target type. Changes the emitted Record type to ObjectRecord. Mutually exclusive withsetStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).- Parameters:
targetType- must not be null.- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.targetType(Class)
-
setObjectMapper
public void setObjectMapper(org.springframework.data.redis.hash.HashMapper<?,?,?> hashMapper)Configure a hash mapper. Mutually exclusive withsetStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).- Parameters:
hashMapper- must not be null.- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.objectMapper(HashMapper)
-
getComponentType
public java.lang.String getComponentType()- Specified by:
getComponentTypein interfaceorg.springframework.integration.support.context.NamedComponent- Overrides:
getComponentTypein classorg.springframework.integration.context.IntegrationObjectSupport
-
onInit
protected void onInit()- Overrides:
onInitin classorg.springframework.integration.endpoint.MessageProducerSupport
-
doStart
protected void doStart()- Overrides:
doStartin classorg.springframework.integration.endpoint.MessageProducerSupport
-